基于 Workerman 的完整 MQTT 服务开发教程
之前写过一篇简单的入门 基于PHP+Webman实现MQTT与WebSocket的物联网设备接入与消息处理 ,这儿补充一下完整的
本文将基于 workerman
和 workerman/mqtt
实现一个完整的 MQTT 服务,支持以下功能:
- MQTT 服务端:接收设备上报数据,支持下发消息。
- 设备端:模拟设备,通过 MQTT 协议上报数据。
- 服务端下发消息:向设备下发指令。
- 身份验证:支持用户名和密码验证。
- SSL/TLS 加密通信:保障通信安全。
- 持久化设备数据:将设备上报的数据存储到数据库。
- 复杂消息处理逻辑:支持多种消息类型的处理。
1. 环境准备
安装依赖
安装 workerman
和 workerman/mqtt
:
composer require workerman/workerman
composer require workerman/mqtt
数据库准备
本文使用 MySQL 数据库存储设备数据。创建一个表 device_data
:
CREATE TABLE device_data (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(50) NOT NULL,
temperature FLOAT NOT NULL,
humidity FLOAT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2. 实现 MQTT 服务端
2.1 基础服务端
以下是一个支持身份验证和 SSL/TLS 加密的 MQTT 服务端实现。
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个 Worker 实例,监听 1883 端口(MQTT 默认端口)
$worker = new Worker('mqtt://0.0.0.0:1883', [
'ssl' => [
'local_cert' => '/path/to/server.crt', // SSL 证书路径
'local_pk' => '/path/to/server.key', // SSL 私钥路径
'verify_peer' => false,
]
]);
// 当客户端连接时触发、注册、验证、查询等操作...
$worker->onConnect = function (TcpConnection $connection) {
echo "New client connected: " . $connection->getRemoteIp() . "\n";
};
// 当客户端发送数据时触发
$worker->onMessage = function (TcpConnection $connection, $data) {
// 解析 MQTT 协议数据
$packet = \Workerman\Mqtt\Parser::parse($data);
// 处理不同类型的 MQTT 报文
switch ($packet['type']) {
case \Workerman\Mqtt\Packet::TYPE_CONNECT:
// 客户端连接请求
$username = $packet['username'] ?? '';
$password = $packet['password'] ?? '';
if ($username !== 'admin' || $password !== 'password') {
// 身份验证失败
echo "Authentication failed for client: " . $connection->getRemoteIp() . "\n";
$connection->close();
return;
}
echo "Client connected\n";
// 发送 CONNACK 报文
$connection->send(\Workerman\Mqtt\Packet::packConnAck(0));
break;
case \Workerman\Mqtt\Packet::TYPE_PUBLISH:
// 客户端发布消息
$topic = $packet['topic'];
$payload = $packet['payload'];
echo "Received message from client: Topic=$topic, Payload=$payload\n";
// 处理设备上报的数据
if ($topic === 'device/data') {
$data = json_decode($payload, true);
if ($data && isset($data['device_id'], $data['temperature'], $data['humidity'])) {
// 持久化数据到数据库
saveDeviceData($data);
}
}
// 回复消息
$connection->send(\Workerman\Mqtt\Packet::packPublish('server/response', 'Message received'));
break;
case \Workerman\Mqtt\Packet::TYPE_SUBSCRIBE:
// 客户端订阅主题
$topics = $packet['topics'];
echo "Client subscribed to topics: " . implode(', ', $topics) . "\n";
// 发送 SUBACK 报文
$connection->send(\Workerman\Mqtt\Packet::packSubAck($packet['message_id'], [0]));
break;
case \Workerman\Mqtt\Packet::TYPE_PINGREQ:
// 客户端发送心跳
echo "Client ping\n";
// 发送 PINGRESP 报文
$connection->send(\Workerman\Mqtt\Packet::packPingResp());
break;
case \Workerman\Mqtt\Packet::TYPE_DISCONNECT:
// 客户端断开连接
echo "Client disconnected\n";
$connection->close();
break;
}
};
// 当客户端断开连接时触发
$worker->onClose = function (TcpConnection $connection) {
echo "Client disconnected: " . $connection->getRemoteIp() . "\n";
};
// 运行 Worker
Worker::runAll();
// 持久化设备数据到数据库
function saveDeviceData($data) {
$pdo = new PDO('mysql:host=localhost;dbname=mqtt', 'root', 'password');
$stmt = $pdo->prepare('INSERT INTO device_data (device_id, temperature, humidity) VALUES (?, ?, ?)');
$stmt->execute([$data['device_id'], $data['temperature'], $data['humidity']]);
echo "Device data saved to database\n";
}
2.2 运行服务端
在命令行中运行以下命令启动 MQTT 服务端:
php mqtt_server.php start
3. 实现设备端(MQTT 客户端)
以下是一个设备端的 MQTT 客户端实现,支持 SSL/TLS 加密通信。
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个 Worker 实例
$worker = new Worker();
// 当 Worker 启动时执行
$worker->onWorkerStart = function () {
// 创建一个 MQTT 客户端实例,连接到本地 MQTT 服务端
$mqtt = new Workerman\Mqtt\Client('mqtt://127.0.0.1:1883', [
'username' => 'admin', // 用户名
'password' => 'password', // 密码
'ssl' => [
'verify_peer' => false,
]
]);
// 当连接成功时执行
$mqtt->onConnect = function ($mqtt) {
// 订阅服务端下发的主题
$mqtt->subscribe('server/response');
// 模拟设备上报数据
$mqtt->publish('device/data', json_encode([
'device_id' => '12345',
'temperature' => 25.6,
'humidity' => 60.5,
]));
};
// 当收到消息时执行
$mqtt->onMessage = function ($topic, $content) {
echo "Received message from server: Topic=$topic, Content=$content\n";
};
// 连接到 MQTT 服务端
$mqtt->connect();
};
// 运行 Worker
Worker::runAll();
运行设备端
在命令行中运行以下命令启动设备端:
php device_client.php start
4. 实现服务端下发消息(MQTT 客户端)
以下是一个服务端下发消息的 MQTT 客户端实现。
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个 Worker 实例
$worker = new Worker();
// 当 Worker 启动时执行
$worker->onWorkerStart = function () {
// 创建一个 MQTT 客户端实例,连接到本地 MQTT 服务端
$mqtt = new Workerman\Mqtt\Client('mqtt://127.0.0.1:1883', [
'username' => 'admin', // 用户名
'password' => 'password', // 密码
'ssl' => [
'verify_peer' => false,
]
]);
// 当连接成功时执行
$mqtt->onConnect = function ($mqtt) {
// 向设备下发消息
$mqtt->publish('device/command', json_encode([
'command' => 'reboot',
'timestamp' => time(),
]));
};
// 连接到 MQTT 服务端
$mqtt->connect();
};
// 运行 Worker
Worker::runAll();
运行服务端下发消息
在命令行中运行以下命令启动服务端下发消息:
php server_client.php start
版权声明:本文为原创文章,版权归 全栈开发技术博客 所有。
本文链接:https://www.lvtao.net/dev/workerman-mqtt.html
转载时须注明出处及本声明