Skip to content

Apusic ADMQ MQTT 开发集成手册

版本: 1.0 适用产品: Apusic ADMQ MQTT 模块 更新日期: 2026-06


目录

  1. 概述
  2. 快速连接示例
  3. 各语言客户端集成
  4. MQTT 5.0 高级特性使用
  5. 共享订阅开发模式
  6. WebSocket 集成
  7. TLS/SSL 安全连接开发
  8. REST API 集成
  9. 规则引擎与数据集成
  10. 网关协议集成
  11. 最佳实践

1. 概述

1.1 适用对象

本手册面向需要将应用系统接入 Apusic ADMQ MQTT 服务的开发人员,包括:

  • IoT 设备端固件开发
  • 后端服务集成开发
  • 前端 Web/移动端应用开发

1.2 连接参数总览

参数说明默认值
Broker 地址服务器 IP 或域名
TCP 端口明文连接1883
TLS 端口加密连接8883
WebSocket 端口WS 连接8083
WSS 端口WSS 加密连接8084
Client ID客户端唯一标识自定义(不可重复)
用户名认证用户名按配置
密码认证密码按配置
协议版本MQTT 版本3.1.1 或 5.0
Keep Alive心跳间隔(秒)60

1.3 Client ID 规范建议

  • 必须唯一,相同 Client ID 的新连接会踢掉旧连接
  • 建议格式:{设备类型}_{设备序列号}{业务系统}_{UUID}
  • 最大长度:65535 字符
  • 避免使用特殊字符,推荐使用字母、数字、连字符、下划线

2. 快速连接示例

2.1 使用 MQTT 命令行工具测试

bash
# 安装 mosquitto 客户端(测试用)
# macOS: brew install mosquitto
# Ubuntu: apt install mosquitto-clients

# 订阅主题
mosquitto_sub -h localhost -p 1883 -u admin -P public \
    -t "test/#" -v

# 发布消息
mosquitto_pub -h localhost -p 1883 -u admin -P public \
    -t "test/hello" -m "Hello ADMQ" -q 1

2.2 连接认证

ADMQ 默认启用内置数据库认证,开发测试时可通过控制台添加测试用户:

  1. 访问 http://localhost:18083
  2. 进入"访问控制 → 认证" → "内置数据库"
  3. 点击"添加用户",创建用户名和密码

3. 各语言客户端集成

3.1 Java

推荐库: Eclipse Paho Java Client

Maven 依赖:

xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

MQTT 3.1.1 示例:

java
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class AdmqMqttExample {

    private static final String BROKER_URL = "tcp://localhost:1883";
    private static final String CLIENT_ID  = "java-client-001";

    public static void main(String[] args) throws MqttException {

        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient(BROKER_URL, CLIENT_ID, persistence);

        // 连接参数
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("your-username");
        options.setPassword("your-password".toCharArray());
        options.setCleanSession(false);          // false = 持久会话
        options.setKeepAliveInterval(60);        // 心跳间隔(秒)
        options.setConnectionTimeout(10);        // 连接超时(秒)
        options.setAutomaticReconnect(true);     // 自动重连

        // 消息回调
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("连接断开: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                System.out.printf("收到消息 [%s]: %s%n",
                    topic, new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("消息发布完成: " + token.getMessageId());
            }
        });

        // 连接
        client.connect(options);
        System.out.println("已连接到 ADMQ MQTT Broker");

        // 订阅
        client.subscribe("sensors/#", 1);

        // 发布
        MqttMessage msg = new MqttMessage("Hello ADMQ".getBytes());
        msg.setQos(1);
        msg.setRetained(false);
        client.publish("sensors/temperature", msg);

        // 断开(根据业务逻辑决定时机)
        // client.disconnect();
    }
}

MQTT 5.0 示例(Paho v5):

xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>
java
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.*;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

