基于 Workerman 的完整 MQTT 服务开发教程

之前写过一篇简单的入门 基于PHP+Webman实现MQTT与WebSocket的物联网设备接入与消息处理 ,这儿补充一下完整的
本文将基于 workermanworkerman/mqtt 实现一个完整的 MQTT 服务,支持以下功能:

  1. MQTT 服务端:接收设备上报数据,支持下发消息。
  2. 设备端:模拟设备,通过 MQTT 协议上报数据。
  3. 服务端下发消息:向设备下发指令。
  4. 身份验证:支持用户名和密码验证。
  5. SSL/TLS 加密通信:保障通信安全。
  6. 持久化设备数据:将设备上报的数据存储到数据库。
  7. 复杂消息处理逻辑:支持多种消息类型的处理。

1. 环境准备

安装依赖

安装 workermanworkerman/mqtt

composer require workerman/workerman
composer require workerman/mqtt

数据库准备

本文使用 MySQL 数据库存储设备数据。创建一个表 device_data

CREATE TABLE device_data (
    id INT AUTO_INCREMENT PRIMARY KEY,
    device_id VARCHAR(50) NOT NULL,
    temperature FLOAT NOT NULL,
    humidity FLOAT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

2. 实现 MQTT 服务端

2.1 基础服务端

以下是一个支持身份验证和 SSL/TLS 加密的 MQTT 服务端实现。

<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个 Worker 实例,监听 1883 端口(MQTT 默认端口)
$worker = new Worker('mqtt://0.0.0.0:1883', [
    'ssl' => [
        'local_cert'  => '/path/to/server.crt', // SSL 证书路径
        'local_pk'    => '/path/to/server.key', // SSL 私钥路径
        'verify_peer' => false,
    ]
]);

// 当客户端连接时触发、注册、验证、查询等操作...
$worker->onConnect = function (TcpConnection $connection) {
    echo "New client connected: " . $connection->getRemoteIp() . "\n";
};

// 当客户端发送数据时触发
$worker->onMessage = function (TcpConnection $connection, $data) {
    // 解析 MQTT 协议数据
    $packet = \Workerman\Mqtt\Parser::parse($data);

    // 处理不同类型的 MQTT 报文
    switch ($packet['type']) {
        case \Workerman\Mqtt\Packet::TYPE_CONNECT:
            // 客户端连接请求
            $username = $packet['username'] ?? '';
            $password = $packet['password'] ?? '';
            if ($username !== 'admin' || $password !== 'password') {
                // 身份验证失败
                echo "Authentication failed for client: " . $connection->getRemoteIp() . "\n";
                $connection->close();
                return;
            }
            echo "Client connected\n";
            // 发送 CONNACK 报文
            $connection->send(\Workerman\Mqtt\Packet::packConnAck(0));
            break;

        case \Workerman\Mqtt\Packet::TYPE_PUBLISH:
            // 客户端发布消息
            $topic = $packet['topic'];
            $payload = $packet['payload'];
            echo "Received message from client: Topic=$topic, Payload=$payload\n";

            // 处理设备上报的数据
            if ($topic === 'device/data') {
                $data = json_decode($payload, true);
                if ($data && isset($data['device_id'], $data['temperature'], $data['humidity'])) {
                    // 持久化数据到数据库
                    saveDeviceData($data);
                }
            }

            // 回复消息
            $connection->send(\Workerman\Mqtt\Packet::packPublish('server/response', 'Message received'));
            break;

        case \Workerman\Mqtt\Packet::TYPE_SUBSCRIBE:
            // 客户端订阅主题
            $topics = $packet['topics'];
            echo "Client subscribed to topics: " . implode(', ', $topics) . "\n";
            // 发送 SUBACK 报文
            $connection->send(\Workerman\Mqtt\Packet::packSubAck($packet['message_id'], [0]));
            break;

        case \Workerman\Mqtt\Packet::TYPE_PINGREQ:
            // 客户端发送心跳
            echo "Client ping\n";
            // 发送 PINGRESP 报文
            $connection->send(\Workerman\Mqtt\Packet::packPingResp());
            break;

        case \Workerman\Mqtt\Packet::TYPE_DISCONNECT:
            // 客户端断开连接
            echo "Client disconnected\n";
            $connection->close();
            break;
    }
};

// 当客户端断开连接时触发
$worker->onClose = function (TcpConnection $connection) {
    echo "Client disconnected: " . $connection->getRemoteIp() . "\n";
};

// 运行 Worker
Worker::runAll();

// 持久化设备数据到数据库
function saveDeviceData($data) {
    $pdo = new PDO('mysql:host=localhost;dbname=mqtt', 'root', 'password');
    $stmt = $pdo->prepare('INSERT INTO device_data (device_id, temperature, humidity) VALUES (?, ?, ?)');
    $stmt->execute([$data['device_id'], $data['temperature'], $data['humidity']]);
    echo "Device data saved to database\n";
}

2.2 运行服务端

在命令行中运行以下命令启动 MQTT 服务端:

php mqtt_server.php start

3. 实现设备端(MQTT 客户端)

以下是一个设备端的 MQTT 客户端实现,支持 SSL/TLS 加密通信。

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个 Worker 实例
$worker = new Worker();

// 当 Worker 启动时执行
$worker->onWorkerStart = function () {
    // 创建一个 MQTT 客户端实例,连接到本地 MQTT 服务端
    $mqtt = new Workerman\Mqtt\Client('mqtt://127.0.0.1:1883', [
        'username' => 'admin', // 用户名
        'password' => 'password', // 密码
        'ssl' => [
            'verify_peer' => false,
        ]
    ]);

    // 当连接成功时执行
    $mqtt->onConnect = function ($mqtt) {
        // 订阅服务端下发的主题
        $mqtt->subscribe('server/response');

        // 模拟设备上报数据
        $mqtt->publish('device/data', json_encode([
            'device_id' => '12345',
            'temperature' => 25.6,
            'humidity' => 60.5,
        ]));
    };

    // 当收到消息时执行
    $mqtt->onMessage = function ($topic, $content) {
        echo "Received message from server: Topic=$topic, Content=$content\n";
    };

    // 连接到 MQTT 服务端
    $mqtt->connect();
};

// 运行 Worker
Worker::runAll();

运行设备端

在命令行中运行以下命令启动设备端:

php device_client.php start

4. 实现服务端下发消息(MQTT 客户端)

以下是一个服务端下发消息的 MQTT 客户端实现。

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个 Worker 实例
$worker = new Worker();

// 当 Worker 启动时执行
$worker->onWorkerStart = function () {
    // 创建一个 MQTT 客户端实例,连接到本地 MQTT 服务端
    $mqtt = new Workerman\Mqtt\Client('mqtt://127.0.0.1:1883', [
        'username' => 'admin', // 用户名
        'password' => 'password', // 密码
        'ssl' => [
            'verify_peer' => false,
        ]
    ]);

    // 当连接成功时执行
    $mqtt->onConnect = function ($mqtt) {
        // 向设备下发消息
        $mqtt->publish('device/command', json_encode([
            'command' => 'reboot',
            'timestamp' => time(),
        ]));
    };

    // 连接到 MQTT 服务端
    $mqtt->connect();
};

// 运行 Worker
Worker::runAll();

运行服务端下发消息

在命令行中运行以下命令启动服务端下发消息:

php server_client.php start

标签: PHP, 物联网, MQTT

相关文章

一些编程语言学习心得

作为一名专注于PHP、Go、Java和前端开发(JavaScript、HTML、CSS)的开发者,还得会运维、会谈客户....不想了,都是泪,今天说说这些年学习编程语言的一些体会,不同编程语言在...

Memcached如何配置分布式使用 并附PHP示例

Memcached是一种高性能的分布式内存对象缓存系统,广泛用于加速动态Web应用程序。通过将数据存储在内存中,Memcached能够显著减少数据库负载,提高应用的响应速度Memcached分布...

使用PHP打造轻量级单文件SQLite数据库管理工具

先声明一下,这是我自己内网使用的一个简单的管理工具,所以安全性方面我肯定是没有测试的~ 如果你要放在公网,请添加相关的权限认证及sql防注入等处理在开发过程中,我们经常需要一个简单易用的数据库管...

PHP 中的 declare 指令

在 PHP 编程中,declare 指令是一个强大的工具,用于控制代码的执行行为。它不仅可以启用严格类型模式,还可以用于其他一些高级功能,如性能监控和字符编码。本文将深入探讨 declare 指...

图片Base64编码

CSR生成

图片无损放大

图片占位符

Excel拆分文件