外观
Apusic ADMQ MQTT 开发集成手册
版本: 1.0 适用产品: Apusic ADMQ MQTT 模块 更新日期: 2026-06
目录
1. 概述
1.1 适用对象
本手册面向需要将应用系统接入 Apusic ADMQ MQTT 服务的开发人员,包括:
- IoT 设备端固件开发
- 后端服务集成开发
- 前端 Web/移动端应用开发
1.2 连接参数总览
| 参数 | 说明 | 默认值 |
|---|---|---|
| Broker 地址 | 服务器 IP 或域名 | — |
| TCP 端口 | 明文连接 | 1883 |
| TLS 端口 | 加密连接 | 8883 |
| WebSocket 端口 | WS 连接 | 8083 |
| WSS 端口 | WSS 加密连接 | 8084 |
| Client ID | 客户端唯一标识 | 自定义(不可重复) |
| 用户名 | 认证用户名 | 按配置 |
| 密码 | 认证密码 | 按配置 |
| 协议版本 | MQTT 版本 | 3.1.1 或 5.0 |
| Keep Alive | 心跳间隔(秒) | 60 |
1.3 Client ID 规范建议
- 必须唯一,相同 Client ID 的新连接会踢掉旧连接
- 建议格式:
{设备类型}_{设备序列号}或{业务系统}_{UUID} - 最大长度:65535 字符
- 避免使用特殊字符,推荐使用字母、数字、连字符、下划线
2. 快速连接示例
2.1 使用 MQTT 命令行工具测试
bash
# 安装 mosquitto 客户端(测试用)
# macOS: brew install mosquitto
# Ubuntu: apt install mosquitto-clients
# 订阅主题
mosquitto_sub -h localhost -p 1883 -u admin -P public \
-t "test/#" -v
# 发布消息
mosquitto_pub -h localhost -p 1883 -u admin -P public \
-t "test/hello" -m "Hello ADMQ" -q 12.2 连接认证
ADMQ 默认启用内置数据库认证,开发测试时可通过控制台添加测试用户:
- 访问
http://localhost:18083 - 进入"访问控制 → 认证" → "内置数据库"
- 点击"添加用户",创建用户名和密码
3. 各语言客户端集成
3.1 Java
推荐库: Eclipse Paho Java Client
Maven 依赖:
xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>MQTT 3.1.1 示例:
java
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class AdmqMqttExample {
private static final String BROKER_URL = "tcp://localhost:1883";
private static final String CLIENT_ID = "java-client-001";
public static void main(String[] args) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(BROKER_URL, CLIENT_ID, persistence);
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("your-username");
options.setPassword("your-password".toCharArray());
options.setCleanSession(false); // false = 持久会话
options.setKeepAliveInterval(60); // 心跳间隔(秒)
options.setConnectionTimeout(10); // 连接超时(秒)
options.setAutomaticReconnect(true); // 自动重连
// 消息回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("连接断开: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.printf("收到消息 [%s]: %s%n",
topic, new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("消息发布完成: " + token.getMessageId());
}
});
// 连接
client.connect(options);
System.out.println("已连接到 ADMQ MQTT Broker");
// 订阅
client.subscribe("sensors/#", 1);
// 发布
MqttMessage msg = new MqttMessage("Hello ADMQ".getBytes());
msg.setQos(1);
msg.setRetained(false);
client.publish("sensors/temperature", msg);
// 断开(根据业务逻辑决定时机)
// client.disconnect();
}
}MQTT 5.0 示例(Paho v5):
xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>java
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.*;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
MqttClient client = new MqttClient("tcp://localhost:1883", "java-v5-client");
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName("your-username");
options.setPassword("your-password".getBytes());
options.setSessionExpiryInterval(3600L); // MQTT 5.0 会话过期(秒)
// 消息属性(MQTT 5.0 特性)
MqttProperties props = new MqttProperties();
props.setMessageExpiryInterval(300L); // 消息 5 分钟后过期
client.connect(options);
MqttMessage msg = new MqttMessage("payload".getBytes(), 1, false, props);
client.publish("test/topic", msg);3.2 Python
推荐库: paho-mqtt
bash
pip install paho-mqttMQTT 3.1.1 示例:
python
import paho.mqtt.client as mqtt
import time
BROKER_HOST = "localhost"
BROKER_PORT = 1883
CLIENT_ID = "python-client-001"
USERNAME = "your-username"
PASSWORD = "your-password"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接成功")
# 连接成功后订阅,避免重连后丢失订阅
client.subscribe("sensors/#", qos=1)
else:
print(f"连接失败,错误码: {rc}")
def on_message(client, userdata, msg):
print(f"收到消息 [{msg.topic}]: {msg.payload.decode()}")
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"意外断开,将自动重连: {rc}")
client = mqtt.Client(client_id=CLIENT_ID, clean_session=False)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 设置遗嘱消息
client.will_set(
topic=f"status/{CLIENT_ID}/offline",
payload="offline",
qos=1,
retain=True
)
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# 发布消息
client.publish("sensors/temperature", payload="25.5", qos=1)
# 阻塞循环
client.loop_forever()MQTT 5.0 示例:
python
import paho.mqtt.client as mqtt
client = mqtt.Client(
client_id="python-v5-client",
protocol=mqtt.MQTTv5
)
client.username_pw_set("your-username", "your-password")
def on_connect(client, userdata, flags, rc, properties=None):
print(f"连接结果: {rc}")
client.subscribe("test/#", qos=1)
client.on_connect = on_connect
client.connect("localhost", 1883, keepalive=60,
clean_start=False, # 持久会话
properties=mqtt.Properties(mqtt.PacketTypes.CONNECT))
client.loop_forever()3.3 Node.js
推荐库: MQTT.js
bash
npm install mqttjavascript
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883', {
clientId: 'nodejs-client-001',
username: 'your-username',
password: 'your-password',
clean: false, // 持久会话
keepalive: 60,
reconnectPeriod: 3000, // 断线 3 秒后重连(毫秒)
connectTimeout: 10000,
// 遗嘱消息
will: {
topic: 'status/nodejs-client-001/offline',
payload: Buffer.from('offline'),
qos: 1,
retain: true
}
});
client.on('connect', () => {
console.log('已连接到 ADMQ MQTT Broker');
// 订阅
client.subscribe('sensors/#', { qos: 1 }, (err) => {
if (!err) console.log('订阅成功');
});
// 发布
client.publish('sensors/temperature', '25.5', {
qos: 1,
retain: false
});
});
client.on('message', (topic, message) => {
console.log(`收到消息 [${topic}]: ${message.toString()}`);
});
client.on('error', (err) => {
console.error('MQTT 错误:', err.message);
});
client.on('reconnect', () => {
console.log('正在重连...');
});MQTT 5.0(Node.js):
javascript
const client = mqtt.connect('mqtt://localhost:1883', {
protocolVersion: 5, // 指定 MQTT 5.0
clientId: 'nodejs-v5-client',
sessionExpiryInterval: 3600, // 会话过期(秒)
properties: {
sessionExpiryInterval: 3600
}
});
// 发布带属性的消息(MQTT 5.0)
client.publish('test/topic', 'payload', {
qos: 1,
properties: {
messageExpiryInterval: 300, // 消息 5 分钟后过期
userProperties: {
source: 'nodejs-app',
version: '1.0'
}
}
});3.4 Go
推荐库: Eclipse Paho Go
bash
go get github.com/eclipse/paho.mqtt.golanggo
package main
import (
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("go-client-001")
opts.SetUsername("your-username")
opts.SetPassword("your-password")
opts.SetCleanSession(false) // 持久会话
opts.SetKeepAlive(60 * time.Second)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(30 * time.Second)
// 遗嘱消息
opts.SetWill("status/go-client-001/offline", "offline", 1, true)
opts.SetOnConnectHandler(func(c mqtt.Client) {
fmt.Println("连接成功")
// 订阅
token := c.Subscribe("sensors/#", 1, func(c mqtt.Client, m mqtt.Message) {
fmt.Printf("收到消息 [%s]: %s\n", m.Topic(), m.Payload())
})
token.Wait()
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
fmt.Printf("连接断开: %v\n", err)
})
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 发布
token := client.Publish("sensors/temperature", 1, false, "25.5")
token.Wait()
select {} // 保持运行
}3.5 C/C++(嵌入式设备)
推荐库: Eclipse Paho Embedded C / Mosquitto
c
#include "MQTTClient.h"
#include <string.h>
#define BROKER_ADDRESS "tcp://192.168.1.100:1883"
#define CLIENT_ID "c-device-001"
#define TOPIC "sensors/temperature"
volatile int connected = 0;
void on_connection_lost(void *context, char *cause) {
printf("连接断开: %s\n", cause ? cause : "未知原因");
connected = 0;
}
int on_message_arrived(void *context, char *topicName, int topicLen,
MQTTClient_message *message) {
printf("收到消息 [%s]: %.*s\n",
topicName, message->payloadlen, (char*)message->payload);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void on_delivery_complete(void *context, MQTTClient_deliveryToken dt) {
printf("消息 %d 投递完成\n", dt);
}
int main() {
MQTTClient client;
MQTTClient_create(&client, BROKER_ADDRESS, CLIENT_ID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_setCallbacks(client, NULL,
on_connection_lost,
on_message_arrived,
on_delivery_complete);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 0; /* 0 = 持久会话 */
conn_opts.username = "your-username";
conn_opts.password = "your-password";
int rc = MQTTClient_connect(client, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS) {
printf("连接失败: %d\n", rc);
return 1;
}
/* 订阅 */
MQTTClient_subscribe(client, "commands/#", 1);
/* 发布 */
MQTTClient_message pubmsg = MQTTClient_message_initializer;
char payload[] = "25.5";
pubmsg.payload = payload;
pubmsg.payloadlen = strlen(payload);
pubmsg.qos = 1;
pubmsg.retained = 0;
MQTTClient_deliveryToken token;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
MQTTClient_waitForCompletion(client, token, 5000);
/* 主循环 */
while (1) {
sleep(1);
}
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return 0;
}4. MQTT 5.0 高级特性使用
4.1 请求/响应模式
MQTT 5.0 内置了请求/响应语义,无需自行设计消息关联机制。
请求方:
python
import paho.mqtt.client as mqtt
import uuid
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.connect("localhost", 1883)
correlation_id = str(uuid.uuid4())
reply_topic = f"reply/{client_id}/{correlation_id}"
# 订阅应答主题
client.subscribe(reply_topic, qos=1)
# 发送请求
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.ResponseTopic = reply_topic
props.CorrelationData = correlation_id.encode()
client.publish("request/calculate", "42", qos=1, properties=props)响应方:
python
def on_message(client, userdata, msg):
props = msg.properties
result = process(msg.payload)
# 使用请求中的 ResponseTopic 和 CorrelationData 回复
reply_props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
reply_props.CorrelationData = props.CorrelationData
client.publish(props.ResponseTopic, result, qos=1, properties=reply_props)4.2 消息过期
python
# MQTT 5.0:消息 5 分钟后自动过期
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.MessageExpiryInterval = 300 # 秒
client.publish("alerts/fire", "报警!", qos=2, properties=props)4.3 用户属性
javascript
// Node.js MQTT 5.0 用户属性
client.publish('data/sensor', JSON.stringify(data), {
qos: 1,
properties: {
userProperties: {
deviceType: 'temperature-sensor',
firmware: 'v2.1.0',
location: 'building-A'
}
}
});4.4 主题别名(减少带宽)
适用于频繁发布到同一主题的场景:
python
# 第一次发布:建立别名
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.TopicAlias = 1 # 别名 ID
client.publish("very/long/topic/name/sensors/temperature/room/101",
"25.5", qos=0, properties=props)
# 后续发布:只发别名,不发主题名(节省带宽)
props2 = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props2.TopicAlias = 1
client.publish("", "26.0", qos=0, properties=props2) # 主题名为空5. 共享订阅开发模式
5.1 负载均衡消费
多个消费者实例共享订阅,实现水平扩展:
python
# worker_1.py
client.subscribe("$share/workers/jobs/#", qos=1)
# worker_2.py(同一组,消息会被分发到其中一个)
client.subscribe("$share/workers/jobs/#", qos=1)5.2 多组独立消费
不同消费者组各自独立收到所有消息:
python
# 日志组:接收所有消息
client.subscribe("$share/logger/events/#", qos=1)
# 告警组:接收所有消息(与日志组独立)
client.subscribe("$share/alerting/events/#", qos=1)5.3 选择合适的共享订阅策略
在 config/mqtt.conf 中配置:
ini
mqtt {
shared_subscription_strategy = round_robin # 推荐生产使用
}| 场景 | 推荐策略 |
|---|---|
| 通用负载均衡 | round_robin |
| 同一设备消息保证顺序 | hash_clientid |
| 同一主题消息保证顺序 | hash_topic |
| 优先本地节点处理 | local |
| 有状态处理(会话关联) | sticky |
6. WebSocket 集成
6.1 浏览器端(JavaScript)
html
<!DOCTYPE html>
<html>
<head>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
<script>
const client = mqtt.connect('ws://localhost:8083/mqtt', {
clientId: 'browser-' + Math.random().toString(16).substr(2, 8),
username: 'your-username',
password: 'your-password',
clean: true,
keepalive: 60
});
client.on('connect', () => {
console.log('MQTT 连接成功');
client.subscribe('notifications/#', { qos: 1 });
client.publish('status/browser/online', 'online', { retain: true });
});
client.on('message', (topic, payload) => {
console.log(`[${topic}] ${payload.toString()}`);
// 更新 UI...
});
client.on('error', (err) => {
console.error('MQTT 错误:', err);
});
// 页面关闭时发送离线状态
window.addEventListener('beforeunload', () => {
client.publish('status/browser/online', 'offline', { retain: true });
client.end();
});
</script>
</body>
</html>6.2 Vue.js / React 集成示例
javascript
// mqttService.js - 封装 MQTT 连接
import mqtt from 'mqtt';
class MqttService {
constructor() {
this.client = null;
this.listeners = new Map();
}
connect(brokerUrl, options) {
this.client = mqtt.connect(brokerUrl, options);
this.client.on('connect', () => {
console.log('MQTT 已连接');
});
this.client.on('message', (topic, payload) => {
// 精确匹配和通配符匹配
this.listeners.forEach((callback, pattern) => {
if (this.topicMatch(pattern, topic)) {
callback(topic, payload.toString());
}
});
});
}
subscribe(topic, callback) {
this.client.subscribe(topic, { qos: 1 });
this.listeners.set(topic, callback);
}
publish(topic, payload, options = {}) {
this.client.publish(topic, payload, { qos: 1, ...options });
}
topicMatch(pattern, topic) {
// 简单通配符匹配(生产建议使用专用库)
const regex = pattern
.replace(/\+/g, '[^/]+')
.replace(/#/g, '.*');
return new RegExp(`^${regex}$`).test(topic);
}
disconnect() {
if (this.client) {
this.client.end();
}
}
}
export default new MqttService();7. TLS/SSL 安全连接开发
7.1 Java TLS 连接
java
import org.eclipse.paho.client.mqttv3.*;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
public class TlsExample {
public static SSLContext createSslContext(String caCertPath) throws Exception {
// 加载 CA 证书
CertificateFactory cf = CertificateFactory.getInstance("X.509");
X509Certificate caCert;
try (FileInputStream fis = new FileInputStream(caCertPath)) {
caCert = (X509Certificate) cf.generateCertificate(fis);
}
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca-cert", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext;
}
public static void main(String[] args) throws Exception {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("your-username");
options.setPassword("your-password".toCharArray());
options.setSocketFactory(createSslContext("/path/to/cacert.pem").getSocketFactory());
MqttClient client = new MqttClient("ssl://localhost:8883", "java-tls-client");
client.connect(options);
System.out.println("TLS 连接成功");
}
}7.2 Python TLS 连接
python
import ssl
import paho.mqtt.client as mqtt
client = mqtt.Client(client_id="python-tls-client")
client.username_pw_set("your-username", "your-password")
# 单向 TLS(只验证服务端证书)
client.tls_set(
ca_certs="/path/to/cacert.pem",
tls_version=ssl.PROTOCOL_TLS_CLIENT
)
# 双向 TLS(mTLS,同时提供客户端证书)
# client.tls_set(
# ca_certs="/path/to/cacert.pem",
# certfile="/path/to/client.pem",
# keyfile="/path/to/client.key",
# tls_version=ssl.PROTOCOL_TLS_CLIENT
# )
client.connect("localhost", 8883)
client.loop_forever()7.3 Node.js TLS 连接
javascript
const mqtt = require('mqtt');
const fs = require('fs');
const client = mqtt.connect('mqtts://localhost:8883', {
clientId: 'nodejs-tls-client',
username: 'your-username',
password: 'your-password',
// 单向 TLS
ca: fs.readFileSync('/path/to/cacert.pem'),
// 双向 TLS(mTLS)
// cert: fs.readFileSync('/path/to/client.pem'),
// key: fs.readFileSync('/path/to/client.key'),
});8. REST API 集成
8.1 认证方式
REST API 使用 HTTP Basic Auth:
- 用户名:
admin(默认) - 密码:
public(默认)
8.2 常用 API
获取在线客户端列表:
bash
curl -u admin:public \
"http://localhost:18083/api/v5/clients?page=1&limit=100"获取指定客户端信息:
bash
curl -u admin:public \
"http://localhost:18083/api/v5/clients/my-client-id"踢出客户端:
bash
curl -u admin:public -X DELETE \
"http://localhost:18083/api/v5/clients/my-client-id"通过 HTTP API 发布消息:
bash
curl -u admin:public -X POST \
"http://localhost:18083/api/v5/publish" \
-H "Content-Type: application/json" \
-d '{
"topic": "sensors/temperature",
"payload": "25.5",
"qos": 1,
"retain": false
}'批量发布消息:
bash
curl -u admin:public -X POST \
"http://localhost:18083/api/v5/publish/bulk" \
-H "Content-Type: application/json" \
-d '[
{"topic": "device/001/cmd", "payload": "start", "qos": 1},
{"topic": "device/002/cmd", "payload": "stop", "qos": 1}
]'查询主题订阅者:
bash
curl -u admin:public \
"http://localhost:18083/api/v5/subscriptions?topic=sensors%2Ftemperature"获取保留消息:
bash
curl -u admin:public \
"http://localhost:18083/api/v5/retainer/messages"删除保留消息:
bash
curl -u admin:public -X DELETE \
"http://localhost:18083/api/v5/retainer/messages/sensors%2Ftemperature"8.3 Java 调用 REST API
java
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.Base64;
public class AdmqRestClient {
private final HttpClient httpClient = HttpClient.newHttpClient();
private final String baseUrl;
private final String credentials;
public AdmqRestClient(String host, int port, String user, String password) {
this.baseUrl = "http://" + host + ":" + port + "/api/v5";
this.credentials = Base64.getEncoder()
.encodeToString((user + ":" + password).getBytes());
}
public void publish(String topic, String payload, int qos) throws Exception {
String body = String.format(
"{\"topic\":\"%s\",\"payload\":\"%s\",\"qos\":%d}",
topic, payload, qos);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/publish"))
.header("Authorization", "Basic " + credentials)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
System.out.println("发布结果: " + response.statusCode());
}
public static void main(String[] args) throws Exception {
AdmqRestClient client = new AdmqRestClient(
"localhost", 18083, "admin", "public");
client.publish("test/topic", "Hello from REST API", 1);
}
}9. 规则引擎与数据集成
9.1 规则引擎概述
ADMQ 内置规则引擎,支持使用类 SQL 语法对消息进行实时处理和路由,无需在应用层编写额外代码:
sql
-- 示例:筛选温度超过 30 度的消息,转发到 HTTP 接口
SELECT
payload.temperature AS temp,
topic,
clientid,
timestamp
FROM
"sensors/#"
WHERE
payload.temperature > 309.2 消息桥接到其他 MQTT Broker
通过 MQTT Bridge 将消息转发到另一个 MQTT 服务:
ini
bridges.mqtt.to_cloud {
enable = true
server = "ssl://cloud.example.com:8883"
proto_ver = 5
clientid = "admq-bridge-to-cloud"
ssl {
enable = true
cacertfile = "${ADMQ_ETC_DIR}/ssl/cloud-ca.pem"
}
ingress {
remote.topic = "cloud/#"
local.topic = "from-cloud/${topic}"
}
egress {
local.topic = "upload/#"
remote.topic = "from-edge/${topic}"
remote.qos = 1
}
}9.3 消息桥接到 HTTP 服务
ini
bridges.http.to_backend {
enable = true
url = "http://backend.example.com/mqtt/message"
method = post
headers {
"Content-Type" = "application/json"
"X-API-Key" = "your-api-key"
}
body = "${payload}"
}10. 网关协议集成
10.1 MQTT-SN(传感器网络)
适用于资源受限的传感器设备:
ini
gateway.mqttsn {
enable = true
gateway_id = 1
broadcast = true
enable_qos3 = true
listeners.udp.default {
bind = "0.0.0.0:1884"
}
}10.2 CoAP
适用于使用 CoAP 协议的 IoT 设备:
ini
gateway.coap {
enable = true
mountpoint = "coap/"
connection_required = false
listeners.udp.default {
bind = "0.0.0.0:5683"
}
}CoAP 消息发布示例(等价于 MQTT publish):
PUT coap://localhost:5683/ps/sensors/temperature
Payload: 25.510.3 STOMP
适用于需要通过 STOMP 协议接入的系统(如部分消息队列客户端):
ini
gateway.stomp {
enable = true
frame.max_headers = 10
frame.max_header_length = 1024
frame.max_body_length = 65536
listeners.tcp.default {
bind = "0.0.0.0:61613"
}
}11. 最佳实践
11.1 Client ID 管理
// 推荐格式
{产品线}-{设备类型}-{设备序列号}
// 示例
factory-sensor-SN20240101001
webapp-browser-uuid-xxx
backend-service-order-processor- 每个客户端使用唯一且固定的 Client ID
- 不要动态生成随机 Client ID(会导致持久会话无法恢复)
- 对设备管理场景,Client ID 最好能映射到设备资产 ID
11.2 QoS 选择策略
| 场景 | 推荐 QoS | 理由 |
|---|---|---|
| 传感器高频数据(每秒多次) | QoS 0 | 偶发丢失可接受,低延迟 |
| 设备状态上报 | QoS 1 | 保证送达,允许重复 |
| 控制指令下发 | QoS 1 或 2 | 必须执行,QoS 2 避免重复执行 |
| 支付/关键业务事件 | QoS 2 | 严格一次语义 |
11.3 主题设计规范
// 推荐:层次清晰,可扩展
{区域}/{设备类型}/{设备ID}/{数据类型}
// 示例
factory/sensor/SN001/temperature
factory/sensor/SN001/humidity
factory/actuator/AC001/cmd
factory/actuator/AC001/status
// 规则:
// - 不以 / 开头或结尾
// - 不使用空格
// - 区分大小写(temperature ≠ Temperature)
// - 系统保留主题以 $ 开头(如 $SYS/、$share/)11.4 重连策略
python
# Python 示例:指数退避重连
import time
import paho.mqtt.client as mqtt
MAX_RETRY_INTERVAL = 60 # 最大重连间隔(秒)
retry_interval = 1
def on_disconnect(client, userdata, rc):
global retry_interval
if rc != 0:
print(f"意外断开,{retry_interval}秒后重连")
time.sleep(retry_interval)
retry_interval = min(retry_interval * 2, MAX_RETRY_INTERVAL)
try:
client.reconnect()
retry_interval = 1 # 重连成功后重置
except Exception as e:
print(f"重连失败: {e}")11.5 消息 Payload 规范
推荐使用 JSON 格式:
json
{
"device_id": "SN001",
"timestamp": 1717430400,
"data": {
"temperature": 25.5,
"humidity": 60.2
},
"version": "1.0"
}注意事项:
- Payload 默认最大 1MB(
max_packet_size),大文件建议改用文件存储+MQTT传递URL - 二进制数据需 Base64 编码后传输
- 时间戳使用 Unix 时间戳(秒),避免时区问题
11.6 保留消息使用规范
适合用于保留消息的数据:
- 设备在线/离线状态
- 设备最新配置
- 系统当前运行模式
不适合的数据:
- 高频变化的传感器数值(每次更新都会存储)
- 一次性事件(用 QoS 1 即可)
11.7 遗嘱消息最佳实践
python
# 上线时发布"在线"保留消息
client.publish(f"status/{device_id}/online", "online", qos=1, retain=True)
# 连接时设置遗嘱:异常断线时自动发布"离线"
client.will_set(
topic=f"status/{device_id}/online",
payload="offline",
qos=1,
retain=True
)11.8 安全建议
- 生产环境必须启用 TLS,禁止在公网使用明文 1883 端口
- 每个设备使用独立的账号,避免共享同一用户名密码
- 启用 ACL 授权,限制设备只能访问自己的主题命名空间
- 定期轮换密码,配合 HTTP 认证后端实现动态认证
- 修改默认管理员密码,控制台端口不要暴露到外网
如有更多集成问题,请联系 Apusic 技术支持。