MqttClient client = new MqttClient("tcp://localhost:1883", "java-v5-client");
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName("your-username");
options.setPassword("your-password".getBytes());
options.setSessionExpiryInterval(3600L);   // MQTT 5.0 会话过期(秒)

// 消息属性(MQTT 5.0 特性)
MqttProperties props = new MqttProperties();
props.setMessageExpiryInterval(300L);      // 消息 5 分钟后过期

client.connect(options);

MqttMessage msg = new MqttMessage("payload".getBytes(), 1, false, props);
client.publish("test/topic", msg);

3.2 Python

推荐库: paho-mqtt

bash
pip install paho-mqtt

MQTT 3.1.1 示例:

python
import paho.mqtt.client as mqtt
import time

BROKER_HOST = "localhost"
BROKER_PORT = 1883
CLIENT_ID   = "python-client-001"
USERNAME    = "your-username"
PASSWORD    = "your-password"


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
        # 连接成功后订阅,避免重连后丢失订阅
        client.subscribe("sensors/#", qos=1)
    else:
        print(f"连接失败,错误码: {rc}")


def on_message(client, userdata, msg):
    print(f"收到消息 [{msg.topic}]: {msg.payload.decode()}")


def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"意外断开,将自动重连: {rc}")


client = mqtt.Client(client_id=CLIENT_ID, clean_session=False)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect    = on_connect
client.on_message    = on_message
client.on_disconnect = on_disconnect

# 设置遗嘱消息
client.will_set(
    topic=f"status/{CLIENT_ID}/offline",
    payload="offline",
    qos=1,
    retain=True
)

client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

# 发布消息
client.publish("sensors/temperature", payload="25.5", qos=1)

# 阻塞循环
client.loop_forever()

MQTT 5.0 示例:

python
import paho.mqtt.client as mqtt

client = mqtt.Client(
    client_id="python-v5-client",
    protocol=mqtt.MQTTv5
)
client.username_pw_set("your-username", "your-password")

def on_connect(client, userdata, flags, rc, properties=None):
    print(f"连接结果: {rc}")
    client.subscribe("test/#", qos=1)

client.on_connect = on_connect
client.connect("localhost", 1883, keepalive=60,
               clean_start=False,         # 持久会话
               properties=mqtt.Properties(mqtt.PacketTypes.CONNECT))

client.loop_forever()

3.3 Node.js

推荐库: MQTT.js

bash
npm install mqtt
javascript
const mqtt = require('mqtt');

const client = mqtt.connect('mqtt://localhost:1883', {
    clientId: 'nodejs-client-001',
    username: 'your-username',
    password: 'your-password',
    clean: false,               // 持久会话
    keepalive: 60,
    reconnectPeriod: 3000,      // 断线 3 秒后重连(毫秒)
    connectTimeout: 10000,

    // 遗嘱消息
    will: {
        topic: 'status/nodejs-client-001/offline',
        payload: Buffer.from('offline'),
        qos: 1,
        retain: true
    }
});

client.on('connect', () => {
    console.log('已连接到 ADMQ MQTT Broker');

    // 订阅
    client.subscribe('sensors/#', { qos: 1 }, (err) => {
        if (!err) console.log('订阅成功');
    });

    // 发布
    client.publish('sensors/temperature', '25.5', {
        qos: 1,
        retain: false
    });
});

client.on('message', (topic, message) => {
    console.log(`收到消息 [${topic}]: ${message.toString()}`);
});

client.on('error', (err) => {
    console.error('MQTT 错误:', err.message);
});

client.on('reconnect', () => {
    console.log('正在重连...');
});

MQTT 5.0(Node.js):

javascript
const client = mqtt.connect('mqtt://localhost:1883', {
    protocolVersion: 5,            // 指定 MQTT 5.0
    clientId: 'nodejs-v5-client',
    sessionExpiryInterval: 3600,   // 会话过期(秒)
    properties: {
        sessionExpiryInterval: 3600
    }
});

