外观
Apusic ADMQ MQTT 消息中间件用户手册
版本: 1.0 适用产品: Apusic ADMQ MQTT 模块 更新日期: 2026-06
目录
1. 产品概述
Apusic ADMQ 是金蝶天燕(Apusic)推出的企业级消息中间件套件,MQTT 模块专为物联网(IoT)场景设计,提供高性能、高可靠的 MQTT 消息服务。
主要特性:
- 全面兼容 MQTT 3.1 / 3.1.1 / 5.0 协议
- 支持百万级并发连接(单节点默认上限 1,024,000)
- 支持 TCP、TLS、WebSocket、WebSocket Secure、QUIC 多种传输协议
- 内置 Web 管理控制台,可视化运维
- 支持多节点集群,水平扩展
- 丰富的认证与授权机制
- 支持 CoAP、MQTT-SN、LwM2M、STOMP 等多协议网关
ADMQ 套件组成:
ADMQ 是多协议消息中间件套件,除 MQTT 外,同一套框架还支持 Kafka、RabbitMQ、ActiveMQ、RocketMQ、Pulsar 等消息中间件的统一管理。本手册仅针对 MQTT 模块。
2. 协议支持
2.1 MQTT 协议版本
| 协议版本 | 支持状态 | 说明 |
|---|---|---|
| MQTT 3.1 | ✅ 支持 | 兼容旧版客户端 |
| MQTT 3.1.1 | ✅ 支持 | 最广泛使用的版本 |
| MQTT 5.0 | ✅ 支持 | 最新版本,扩展特性丰富 |
三个版本可同时运行,服务端自动协商客户端协议版本,无需额外配置。
2.2 传输协议
| 传输协议 | 默认端口 | 说明 |
|---|---|---|
| MQTT over TCP | 1883 | 标准明文连接 |
| MQTT over TLS/SSL | 8883 | 加密连接(需配置证书) |
| MQTT over WebSocket | 8083 | 浏览器及 HTTP 代理穿透场景 |
| MQTT over WSS | 8084 | WebSocket + TLS 加密 |
| MQTT over QUIC | 14567 | 弱网环境,低延迟 |
2.3 WebSocket 子协议
WebSocket 连接时支持以下子协议名:
mqttmqtt-v3mqtt-v3.1.1mqtt-v5
2.4 QoS 支持
| QoS 级别 | 语义 | 说明 |
|---|---|---|
| QoS 0 | 最多一次(At most once) | 不确认,最低开销 |
| QoS 1 | 至少一次(At least once) | 确认送达,可能重复 |
| QoS 2 | 恰好一次(Exactly once) | 严格确认,无重复 |
3. 安装与启动
3.1 目录结构
admq-mqtt/
├── bin/ # 启动脚本
│ ├── admq # 主命令行工具
│ ├── admq-daemon # 守护进程管理工具
│ └── mqtt/ # MQTT 子命令脚本
├── config/ # 全局配置文件
│ └── mqtt.conf # MQTT 主配置文件
├── mqtt/ # MQTT 实例目录
│ └── <节点ID>/ # 各节点配置与数据
│ └── etc/ # 节点配置
├── logs/ # 日志目录
└── service/ # Systemd 服务文件3.2 启动服务
前台启动(调试用):
bash
./bin/admq mqtt server后台守护进程启动(生产推荐):
bash
./bin/admq-daemon start mqtt server停止服务:
bash
./bin/admq-daemon stop mqtt server重启服务:
bash
./bin/admq-daemon restart mqtt server3.3 Systemd 托管(Linux)
如已安装 Systemd 服务文件:
bash
# 启动
systemctl start admq-mqtt
# 停止
systemctl stop admq-mqtt
# 重启
systemctl restart admq-mqtt
# 开机自启
systemctl enable admq-mqtt
# 查看状态
systemctl status admq-mqtt3.4 确认服务运行
bash
# 查看端口监听
netstat -tlnp | grep -E '1883|8083|18083'
# 或使用 ss
ss -tlnp | grep -E '1883|8083|18083'4. 连接端口说明
| 端口 | 用途 | 默认开启 |
|---|---|---|
| 1883 | MQTT TCP 明文连接 | ✅ 是 |
| 8883 | MQTT TLS/SSL 加密连接 | ❌ 需配置 |
| 8083 | MQTT WebSocket 连接 | ✅ 是 |
| 8084 | MQTT WebSocket Secure (WSS) | ❌ 需配置 |
| 14567 | MQTT over QUIC | ❌ 需配置 |
| 18083 | Web 管理控制台 (HTTP) | ✅ 是 |
| 18084 | Web 管理控制台 (HTTPS) | ❌ 需配置 |
防火墙提示: 生产环境请根据实际需要开放对应端口,建议不要将管理控制台端口(18083/18084)暴露到外网。
5. 管理控制台
5.1 访问地址
浏览器访问:http://<服务器IP>:18083
默认账号:
- 用户名:
admin - 密码:
public
安全提示: 首次登录后请立即修改默认密码。
5.2 主要功能
| 功能模块 | 说明 |
|---|---|
| 概览 | 实时连接数、消息统计、节点状态 |
| 连接管理 | 查看在线客户端,支持踢除连接 |
| 订阅管理 | 查看当前所有主题订阅关系 |
| 主题 | 主题列表与消息流量统计 |
| 保留消息 | 查看与管理保留消息 |
| 认证用户 | 内置数据库用户管理 |
| 授权规则 | ACL 规则配置 |
| 插件 | 插件管理 |
| 告警 | 系统告警信息 |
| 设置 | 系统参数调整 |
5.3 REST API
管理控制台提供完整的 REST API,可用于自动化运维:
- API 基础路径:
http://<服务器IP>:18083/api/v5/ - API 文档(Swagger):
http://<服务器IP>:18083/api-docs
示例:查询在线客户端
bash
curl -u admin:public http://localhost:18083/api/v5/clients示例:发布消息
bash
curl -u admin:public -X POST http://localhost:18083/api/v5/publish \
-H "Content-Type: application/json" \
-d '{"topic": "test/topic", "payload": "Hello ADMQ", "qos": 1}'6. 核心功能说明
6.1 保留消息(Retained Messages)
保留消息是发布到某个主题时带有 RETAIN 标志的消息。新订阅该主题的客户端会立即收到最新一条保留消息。
配置示例(config/mqtt.conf):
ini
retainer {
enable = true
backend {
type = built_in_database
storage_type = disc # ram(内存)或 disc(持久化)
max_retained_messages = 0 # 0 表示不限制
index_specs = [
[1, 2, 3], [1, 3], [2, 3], [3]
]
}
msg_expiry_interval = 0s # 消息过期时间,0 表示永不过期
deliver_rate = infinity # 每会话投递速率
}6.2 遗嘱消息(Will Messages)
客户端在连接时可设置遗嘱消息,当连接异常断开时服务端自动发布该消息,用于通知其他订阅者。遗嘱消息是 MQTT 协议内置功能,无需额外配置,客户端在连接时指定即可。
6.3 共享订阅(Shared Subscriptions)
共享订阅允许多个订阅者共同消费同一主题的消息,实现负载均衡。
订阅格式:
$share/<组名>/<主题>示例:
$share/worker-group/sensors/temperature负载均衡策略(可配置):
| 策略 | 说明 |
|---|---|
round_robin | 轮询(默认) |
random | 随机选择 |
round_robin_per_group | 按组轮询 |
local | 优先本节点订阅者 |
sticky | 粘滞,保持上次选择的订阅者 |
hash_clientid | 按发布者 Client ID 哈希 |
hash_topic | 按主题哈希 |
6.4 通配符订阅
| 通配符 | 含义 | 示例 |
|---|---|---|
+ | 匹配单个层级 | sensors/+/temp 匹配 sensors/room1/temp |
# | 匹配多个层级(必须在末尾) | sensors/# 匹配所有传感器主题 |
6.5 持久会话
对于 MQTT 3.1.1 客户端,连接时设置 Clean Session = false 即可开启持久会话。 对于 MQTT 5.0 客户端,通过 Session Expiry Interval 控制会话保持时长。
默认会话过期时间: 2 小时(可配置)
ini
mqtt {
session_expiry_interval = 2h # 支持 s/m/h/d 等单位
}6.6 消息队列
客户端离线期间,服务端会将 QoS 1/2 消息存入队列,待客户端重连后投递。
ini
mqtt {
max_mqueue_len = 1000 # 离线消息队列最大长度
mqueue_store_qos0 = true # 是否对 QoS 0 消息也排队
}6.7 消息飞行窗口
控制同时在途(未确认)的消息数量:
ini
mqtt {
max_inflight = 32 # 最大飞行消息数
retry_interval = 30s # QoS 1/2 重发间隔
max_awaiting_rel = 100 # QoS 2 等待 PUBREL 最大数
await_rel_timeout = 300s # 等待 PUBREL 超时
}6.8 MQTT 5.0 特有功能
| 特性 | 说明 |
|---|---|
| 主题别名 | 减少重复主题名传输开销,最大支持 65535 个别名 |
| 消息过期时间 | 消息级别的 TTL 控制 |
| 订阅标识符 | 区分来自不同订阅的消息 |
| 用户属性 | 消息携带自定义键值对 |
| 请求/响应模式 | Response Topic + Correlation Data |
| 流量控制 | 接收最大值(Receive Maximum) |
| 原因码 | 更详细的操作结果反馈 |
7. 认证与授权
7.1 认证方式
ADMQ MQTT 支持多种认证后端,可以同时启用多个,按顺序尝试:
| 认证方式 | 说明 |
|---|---|
| 内置数据库 | 默认方式,账号存储在本地数据库,通过控制台或 API 管理 |
| HTTP | 调用外部 HTTP 接口进行认证 |
| JWT | JSON Web Token 验证 |
| LDAP | 对接企业 LDAP / Active Directory |
| MySQL | 从 MySQL 数据库查询验证 |
| PostgreSQL | 从 PostgreSQL 数据库查询验证 |
| MongoDB | 从 MongoDB 查询验证 |
| Redis | 从 Redis 查询验证 |
| PSK | TLS 预共享密钥认证 |
| 证书 | 客户端 TLS 证书 CN/DN 字段作为用户名 |
示例:使用 HTTP 认证
ini
authentication = [
{
mechanism = password_based
backend = http
enable = true
method = post
url = "http://auth.example.com/mqtt/auth"
body {
username = "${username}"
password = "${password}"
clientid = "${clientid}"
}
headers {
"Content-Type" = "application/json"
}
}
]7.2 授权(ACL)
控制客户端对主题的发布/订阅权限:
ini
authorization {
sources = [
{
type = built_in_database
enable = true
}
]
no_match = allow # 无匹配规则时:allow(允许)或 deny(拒绝)
deny_action = ignore # 拒绝时动作:ignore(忽略)或 disconnect(断开)
cache {
enable = true
max_size = 32
ttl = "1m" # 缓存有效期
}
}主题占位符(可在 ACL 规则中使用):
${username}— 当前客户端用户名${clientid}— 当前客户端 ID${cert_common_name}— 客户端证书 CN 字段
示例 ACL 规则(内置数据库格式):
| 主题 | 动作 | 效果 |
|---|---|---|
data/${username}/# | publish | 只能发布到自己命名空间下的主题 |
broadcast/# | subscribe | 允许订阅广播主题 |
8. 配置文件说明
8.1 主配置文件
路径:config/mqtt.conf
这是 ADMQ MQTT 模块的主配置入口,修改后需重启服务生效。
ini
# 节点标识
node.name = "admq@127.0.0.1" # 集群时需设置为实际 IP
node.cookie = "admqsecretcookie" # 集群节点通信密钥,集群内所有节点必须相同
# TCP 监听
listeners.tcp.default {
bind = "0.0.0.0:1883"
max_connections = 1024000
}
# WebSocket 监听
listeners.ws.default {
bind = "0.0.0.0:8083"
}
# 管理控制台
dashboard {
listeners.http {
bind = "0.0.0.0:18083"
}
default_username = "admin"
default_password = "public"
}8.2 MQTT 协议参数
ini
mqtt {
idle_timeout = 15s # 等待 CONNECT 包超时时间
max_packet_size = 1MB # 最大报文大小
max_clientid_len = 65535 # 客户端 ID 最大长度
max_topic_levels = 128 # 主题最大层级数
max_qos_allowed = 2 # 允许的最大 QoS
max_topic_alias = 65535 # 最大主题别名数(MQTT 5.0)
retain_available = true # 是否启用保留消息
wildcard_subscription = true # 是否允许通配符订阅
shared_subscription = true # 是否允许共享订阅
exclusive_subscription = false # 是否允许排他订阅
max_subscriptions = infinity # 每个客户端最大订阅数
max_inflight = 32 # 飞行窗口大小
session_expiry_interval = 2h # 会话过期时间(MQTT 3.x)
max_mqueue_len = 1000 # 离线消息队列长度
shared_subscription_strategy = round_robin # 共享订阅策略
}8.3 日志配置
ini
log {
file {
level = warning # 文件日志级别
rotation_size = "100MB" # 单文件最大尺寸
rotation_count = 5 # 保留的历史日志文件数
}
console {
enable = true
level = warning # 控制台日志级别
}
}日志级别从低到高:debug → info → notice → warning → error → critical → alert → emergency
8.4 限速配置
在监听器上配置速率限制:
ini
listeners.tcp.default {
bind = "0.0.0.0:1883"
max_connections = 1024000
max_conn_rate = "1000/s" # 每秒最大新建连接数
messages_rate = "1000/s" # 每客户端每秒最大消息数
bytes_rate = "1MB/s" # 每客户端每秒最大字节数
}9. TLS/SSL 加密连接
9.1 准备证书
需要以下三个文件(PEM 格式):
cacert.pem— CA 根证书cert.pem— 服务端证书key.pem— 服务端私钥
将证书文件放置于:mqtt/<节点ID>/etc/ssl/ 目录下。
9.2 启用 TLS 监听
在 config/mqtt.conf 中取消注释或添加:
ini
listeners.ssl.default {
bind = "0.0.0.0:8883"
ssl_options {
cacertfile = "${ADMQ_ETC_DIR}/ssl/cacert.pem"
certfile = "${ADMQ_ETC_DIR}/ssl/cert.pem"
keyfile = "${ADMQ_ETC_DIR}/ssl/key.pem"
versions = [tlsv1.3, tlsv1.2]
verify = verify_none # 不验证客户端证书
# verify = verify_peer # 验证客户端证书(双向 TLS)
}
}9.3 双向 TLS(mTLS)
需要客户端提供证书:
ini
listeners.ssl.default {
bind = "0.0.0.0:8883"
ssl_options {
cacertfile = "${ADMQ_ETC_DIR}/ssl/cacert.pem"
certfile = "${ADMQ_ETC_DIR}/ssl/cert.pem"
keyfile = "${ADMQ_ETC_DIR}/ssl/key.pem"
verify = verify_peer
fail_if_no_peer_cert = true # 客户端必须提供证书
}
}9.4 使用证书字段作为用户名
ini
mqtt {
peer_cert_as_username = cn # 使用证书 CN 字段作为用户名
# peer_cert_as_username = dn # 使用 DN 字段
}10. 集群部署
10.1 集群概述
ADMQ MQTT 支持多节点集群,实现水平扩展和高可用。集群内各节点共享客户端连接信息、订阅关系和消息路由。
10.2 集群发现方式
方式一:手动(Manual)
ini
cluster {
name = admq_mqtt_cluster
discovery_strategy = manual
}启动后通过命令手动加入:
bash
./bin/admq mqtt cluster join admq@node2.example.com方式二:静态列表(Static)
ini
cluster {
name = admq_mqtt_cluster
discovery_strategy = static
static {
seeds = ["admq@node1.example.com", "admq@node2.example.com"]
}
}方式三:DNS 发现
ini
cluster {
name = admq_mqtt_cluster
discovery_strategy = dns
dns {
name = "mqtt.example.com"
record_type = a
}
}方式四:Kubernetes 环境
ini
cluster {
name = admq_mqtt_cluster
discovery_strategy = k8s
k8s {
apiserver = "https://kubernetes.default.svc:443"
service_name = admq-mqtt
address_type = ip
namespace = default
}
}方式五:etcd 发现
ini
cluster {
name = admq_mqtt_cluster
discovery_strategy = etcd
etcd {
server = "http://etcd1.example.com:2379"
prefix = "/admq/mqtt"
node_ttl = "1m"
}
}10.3 集群管理命令
bash
# 查看集群状态
./bin/admq mqtt cluster status
# 加入集群
./bin/admq mqtt cluster join admq@node2.example.com
# 离开集群
./bin/admq mqtt cluster leave
# 强制移除节点
./bin/admq mqtt cluster force-leave admq@node3.example.com11. 日志管理
日志文件默认路径:logs/
| 文件 | 说明 |
|---|---|
mqtt-server-<hostname>.log | 主日志文件 |
mqtt-server-<hostname>.out | 标准输出(启动信息等) |
调整日志级别(临时生效):
可通过管理控制台 "设置 → 日志" 界面实时调整,无需重启。
日志轮转:
- 单文件最大:100MB(可配置)
- 最多保留:5 个历史文件(可配置)
12. 监控与告警
12.1 Prometheus 集成
启用 Prometheus 指标采集:
ini
prometheus {
enable = true
push_gateway_server = "http://prometheus-pushgateway:9091"
interval = "15s"
headers {}
job_name = "admq-mqtt"
}指标拉取端点(无需推送网关):
http://<服务器IP>:18083/api/v5/prometheus/stats12.2 内置系统监控
ini
sysmon {
vm {
process_check_interval = 30s
process_high_watermark = 0.80 # 进程数占比告警阈值
process_low_watermark = 0.60
long_gc = 100ms # 垃圾回收耗时告警
long_schedule = 100ms # 调度耗时告警
large_heap = 32MB # 堆内存告警阈值
}
os {
cpu_check_interval = 60s
cpu_high_watermark = 0.80
cpu_low_watermark = 0.60
mem_check_interval = 60s
sysmem_high_watermark = 0.70 # 系统内存告警阈值
procmem_high_watermark = 0.05 # 进程内存告警阈值
}
}12.3 慢订阅检测
识别消息处理缓慢的客户端:
ini
slow_subs {
enable = true
threshold = 500ms
expire_interval = 300s
top_k_num = 10
stats_type = whole
}13. 常见问题
Q1:客户端连接被拒绝,返回 CONNACK 5(未授权)
原因: 用户名密码认证失败或没有匹配的认证规则。
排查步骤:
- 检查
config/mqtt.conf中的authentication配置 - 通过控制台"认证用户"页面确认账号是否存在
- 临时将
no_match设为allow以旁路认证测试连通性
Q2:客户端能连接但无法发布/订阅
原因: ACL 授权规则拒绝了该操作。
排查步骤:
- 查看
authorization.no_match配置 - 通过控制台"授权规则"页面检查规则
- 临时设置
no_match = allow测试
Q3:连接数无法超过系统限制
排查:
bash
# 查看系统文件描述符限制
ulimit -n
# 临时调大
ulimit -n 1000000也可在 Systemd 服务文件中配置:
ini
[Service]
LimitNOFILE=1000000Q4:消息丢失问题
可能原因:
- 使用了 QoS 0,网络抖动导致丢失 → 改用 QoS 1 或 QoS 2
- 离线消息队列满(
max_mqueue_len)→ 调大队列长度 - 消息过期(
message_expiry_interval)→ 调整过期时间
Q5:WebSocket 连接失败
检查点:
- 确认
listeners.ws.default已配置且服务已重启 - 检查防火墙是否放行 8083 端口
- 浏览器 WebSocket 连接时,子协议需指定为
mqtt
如有更多技术问题,请联系 Apusic 技术支持。