构建实时聊天系统:基于Workerman和MongoDB的完整实现
实时聊天系统是现代Web应用中的常见功能,无论是社交网络、客户服务还是协作工具,都需要稳定高效的聊天功能。本文将详细介绍如何使用PHP的Workerman框架构建一个实时聊天系统,该系统支持多人聊天、一对一聊天,并使用MongoDB作为数据存储,同时实现完整的用户认证和权限管理。
问题分析
在构建实时聊天系统时,我们需要解决以下几个关键问题:
- 实时通信:如何实现服务器与客户端之间的实时双向通信?
- 用户认证:如何验证用户身份,确保只有合法用户才能使用聊天功能?
- 消息存储:如何高效地存储和检索聊天消息?
- 会话管理:如何管理多人聊天和一对一聊天的会话?
- 用户状态:如何跟踪用户的在线/离线状态?
- 消息路由:如何确保消息被正确地发送给目标用户或群组?
系统架构
我们的聊天系统将采用以下架构:
- 前端:HTML + CSS + JavaScript,使用WebSocket与服务器通信
- 后端:基于Workerman的PHP WebSocket服务器
- 数据库:MongoDB,用于存储用户信息、聊天记录和会话数据
- 认证机制:基于Token的身份验证
环境准备
在开始实现之前,请确保以下环境已准备就绪:
- PHP 7.0+
- Composer(PHP包管理器)
- MongoDB服务器
- MongoDB PHP驱动
- Workerman框架
可以通过以下命令安装Workerman和MongoDB PHP驱动:
composer require workerman/workerman
pecl install mongodb
实现步骤
1. 搭建基础Workerman环境
首先,我们创建一个基础的Workerman WebSocket服务器:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
// 创建一个WebSocket服务器
$ws_worker = new Worker("websocket://0.0.0.0:2346");
// 设置进程数
$ws_worker->count = 4;
// 存储用户连接的映射
$users = [];
// 当客户端连接时
$ws_worker->onConnect = function($connection) {
echo "New connection\n";
};
// 当客户端发送消息时
$ws_worker->onMessage = function($connection, $data) use (&$users) {
// 处理消息
echo "Received message: $data\n";
};
// 当客户端断开连接时
$ws_worker->onClose = function($connection) use (&$users) {
// 处理断开连接
echo "Connection closed\n";
};
// 运行所有worker
Worker::runAll();
2. 设计MongoDB数据模型
我们需要设计几个主要的集合来存储数据:
- users:存储用户信息
- chat_rooms:存储聊天室信息(多人聊天)
- private_chats:存储一对一聊天信息
- messages:存储所有聊天消息
以下是这些集合的基本结构:
// users集合
{
"_id": ObjectId("..."),
"username": "user1",
"password": "$2y$10$...", // 加密后的密码
"email": "user1@example.com",
"created_at": ISODate("..."),
"last_login": ISODate("..."),
"status": "online" // online, offline, away
}
// chat_rooms集合
{
"_id": ObjectId("..."),
"name": "General Discussion",
"creator_id": ObjectId("..."),
"created_at": ISODate("..."),
"participants": [ObjectId("..."), ObjectId("...")] // 参与者ID数组
}
// private_chats集合
{
"_id": ObjectId("..."),
"user1_id": ObjectId("..."),
"user2_id": ObjectId("..."),
"created_at": ISODate("...")
}
// messages集合
{
"_id": ObjectId("..."),
"sender_id": ObjectId("..."),
"receiver_id": ObjectId("..."), // 对于一对一聊天
"room_id": ObjectId("..."), // 对于群组聊天
"content": "Hello, world!",
"timestamp": ISODate("..."),
"type": "text" // text, image, file等
}
3. 实现用户认证系统
我们需要实现用户登录和注册功能,以及基于Token的身份验证:
<?php
// MongoDB连接
$mongoClient = new MongoDB\Client("mongodb://localhost:27017");
$db = $mongoClient->chat_system;
// 用户认证类
class Auth {
private $db;
public function __construct($db) {
$this->db = $db;
}
// 用户注册
public function register($username, $password, $email) {
$usersCollection = $this->db->users;
// 检查用户名是否已存在
$existingUser = $usersCollection->findOne(['username' => $username]);
if ($existingUser) {
return ['success' => false, 'message' => 'Username already exists'];
}
// 加密密码
$hashedPassword = password_hash($password, PASSWORD_DEFAULT);
// 创建新用户
$result = $usersCollection->insertOne([
'username' => $username,
'password' => $hashedPassword,
'email' => $email,
'created_at' => new MongoDB\BSON\UTCDateTime(),
'status' => 'offline'
]);
return ['success' => true, 'user_id' => $result->getInsertedId()];
}
// 用户登录
public function login($username, $password) {
$usersCollection = $this->db->users;
// 查找用户
$user = $usersCollection->findOne(['username' => $username]);
if (!$user) {
return ['success' => false, 'message' => 'User not found'];
}
// 验证密码
if (!password_verify($password, $user->password)) {
return ['success' => false, 'message' => 'Invalid password'];
}
// 生成Token
$token = bin2hex(random_bytes(32));
// 更新用户状态和Token
$usersCollection->updateOne(
['_id' => $user->_id],
[
'$set' => [
'status' => 'online',
'token' => $token,
'last_login' => new MongoDB\BSON\UTCDateTime()
]
]
);
return [
'success' => true,
'token' => $token,
'user_id' => (string)$user->_id,
'username' => $user->username
];
}
// 验证Token
public function verifyToken($token) {
$usersCollection = $this->db->users;
$user = $usersCollection->findOne(['token' => $token]);
if (!$user) {
return ['success' => false, 'message' => 'Invalid token'];
}
return ['success' => true, 'user_id' => (string)$user->_id];
}
// 用户登出
public function logout($token) {
$usersCollection = $this->db->users;
// 清除Token并更新状态
$usersCollection->updateOne(
['token' => $token],
[
'$set' => [
'status' => 'offline',
'token' => null
]
]
);
return ['success' => true];
}
}
4. 实现多人聊天功能
多人聊天功能需要创建聊天室、加入聊天室、发送消息到聊天室等功能:
<?php
// 聊天室管理类
class ChatRoomManager {
private $db;
public function __construct($db) {
$this->db = $db;
}
// 创建聊天室
public function createRoom($name, $creatorId) {
$roomsCollection = $this->db->chat_rooms;
$result = $roomsCollection->insertOne([
'name' => $name,
'creator_id' => new MongoDB\BSON\ObjectId($creatorId),
'created_at' => new MongoDB\BSON\UTCDateTime(),
'participants' => [new MongoDB\BSON\ObjectId($creatorId)]
]);
return ['success' => true, 'room_id' => (string)$result->getInsertedId()];
}
// 加入聊天室
public function joinRoom($roomId, $userId) {
$roomsCollection = $this->db->chat_rooms;
$result = $roomsCollection->updateOne(
['_id' => new MongoDB\BSON\ObjectId($roomId)],
['$addToSet' => ['participants' => new MongoDB\BSON\ObjectId($userId)]]
);
if ($result->getModifiedCount() === 0) {
return ['success' => false, 'message' => 'Failed to join room'];
}
return ['success' => true];
}
// 发送消息到聊天室
public function sendMessageToRoom($roomId, $senderId, $content) {
$messagesCollection = $this->db->messages;
$result = $messagesCollection->insertOne([
'sender_id' => new MongoDB\BSON\ObjectId($senderId),
'room_id' => new MongoDB\BSON\ObjectId($roomId),
'content' => $content,
'timestamp' => new MongoDB\BSON\UTCDateTime(),
'type' => 'text'
]);
return ['success' => true, 'message_id' => (string)$result->getInsertedId()];
}
// 获取聊天室消息历史
public function getRoomMessages($roomId, $limit = 50, $offset = 0) {
$messagesCollection = $this->db->messages;
$messages = $messagesCollection->find(
['room_id' => new MongoDB\BSON\ObjectId($roomId)],
[
'sort' => ['timestamp' => -1],
'limit' => $limit,
'skip' => $offset
]
);
$messageList = [];
foreach ($messages as $message) {
$messageList[] = [
'message_id' => (string)$message->_id,
'sender_id' => (string)$message->sender_id,
'content' => $message->content,
'timestamp' => $message->timestamp->toDateTime()->format('Y-m-d H:i:s'),
'type' => $message->type
];
}
// 反转数组,使消息按时间顺序排列
$messageList = array_reverse($messageList);
return ['success' => true, 'messages' => $messageList];
}
}
5. 实现一对一聊天功能
一对一聊天功能需要创建私聊会话、发送私聊消息等功能:
<?php
// 私聊管理类
class PrivateChatManager {
private $db;
public function __construct($db) {
$this->db = $db;
}
// 获取或创建私聊会话
public function getOrCreatePrivateChat($user1Id, $user2Id) {
$privateChatsCollection = $this->db->private_chats;
// 查找是否已存在这两个用户之间的私聊会话
$chat = $privateChatsCollection->findOne([
'$or' => [
['user1_id' => new MongoDB\BSON\ObjectId($user1Id), 'user2_id' => new MongoDB\BSON\ObjectId($user2Id)],
['user1_id' => new MongoDB\BSON\ObjectId($user2Id), 'user2_id' => new MongoDB\BSON\ObjectId($user1Id)]
]
]);
if ($chat) {
return ['success' => true, 'chat_id' => (string)$chat->_id];
}
// 创建新的私聊会话
$result = $privateChatsCollection->insertOne([
'user1_id' => new MongoDB\BSON\ObjectId($user1Id),
'user2_id' => new MongoDB\BSON\ObjectId($user2Id),
'created_at' => new MongoDB\BSON\UTCDateTime()
]);
return ['success' => true, 'chat_id' => (string)$result->getInsertedId()];
}
// 发送私聊消息
public function sendPrivateMessage($chatId, $senderId, $receiverId, $content) {
$messagesCollection = $this->db->messages;
$result = $messagesCollection->insertOne([
'sender_id' => new MongoDB\BSON\ObjectId($senderId),
'receiver_id' => new MongoDB\BSON\ObjectId($receiverId),
'chat_id' => new MongoDB\BSON\ObjectId($chatId),
'content' => $content,
'timestamp' => new MongoDB\BSON\UTCDateTime(),
'type' => 'text'
]);
return ['success' => true, 'message_id' => (string)$result->getInsertedId()];
}
// 获取私聊消息历史
public function getPrivateMessages($chatId, $limit = 50, $offset = 0) {
$messagesCollection = $this->db->messages;
$messages = $messagesCollection->find(
['chat_id' => new MongoDB\BSON\ObjectId($chatId)],
[
'sort' => ['timestamp' => -1],
'limit' => $limit,
'skip' => $offset
]
);
$messageList = [];
foreach ($messages as $message) {
$messageList[] = [
'message_id' => (string)$message->_id,
'sender_id' => (string)$message->sender_id,
'receiver_id' => (string)$message->receiver_id,
'content' => $message->content,
'timestamp' => $message->timestamp->toDateTime()->format('Y-m-d H:i:s'),
'type' => $message->type
];
}
// 反转数组,使消息按时间顺序排列
$messageList = array_reverse($messageList);
return ['success' => true, 'messages' => $messageList];
}
}
6. 实现用户状态管理(登录、退出)
我们需要跟踪用户的在线状态,并在用户连接或断开连接时更新:
<?php
// 用户状态管理类
class UserStatusManager {
private $db;
private $users; // 存储用户连接的映射
public function __construct($db, &$users) {
$this->db = $db;
$this->users = &$users;
}
// 用户上线
public function userOnline($userId, $connection) {
$usersCollection = $this->db->users;
// 更新数据库中的用户状态
$usersCollection->updateOne(
['_id' => new MongoDB\BSON\ObjectId($userId)],
['$set' => ['status' => 'online']]
);
// 存储用户连接
$this->users[$userId] = $connection;
return ['success' => true];
}
// 用户下线
public function userOffline($userId) {
$usersCollection = $this->db->users;
// 更新数据库中的用户状态
$usersCollection->updateOne(
['_id' => new MongoDB\BSON\ObjectId($userId)],
['$set' => ['status' => 'offline']]
);
// 移除用户连接
if (isset($this->users[$userId])) {
unset($this->users[$userId]);
}
return ['success' => true];
}
// 向特定用户发送消息
public function sendToUser($userId, $message) {
if (isset($this->users[$userId])) {
$this->users[$userId]->send(json_encode($message));
return ['success' => true];
}
return ['success' => false, 'message' => 'User is not online'];
}
// 向聊天室的所有参与者发送消息
public function sendToRoom($roomId, $message, $excludeUserId = null) {
$roomsCollection = $this->db->chat_rooms;
// 获取聊天室的所有参与者
$room = $roomsCollection->findOne(
['_id' => new MongoDB\BSON\ObjectId($roomId)],
['projection' => ['participants' => 1]]
);
if (!$room) {
return ['success' => false, 'message' => 'Room not found'];
}
// 向每个在线参与者发送消息
foreach ($room->participants as $participantId) {
$participantIdStr = (string)$participantId;
// 排除指定用户
if ($excludeUserId && $participantIdStr === $excludeUserId) {
continue;
}
if (isset($this->users[$participantIdStr])) {
$this->users[$participantIdStr]->send(json_encode($message));
}
}
return ['success' => true];
}
}
7. 整合所有功能
现在,我们将所有功能整合到Workerman服务器中:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
// MongoDB连接
$mongoClient = new MongoDB\Client("mongodb://localhost:27017");
$db = $mongoClient->chat_system;
// 创建一个WebSocket服务器
$ws_worker = new Worker("websocket://0.0.0.0:2346");
// 设置进程数
$ws_worker->count = 4;
// 存储用户连接的映射
$users = [];
// 初始化管理器
$auth = new Auth($db);
$chatRoomManager = new ChatRoomManager($db);
$privateChatManager = new PrivateChatManager($db);
$userStatusManager = new UserStatusManager($db, $users);
// 当客户端连接时
$ws_worker->onConnect = function($connection) {
echo "New connection\n";
};
// 当客户端发送消息时
$ws_worker->onMessage = function($connection, $data) use (&$users, $auth, $chatRoomManager, $privateChatManager, $userStatusManager) {
// 解析JSON数据
$requestData = json_decode($data, true);
if (!$requestData) {
$connection->send(json_encode(['success' => false, 'message' => 'Invalid JSON']));
return;
}
$action = $requestData['action'] ?? '';
$token = $requestData['token'] ?? '';
// 对于不需要认证的操作
switch ($action) {
case 'register':
$result = $auth->register(
$requestData['username'] ?? '',
$requestData['password'] ?? '',
$requestData['email'] ?? ''
);
$connection->send(json_encode($result));
return;
case 'login':
$result = $auth->login(
$requestData['username'] ?? '',
$requestData['password'] ?? ''
);
if ($result['success']) {
// 用户登录成功,存储用户ID和连接的映射
$userId = $result['user_id'];
$users[$userId] = $connection;
$userStatusManager->userOnline($userId, $connection);
}
$connection->send(json_encode($result));
return;
}
// 对于需要认证的操作,先验证Token
if (empty($token)) {
$connection->send(json_encode(['success' => false, 'message' => 'Authentication required']));
return;
}
$authResult = $auth->verifyToken($token);
if (!$authResult['success']) {
$connection->send(json_encode(['success' => false, 'message' => 'Invalid token']));
return;
}
$userId = $authResult['user_id'];
// 处理需要认证的操作
switch ($action) {
case 'logout':
$result = $auth->logout($token);
if ($result['success']) {
$userStatusManager->userOffline($userId);
unset($users[$userId]);
}
$connection->send(json_encode($result));
break;
case 'create_room':
$result = $chatRoomManager->createRoom(
$requestData['room_name'] ?? '',
$userId
);
$connection->send(json_encode($result));
break;
case 'send_room_message':
$roomId = $requestData['room_id'] ?? '';
$content = $requestData['content'] ?? '';
// 保存消息到数据库
$result = $chatRoomManager->sendMessageToRoom($roomId, $userId, $content);
if ($result['success']) {
// 向聊天室的所有参与者发送消息
$messageData = [
'action' => 'new_room_message',
'room_id' => $roomId,
'message_id' => $result['message_id'],
'sender_id' => $userId,
'content' => $content,
'timestamp' => date('Y-m-d H:i:s')
];
$userStatusManager->sendToRoom($roomId, $messageData);
}
$connection->send(json_encode($result));
break;
case 'send_private_message':
$chatId = $requestData['chat_id'] ?? '';
$receiverId = $requestData['receiver_id'] ?? '';
$content = $requestData['content'] ?? '';
// 保存消息到数据库
$result = $privateChatManager->sendPrivateMessage($chatId, $userId, $receiverId, $content);
if ($result['success']) {
// 向接收者发送消息
$messageData = [
'action' => 'new_private_message',
'chat_id' => $chatId,
'message_id' => $result['message_id'],
'sender_id' => $userId,
'receiver_id' => $receiverId,
'content' => $content,
'timestamp' => date('Y-m-d H:i:s')
];
$userStatusManager->sendToUser($receiverId, $messageData);
// 也向发送者发送消息(用于在UI中显示)
$userStatusManager->sendToUser($userId, $messageData);
}
$connection->send(json_encode($result));
break;
}
};
// 当客户端断开连接时
$ws_worker->onClose = function($connection) use (&$users, $userStatusManager) {
echo "Connection closed\n";
// 查找断开连接的用户
foreach ($users as $userId => $conn) {
if ($conn === $connection) {
// 更新用户状态
$userStatusManager->userOffline($userId);
unset($users[$userId]);
break;
}
}
};
// 运行所有worker
Worker::runAll();
难点讲解
1. WebSocket连接管理与用户状态同步
在实时聊天系统中,一个关键难点是如何管理WebSocket连接与用户状态的同步。当用户连接或断开连接时,我们需要及时更新用户状态,并确保消息能够正确路由。
在我们的实现中,我们使用了全局的$users
数组来存储用户ID与WebSocket连接的映射:
// 存储用户连接的映射
$users = [];
// 用户上线
public function userOnline($userId, $connection) {
// 更新数据库中的用户状态
$usersCollection->updateOne(
['_id' => new MongoDB\BSON\ObjectId($userId)],
['$set' => ['status' => 'online']]
);
// 存储用户连接
$this->users[$userId] = $connection;
return ['success' => true];
}
这种方法在单进程环境下工作良好,但在多进程环境下(如我们设置的$ws_worker->count = 4
),每个进程都有自己的$users
数组,导致用户连接信息无法在不同进程间共享。解决方案是使用外部存储(如Redis)来共享用户连接信息,或者使用Workerman的Channel组件进行进程间通信。
2. 消息路由与实时推送
另一个难点是如何确保消息能够实时推送到正确的用户。在我们的实现中,我们通过以下方式处理:
// 向特定用户发送消息
public function sendToUser($userId, $message) {
if (isset($this->users[$userId])) {
$this->users[$userId]->send(json_encode($message));
return ['success' => true];
}
return ['success' => false, 'message' => 'User is not online'];
}
// 向聊天室的所有参与者发送消息
public function sendToRoom($roomId, $message, $excludeUserId = null) {
// 获取聊天室的所有参与者
$room = $roomsCollection->findOne(
['_id' => new MongoDB\BSON\ObjectId($roomId)],
['projection' => ['participants' => 1]]
);
// 向每个在线参与者发送消息
foreach ($room->participants as $participantId) {
$participantIdStr = (string)$participantId;
// 排除指定用户
if ($excludeUserId && $participantIdStr === $excludeUserId) {
continue;
}
if (isset($this->users[$participantIdStr])) {
$this->users[$participantIdStr]->send(json_encode($message));
}
}
return ['success' => true];
}
这种方法的效率取决于参与者的数量。对于大型聊天室,可能需要优化消息推送机制,例如使用发布/订阅模式。
3. MongoDB数据模型设计
在设计MongoDB数据模型时,我们需要平衡查询效率和数据一致性。对于聊天系统,我们选择了将消息存储在单独的集合中,并通过引用关联到用户和聊天室:
// messages集合
{
"_id": ObjectId("..."),
"sender_id": ObjectId("..."),
"receiver_id": ObjectId("..."), // 对于一对一聊天
"room_id": ObjectId("..."), // 对于群组聊天
"content": "Hello, world!",
"timestamp": ISODate("..."),
"type": "text" // text, image, file等
}
这种设计的优点是灵活性高,可以轻松支持不同类型的聊天消息。缺点是获取聊天历史时需要进行额外的查询。对于高并发场景,可能需要考虑分片或使用专门的时间序列数据库来存储消息。
4. 用户认证与安全性
在WebSocket环境中实现用户认证是一个挑战,因为WebSocket连接在建立后不会发送HTTP头信息。我们的解决方案是使用Token认证:
// 用户登录
public function login($username, $password) {
// 验证用户名和密码...
// 生成Token
$token = bin2hex(random_bytes(32));
// 更新用户状态和Token
$usersCollection->updateOne(
['_id' => $user->_id],
[
'$set' => [
'status' => 'online',
'token' => $token,
'last_login' => new MongoDB\BSON\UTCDateTime()
]
]
);
return [
'success' => true,
'token' => $token,
'user_id' => (string)$user->_id,
'username' => $user->username
];
}
为了提高安全性,可以考虑以下措施:
- 设置Token过期时间
- 使用HTTPS加密WebSocket连接(wss://)
- 实现Token刷新机制
- 限制登录尝试次数,防止暴力破解
这个系统还可以进一步扩展,例如:
- 添加文件传输功能
- 实现消息已读状态
- 添加消息推送通知
- 实现消息加密
- 添加聊天机器人功能
- 实现消息搜索功能
版权声明:本文为原创文章,版权归 全栈开发技术博客 所有。
本文链接:https://www.lvtao.net/dev/real-time-chat-system-with-workerman-and-mongodb.html
转载时须注明出处及本声明