// 发布带属性的消息(MQTT 5.0)
client.publish('test/topic', 'payload', {
    qos: 1,
    properties: {
        messageExpiryInterval: 300,      // 消息 5 分钟后过期
        userProperties: {
            source: 'nodejs-app',
            version: '1.0'
        }
    }
});

3.4 Go

推荐库: Eclipse Paho Go

bash
go get github.com/eclipse/paho.mqtt.golang
go
package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://localhost:1883")
    opts.SetClientID("go-client-001")
    opts.SetUsername("your-username")
    opts.SetPassword("your-password")
    opts.SetCleanSession(false)       // 持久会话
    opts.SetKeepAlive(60 * time.Second)
    opts.SetAutoReconnect(true)
    opts.SetMaxReconnectInterval(30 * time.Second)

    // 遗嘱消息
    opts.SetWill("status/go-client-001/offline", "offline", 1, true)

    opts.SetOnConnectHandler(func(c mqtt.Client) {
        fmt.Println("连接成功")
        // 订阅
        token := c.Subscribe("sensors/#", 1, func(c mqtt.Client, m mqtt.Message) {
            fmt.Printf("收到消息 [%s]: %s\n", m.Topic(), m.Payload())
        })
        token.Wait()
    })

    opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
        fmt.Printf("连接断开: %v\n", err)
    })

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    // 发布
    token := client.Publish("sensors/temperature", 1, false, "25.5")
    token.Wait()

    select {} // 保持运行
}

3.5 C/C++(嵌入式设备)

推荐库: Eclipse Paho Embedded C / Mosquitto

c
#include "MQTTClient.h"
#include <string.h>

#define BROKER_ADDRESS "tcp://192.168.1.100:1883"
#define CLIENT_ID      "c-device-001"
#define TOPIC          "sensors/temperature"

volatile int connected = 0;

void on_connection_lost(void *context, char *cause) {
    printf("连接断开: %s\n", cause ? cause : "未知原因");
    connected = 0;
}

int on_message_arrived(void *context, char *topicName, int topicLen,
                       MQTTClient_message *message) {
    printf("收到消息 [%s]: %.*s\n",
           topicName, message->payloadlen, (char*)message->payload);
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

void on_delivery_complete(void *context, MQTTClient_deliveryToken dt) {
    printf("消息 %d 投递完成\n", dt);
}

int main() {
    MQTTClient client;
    MQTTClient_create(&client, BROKER_ADDRESS, CLIENT_ID,
                      MQTTCLIENT_PERSISTENCE_NONE, NULL);

    MQTTClient_setCallbacks(client, NULL,
                            on_connection_lost,
                            on_message_arrived,
                            on_delivery_complete);

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.keepAliveInterval = 60;
    conn_opts.cleansession = 0;         /* 0 = 持久会话 */
    conn_opts.username = "your-username";
    conn_opts.password = "your-password";

    int rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("连接失败: %d\n", rc);
        return 1;
    }

    /* 订阅 */
    MQTTClient_subscribe(client, "commands/#", 1);

    /* 发布 */
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    char payload[] = "25.5";
    pubmsg.payload    = payload;
    pubmsg.payloadlen = strlen(payload);
    pubmsg.qos        = 1;
    pubmsg.retained   = 0;

    MQTTClient_deliveryToken token;
    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    MQTTClient_waitForCompletion(client, token, 5000);

    /* 主循环 */
    while (1) {
        sleep(1);
    }

    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return 0;
}

4. MQTT 5.0 高级特性使用

4.1 请求/响应模式

MQTT 5.0 内置了请求/响应语义,无需自行设计消息关联机制。

请求方:

python
import paho.mqtt.client as mqtt
import uuid

client = mqtt.Client(protocol=mqtt.MQTTv5)
client.connect("localhost", 1883)

correlation_id = str(uuid.uuid4())
reply_topic = f"reply/{client_id}/{correlation_id}"

# 订阅应答主题
client.subscribe(reply_topic, qos=1)

# 发送请求
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.ResponseTopic = reply_topic
props.CorrelationData = correlation_id.encode()

client.publish("request/calculate", "42", qos=1, properties=props)

响应方:

python
def on_message(client, userdata, msg):
    props = msg.properties
    result = process(msg.payload)

    # 使用请求中的 ResponseTopic 和 CorrelationData 回复
    reply_props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    reply_props.CorrelationData = props.CorrelationData

    client.publish(props.ResponseTopic, result, qos=1, properties=reply_props)

4.2 消息过期

python
# MQTT 5.0:消息 5 分钟后自动过期
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.MessageExpiryInterval = 300  # 秒

client.publish("alerts/fire", "报警!", qos=2, properties=props)

4.3 用户属性

javascript
// Node.js MQTT 5.0 用户属性
client.publish('data/sensor', JSON.stringify(data), {
    qos: 1,
    properties: {
        userProperties: {
            deviceType: 'temperature-sensor',
            firmware: 'v2.1.0',
            location: 'building-A'
        }
    }
});

4.4 主题别名(减少带宽)

适用于频繁发布到同一主题的场景:

python
# 第一次发布:建立别名
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.TopicAlias = 1   # 别名 ID

client.publish("very/long/topic/name/sensors/temperature/room/101",
               "25.5", qos=0, properties=props)

# 后续发布:只发别名,不发主题名(节省带宽)
props2 = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props2.TopicAlias = 1
client.publish("", "26.0", qos=0, properties=props2)  # 主题名为空

5. 共享订阅开发模式

5.1 负载均衡消费

多个消费者实例共享订阅,实现水平扩展:

python
# worker_1.py
client.subscribe("$share/workers/jobs/#", qos=1)

# worker_2.py(同一组,消息会被分发到其中一个)
client.subscribe("$share/workers/jobs/#", qos=1)

5.2 多组独立消费

不同消费者组各自独立收到所有消息:

python
# 日志组:接收所有消息
client.subscribe("$share/logger/events/#", qos=1)

# 告警组:接收所有消息(与日志组独立)
client.subscribe("$share/alerting/events/#", qos=1)

5.3 选择合适的共享订阅策略

config/mqtt.conf 中配置:

ini
mqtt {
    shared_subscription_strategy = round_robin  # 推荐生产使用
}
场景推荐策略
通用负载均衡round_robin
同一设备消息保证顺序hash_clientid
同一主题消息保证顺序hash_topic
优先本地节点处理local
有状态处理(会话关联)sticky

6. WebSocket 集成

6.1 浏览器端(JavaScript)

html
<!DOCTYPE html>
<html>
<head>
    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
<script>
const client = mqtt.connect('ws://localhost:8083/mqtt', {
    clientId: 'browser-' + Math.random().toString(16).substr(2, 8),
    username: 'your-username',
    password: 'your-password',
    clean: true,
    keepalive: 60
});

client.on('connect', () => {
    console.log('MQTT 连接成功');
    client.subscribe('notifications/#', { qos: 1 });
    client.publish('status/browser/online', 'online', { retain: true });
});

client.on('message', (topic, payload) => {
    console.log(`[${topic}] ${payload.toString()}`);
    // 更新 UI...
});

client.on('error', (err) => {
    console.error('MQTT 错误:', err);
});

// 页面关闭时发送离线状态
window.addEventListener('beforeunload', () => {
    client.publish('status/browser/online', 'offline', { retain: true });
    client.end();
});
</script>
</body>
</html>

6.2 Vue.js / React 集成示例

javascript
// mqttService.js - 封装 MQTT 连接
import mqtt from 'mqtt';

class MqttService {
    constructor() {
        this.client = null;
        this.listeners = new Map();
    }

    connect(brokerUrl, options) {
        this.client = mqtt.connect(brokerUrl, options);

        this.client.on('connect', () => {
            console.log('MQTT 已连接');
        });

        this.client.on('message', (topic, payload) => {
            // 精确匹配和通配符匹配
            this.listeners.forEach((callback, pattern) => {
                if (this.topicMatch(pattern, topic)) {
                    callback(topic, payload.toString());
                }
            });
        });
    }

    subscribe(topic, callback) {
        this.client.subscribe(topic, { qos: 1 });
        this.listeners.set(topic, callback);
    }

    publish(topic, payload, options = {}) {
        this.client.publish(topic, payload, { qos: 1, ...options });
    }

    topicMatch(pattern, topic) {
        // 简单通配符匹配(生产建议使用专用库)
        const regex = pattern
            .replace(/\+/g, '[^/]+')
            .replace(/#/g, '.*');
        return new RegExp(`^${regex}$`).test(topic);
    }

    disconnect() {
        if (this.client) {
            this.client.end();
        }
    }
}

export default new MqttService();

7. TLS/SSL 安全连接开发

7.1 Java TLS 连接

java
import org.eclipse.paho.client.mqttv3.*;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class TlsExample {

    public static SSLContext createSslContext(String caCertPath) throws Exception {
        // 加载 CA 证书
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        X509Certificate caCert;
        try (FileInputStream fis = new FileInputStream(caCertPath)) {
            caCert = (X509Certificate) cf.generateCertificate(fis);
        }

        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca-cert", caCert);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(
            TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);

        SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext;
    }

    public static void main(String[] args) throws Exception {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("your-username");
        options.setPassword("your-password".toCharArray());
        options.setSocketFactory(createSslContext("/path/to/cacert.pem").getSocketFactory());

        MqttClient client = new MqttClient("ssl://localhost:8883", "java-tls-client");
        client.connect(options);
        System.out.println("TLS 连接成功");
    }
}

7.2 Python TLS 连接

python
import ssl
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="python-tls-client")
client.username_pw_set("your-username", "your-password")

# 单向 TLS(只验证服务端证书)
client.tls_set(
    ca_certs="/path/to/cacert.pem",
    tls_version=ssl.PROTOCOL_TLS_CLIENT
)

# 双向 TLS(mTLS,同时提供客户端证书)
# client.tls_set(
#     ca_certs="/path/to/cacert.pem",
#     certfile="/path/to/client.pem",
#     keyfile="/path/to/client.key",
#     tls_version=ssl.PROTOCOL_TLS_CLIENT
# )

client.connect("localhost", 8883)
client.loop_forever()

7.3 Node.js TLS 连接

javascript
const mqtt = require('mqtt');
const fs   = require('fs');

const client = mqtt.connect('mqtts://localhost:8883', {
    clientId: 'nodejs-tls-client',
    username: 'your-username',
    password: 'your-password',

    // 单向 TLS
    ca: fs.readFileSync('/path/to/cacert.pem'),

    // 双向 TLS(mTLS)
    // cert: fs.readFileSync('/path/to/client.pem'),
    // key:  fs.readFileSync('/path/to/client.key'),
});

8. REST API 集成

8.1 认证方式

REST API 使用 HTTP Basic Auth:

  • 用户名:admin(默认)
  • 密码:public(默认)

8.2 常用 API

获取在线客户端列表:

bash
curl -u admin:public \
  "http://localhost:18083/api/v5/clients?page=1&limit=100"

获取指定客户端信息:

bash
curl -u admin:public \
  "http://localhost:18083/api/v5/clients/my-client-id"

踢出客户端:

bash
curl -u admin:public -X DELETE \
  "http://localhost:18083/api/v5/clients/my-client-id"

通过 HTTP API 发布消息:

bash
curl -u admin:public -X POST \
  "http://localhost:18083/api/v5/publish" \
  -H "Content-Type: application/json" \
  -d '{
    "topic":   "sensors/temperature",
    "payload": "25.5",
    "qos":     1,
    "retain":  false
  }'

批量发布消息:

bash
curl -u admin:public -X POST \
  "http://localhost:18083/api/v5/publish/bulk" \
  -H "Content-Type: application/json" \
  -d '[
    {"topic": "device/001/cmd", "payload": "start", "qos": 1},
    {"topic": "device/002/cmd", "payload": "stop",  "qos": 1}
  ]'

查询主题订阅者:

bash
curl -u admin:public \
  "http://localhost:18083/api/v5/subscriptions?topic=sensors%2Ftemperature"

获取保留消息:

bash
curl -u admin:public \
  "http://localhost:18083/api/v5/retainer/messages"

删除保留消息:

bash
curl -u admin:public -X DELETE \
  "http://localhost:18083/api/v5/retainer/messages/sensors%2Ftemperature"

8.3 Java 调用 REST API

java
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.Base64;

public class AdmqRestClient {

    private final HttpClient httpClient = HttpClient.newHttpClient();
    private final String baseUrl;
    private final String credentials;

    public AdmqRestClient(String host, int port, String user, String password) {
        this.baseUrl = "http://" + host + ":" + port + "/api/v5";
        this.credentials = Base64.getEncoder()
            .encodeToString((user + ":" + password).getBytes());
    }

    public void publish(String topic, String payload, int qos) throws Exception {
        String body = String.format(
            "{\"topic\":\"%s\",\"payload\":\"%s\",\"qos\":%d}",
            topic, payload, qos);

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/publish"))
            .header("Authorization", "Basic " + credentials)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> response = httpClient.send(request,
            HttpResponse.BodyHandlers.ofString());
        System.out.println("发布结果: " + response.statusCode());
    }

    public static void main(String[] args) throws Exception {
        AdmqRestClient client = new AdmqRestClient(
            "localhost", 18083, "admin", "public");
        client.publish("test/topic", "Hello from REST API", 1);
    }
}

9. 规则引擎与数据集成

9.1 规则引擎概述

ADMQ 内置规则引擎,支持使用类 SQL 语法对消息进行实时处理和路由,无需在应用层编写额外代码:

sql
-- 示例:筛选温度超过 30 度的消息,转发到 HTTP 接口
SELECT
    payload.temperature AS temp,
    topic,
    clientid,
    timestamp
FROM
    "sensors/#"
WHERE
    payload.temperature > 30

9.2 消息桥接到其他 MQTT Broker

通过 MQTT Bridge 将消息转发到另一个 MQTT 服务:

ini
bridges.mqtt.to_cloud {
    enable = true
    server = "ssl://cloud.example.com:8883"
    proto_ver = 5
    clientid = "admq-bridge-to-cloud"
    ssl {
        enable = true
        cacertfile = "${ADMQ_ETC_DIR}/ssl/cloud-ca.pem"
    }
    ingress {
        remote.topic = "cloud/#"
        local.topic  = "from-cloud/${topic}"
    }
    egress {
        local.topic  = "upload/#"
        remote.topic = "from-edge/${topic}"
        remote.qos   = 1
    }
}

9.3 消息桥接到 HTTP 服务

ini
bridges.http.to_backend {
    enable = true
    url    = "http://backend.example.com/mqtt/message"
    method = post
    headers {
        "Content-Type" = "application/json"
        "X-API-Key"    = "your-api-key"
    }
    body = "${payload}"
}

10. 网关协议集成

10.1 MQTT-SN(传感器网络)

适用于资源受限的传感器设备:

ini
gateway.mqttsn {
    enable = true
    gateway_id = 1
    broadcast = true
    enable_qos3 = true
    listeners.udp.default {
        bind = "0.0.0.0:1884"
    }
}

10.2 CoAP

适用于使用 CoAP 协议的 IoT 设备:

ini
gateway.coap {
    enable = true
    mountpoint = "coap/"
    connection_required = false
    listeners.udp.default {
        bind = "0.0.0.0:5683"
    }
}

CoAP 消息发布示例(等价于 MQTT publish):

PUT coap://localhost:5683/ps/sensors/temperature
Payload: 25.5

10.3 STOMP

适用于需要通过 STOMP 协议接入的系统(如部分消息队列客户端):

ini
gateway.stomp {
    enable = true
    frame.max_headers = 10
    frame.max_header_length = 1024
    frame.max_body_length = 65536
    listeners.tcp.default {
        bind = "0.0.0.0:61613"
    }
}

11. 最佳实践

11.1 Client ID 管理

// 推荐格式
{产品线}-{设备类型}-{设备序列号}
// 示例
factory-sensor-SN20240101001
webapp-browser-uuid-xxx
backend-service-order-processor
  • 每个客户端使用唯一且固定的 Client ID
  • 不要动态生成随机 Client ID(会导致持久会话无法恢复)
  • 对设备管理场景,Client ID 最好能映射到设备资产 ID

11.2 QoS 选择策略

场景推荐 QoS理由
传感器高频数据(每秒多次)QoS 0偶发丢失可接受,低延迟
设备状态上报QoS 1保证送达,允许重复
控制指令下发QoS 1 或 2必须执行,QoS 2 避免重复执行
支付/关键业务事件QoS 2严格一次语义

11.3 主题设计规范

// 推荐:层次清晰,可扩展
{区域}/{设备类型}/{设备ID}/{数据类型}

// 示例
factory/sensor/SN001/temperature
factory/sensor/SN001/humidity
factory/actuator/AC001/cmd
factory/actuator/AC001/status

// 规则:
// - 不以 / 开头或结尾
// - 不使用空格
// - 区分大小写(temperature ≠ Temperature)
// - 系统保留主题以 $ 开头(如 $SYS/、$share/)

11.4 重连策略

python
# Python 示例:指数退避重连
import time
import paho.mqtt.client as mqtt

MAX_RETRY_INTERVAL = 60  # 最大重连间隔(秒)
retry_interval = 1

def on_disconnect(client, userdata, rc):
    global retry_interval
    if rc != 0:
        print(f"意外断开,{retry_interval}秒后重连")
        time.sleep(retry_interval)
        retry_interval = min(retry_interval * 2, MAX_RETRY_INTERVAL)
        try:
            client.reconnect()
            retry_interval = 1  # 重连成功后重置
        except Exception as e:
            print(f"重连失败: {e}")

11.5 消息 Payload 规范

推荐使用 JSON 格式:

json
{
    "device_id": "SN001",
    "timestamp": 1717430400,
    "data": {
        "temperature": 25.5,
        "humidity": 60.2
    },
    "version": "1.0"
}

注意事项:

  • Payload 默认最大 1MB(max_packet_size),大文件建议改用文件存储+MQTT传递URL
  • 二进制数据需 Base64 编码后传输
  • 时间戳使用 Unix 时间戳(秒),避免时区问题

11.6 保留消息使用规范

适合用于保留消息的数据:

  • 设备在线/离线状态
  • 设备最新配置
  • 系统当前运行模式

不适合的数据:

  • 高频变化的传感器数值(每次更新都会存储)
  • 一次性事件(用 QoS 1 即可)

11.7 遗嘱消息最佳实践

python
# 上线时发布"在线"保留消息
client.publish(f"status/{device_id}/online", "online", qos=1, retain=True)

# 连接时设置遗嘱:异常断线时自动发布"离线"
client.will_set(
    topic=f"status/{device_id}/online",
    payload="offline",
    qos=1,
    retain=True
)

11.8 安全建议

  1. 生产环境必须启用 TLS,禁止在公网使用明文 1883 端口
  2. 每个设备使用独立的账号,避免共享同一用户名密码
  3. 启用 ACL 授权,限制设备只能访问自己的主题命名空间
  4. 定期轮换密码,配合 HTTP 认证后端实现动态认证
  5. 修改默认管理员密码,控制台端口不要暴露到外网

如有更多集成问题,请联系 Apusic 技术支持。

金蝶天燕(Apusic)企业级消息中间件套件