Skip to content

Apusic ADMQ MQTT 消息中间件用户手册

版本: 1.0 适用产品: Apusic ADMQ MQTT 模块 更新日期: 2026-06


目录

  1. 产品概述
  2. 协议支持
  3. 安装与启动
  4. 连接端口说明
  5. 管理控制台
  6. 核心功能说明
  7. 认证与授权
  8. 配置文件说明
  9. TLS/SSL 加密连接
  10. 集群部署
  11. 日志管理
  12. 监控与告警
  13. 常见问题

1. 产品概述

Apusic ADMQ 是金蝶天燕(Apusic)推出的企业级消息中间件套件,MQTT 模块专为物联网(IoT)场景设计,提供高性能、高可靠的 MQTT 消息服务。

主要特性:

  • 全面兼容 MQTT 3.1 / 3.1.1 / 5.0 协议
  • 支持百万级并发连接(单节点默认上限 1,024,000)
  • 支持 TCP、TLS、WebSocket、WebSocket Secure、QUIC 多种传输协议
  • 内置 Web 管理控制台,可视化运维
  • 支持多节点集群,水平扩展
  • 丰富的认证与授权机制
  • 支持 CoAP、MQTT-SN、LwM2M、STOMP 等多协议网关

ADMQ 套件组成:

ADMQ 是多协议消息中间件套件,除 MQTT 外,同一套框架还支持 Kafka、RabbitMQ、ActiveMQ、RocketMQ、Pulsar 等消息中间件的统一管理。本手册仅针对 MQTT 模块。


2. 协议支持

2.1 MQTT 协议版本

协议版本支持状态说明
MQTT 3.1✅ 支持兼容旧版客户端
MQTT 3.1.1✅ 支持最广泛使用的版本
MQTT 5.0✅ 支持最新版本,扩展特性丰富

三个版本可同时运行,服务端自动协商客户端协议版本,无需额外配置。

2.2 传输协议

传输协议默认端口说明
MQTT over TCP1883标准明文连接
MQTT over TLS/SSL8883加密连接(需配置证书)
MQTT over WebSocket8083浏览器及 HTTP 代理穿透场景
MQTT over WSS8084WebSocket + TLS 加密
MQTT over QUIC14567弱网环境,低延迟

2.3 WebSocket 子协议

WebSocket 连接时支持以下子协议名:

  • mqtt
  • mqtt-v3
  • mqtt-v3.1.1
  • mqtt-v5

2.4 QoS 支持

QoS 级别语义说明
QoS 0最多一次(At most once)不确认,最低开销
QoS 1至少一次(At least once)确认送达,可能重复
QoS 2恰好一次(Exactly once)严格确认,无重复

3. 安装与启动

3.1 目录结构

admq-mqtt/
├── bin/            # 启动脚本
│   ├── admq        # 主命令行工具
│   ├── admq-daemon # 守护进程管理工具
│   └── mqtt/       # MQTT 子命令脚本
├── config/         # 全局配置文件
│   └── mqtt.conf   # MQTT 主配置文件
├── mqtt/           # MQTT 实例目录
│   └── <节点ID>/   # 各节点配置与数据
│       └── etc/    # 节点配置
├── logs/           # 日志目录
└── service/        # Systemd 服务文件

3.2 启动服务

前台启动(调试用):

bash
./bin/admq mqtt server

后台守护进程启动(生产推荐):

bash
./bin/admq-daemon start mqtt server

停止服务:

bash
./bin/admq-daemon stop mqtt server

重启服务:

bash
./bin/admq-daemon restart mqtt server

3.3 Systemd 托管(Linux)

如已安装 Systemd 服务文件:

bash
# 启动
systemctl start admq-mqtt

# 停止
systemctl stop admq-mqtt

# 重启
systemctl restart admq-mqtt

# 开机自启
systemctl enable admq-mqtt

# 查看状态
systemctl status admq-mqtt

3.4 确认服务运行

bash
# 查看端口监听
netstat -tlnp | grep -E '1883|8083|18083'

# 或使用 ss
ss -tlnp | grep -E '1883|8083|18083'

4. 连接端口说明

端口用途默认开启
1883MQTT TCP 明文连接✅ 是
8883MQTT TLS/SSL 加密连接❌ 需配置
8083MQTT WebSocket 连接✅ 是
8084MQTT WebSocket Secure (WSS)❌ 需配置
14567MQTT over QUIC❌ 需配置
18083Web 管理控制台 (HTTP)✅ 是
18084Web 管理控制台 (HTTPS)❌ 需配置

防火墙提示: 生产环境请根据实际需要开放对应端口,建议不要将管理控制台端口(18083/18084)暴露到外网。


5. 管理控制台

5.1 访问地址

浏览器访问:http://<服务器IP>:18083

默认账号:

  • 用户名:admin
  • 密码:public

安全提示: 首次登录后请立即修改默认密码。

5.2 主要功能

功能模块说明
概览实时连接数、消息统计、节点状态
连接管理查看在线客户端,支持踢除连接
订阅管理查看当前所有主题订阅关系
主题主题列表与消息流量统计
保留消息查看与管理保留消息
认证用户内置数据库用户管理
授权规则ACL 规则配置
插件插件管理
告警系统告警信息
设置系统参数调整

5.3 REST API

管理控制台提供完整的 REST API,可用于自动化运维:

  • API 基础路径:http://<服务器IP>:18083/api/v5/
  • API 文档(Swagger):http://<服务器IP>:18083/api-docs

示例:查询在线客户端

bash
curl -u admin:public http://localhost:18083/api/v5/clients

示例:发布消息

bash
curl -u admin:public -X POST http://localhost:18083/api/v5/publish \
  -H "Content-Type: application/json" \
  -d '{"topic": "test/topic", "payload": "Hello ADMQ", "qos": 1}'

6. 核心功能说明

6.1 保留消息(Retained Messages)

保留消息是发布到某个主题时带有 RETAIN 标志的消息。新订阅该主题的客户端会立即收到最新一条保留消息。

配置示例(config/mqtt.conf):

ini
retainer {
    enable = true
    backend {
        type = built_in_database
        storage_type = disc        # ram(内存)或 disc(持久化)
        max_retained_messages = 0  # 0 表示不限制
        index_specs = [
            [1, 2, 3], [1, 3], [2, 3], [3]
        ]
    }
    msg_expiry_interval = 0s       # 消息过期时间,0 表示永不过期
    deliver_rate = infinity        # 每会话投递速率
}

6.2 遗嘱消息(Will Messages)

客户端在连接时可设置遗嘱消息,当连接异常断开时服务端自动发布该消息,用于通知其他订阅者。遗嘱消息是 MQTT 协议内置功能,无需额外配置,客户端在连接时指定即可。

6.3 共享订阅(Shared Subscriptions)

共享订阅允许多个订阅者共同消费同一主题的消息,实现负载均衡。

订阅格式:

$share/<组名>/<主题>

示例:

$share/worker-group/sensors/temperature

负载均衡策略(可配置):

策略说明
round_robin轮询(默认)
random随机选择
round_robin_per_group按组轮询
local优先本节点订阅者
sticky粘滞,保持上次选择的订阅者
hash_clientid按发布者 Client ID 哈希
hash_topic按主题哈希

6.4 通配符订阅

通配符含义示例
+匹配单个层级sensors/+/temp 匹配 sensors/room1/temp
#匹配多个层级(必须在末尾)sensors/# 匹配所有传感器主题

6.5 持久会话

对于 MQTT 3.1.1 客户端,连接时设置 Clean Session = false 即可开启持久会话。 对于 MQTT 5.0 客户端,通过 Session Expiry Interval 控制会话保持时长。

默认会话过期时间: 2 小时(可配置)

ini
mqtt {
    session_expiry_interval = 2h  # 支持 s/m/h/d 等单位
}

6.6 消息队列

客户端离线期间,服务端会将 QoS 1/2 消息存入队列,待客户端重连后投递。

ini
mqtt {
    max_mqueue_len = 1000           # 离线消息队列最大长度
    mqueue_store_qos0 = true        # 是否对 QoS 0 消息也排队
}

6.7 消息飞行窗口

控制同时在途(未确认)的消息数量:

ini
mqtt {
    max_inflight = 32               # 最大飞行消息数
    retry_interval = 30s            # QoS 1/2 重发间隔
    max_awaiting_rel = 100          # QoS 2 等待 PUBREL 最大数
    await_rel_timeout = 300s        # 等待 PUBREL 超时
}

6.8 MQTT 5.0 特有功能

特性说明
主题别名减少重复主题名传输开销,最大支持 65535 个别名
消息过期时间消息级别的 TTL 控制
订阅标识符区分来自不同订阅的消息
用户属性消息携带自定义键值对
请求/响应模式Response Topic + Correlation Data
流量控制接收最大值(Receive Maximum)
原因码更详细的操作结果反馈

7. 认证与授权

7.1 认证方式

ADMQ MQTT 支持多种认证后端,可以同时启用多个,按顺序尝试:

认证方式说明
内置数据库默认方式,账号存储在本地数据库,通过控制台或 API 管理
HTTP调用外部 HTTP 接口进行认证
JWTJSON Web Token 验证
LDAP对接企业 LDAP / Active Directory
MySQL从 MySQL 数据库查询验证
PostgreSQL从 PostgreSQL 数据库查询验证
MongoDB从 MongoDB 查询验证
Redis从 Redis 查询验证
PSKTLS 预共享密钥认证
证书客户端 TLS 证书 CN/DN 字段作为用户名

示例:使用 HTTP 认证

ini
authentication = [
    {
        mechanism = password_based
        backend = http
        enable = true
        method = post
        url = "http://auth.example.com/mqtt/auth"
        body {
            username = "${username}"
            password = "${password}"
            clientid = "${clientid}"
        }
        headers {
            "Content-Type" = "application/json"
        }
    }
]

7.2 授权(ACL)

控制客户端对主题的发布/订阅权限:

ini
authorization {
    sources = [
        {
            type = built_in_database
            enable = true
        }
    ]
    no_match = allow          # 无匹配规则时:allow(允许)或 deny(拒绝)
    deny_action = ignore      # 拒绝时动作:ignore(忽略)或 disconnect(断开)
    cache {
        enable = true
        max_size = 32
        ttl = "1m"            # 缓存有效期
    }
}

主题占位符(可在 ACL 规则中使用):

  • ${username} — 当前客户端用户名
  • ${clientid} — 当前客户端 ID
  • ${cert_common_name} — 客户端证书 CN 字段

示例 ACL 规则(内置数据库格式):

主题动作效果
data/${username}/#publish只能发布到自己命名空间下的主题
broadcast/#subscribe允许订阅广播主题

8. 配置文件说明

8.1 主配置文件

路径:config/mqtt.conf

这是 ADMQ MQTT 模块的主配置入口,修改后需重启服务生效。

ini
# 节点标识
node.name = "admq@127.0.0.1"     # 集群时需设置为实际 IP
node.cookie = "admqsecretcookie"  # 集群节点通信密钥,集群内所有节点必须相同

# TCP 监听
listeners.tcp.default {
    bind = "0.0.0.0:1883"
    max_connections = 1024000
}

# WebSocket 监听
listeners.ws.default {
    bind = "0.0.0.0:8083"
}

# 管理控制台
dashboard {
    listeners.http {
        bind = "0.0.0.0:18083"
    }
    default_username = "admin"
    default_password = "public"
}

8.2 MQTT 协议参数

ini
mqtt {
    idle_timeout = 15s              # 等待 CONNECT 包超时时间
    max_packet_size = 1MB           # 最大报文大小
    max_clientid_len = 65535        # 客户端 ID 最大长度
    max_topic_levels = 128          # 主题最大层级数
    max_qos_allowed = 2             # 允许的最大 QoS
    max_topic_alias = 65535         # 最大主题别名数(MQTT 5.0)
    retain_available = true         # 是否启用保留消息
    wildcard_subscription = true    # 是否允许通配符订阅
    shared_subscription = true      # 是否允许共享订阅
    exclusive_subscription = false  # 是否允许排他订阅
    max_subscriptions = infinity    # 每个客户端最大订阅数
    max_inflight = 32               # 飞行窗口大小
    session_expiry_interval = 2h    # 会话过期时间(MQTT 3.x)
    max_mqueue_len = 1000           # 离线消息队列长度
    shared_subscription_strategy = round_robin  # 共享订阅策略
}

8.3 日志配置

ini
log {
    file {
        level = warning             # 文件日志级别
        rotation_size = "100MB"     # 单文件最大尺寸
        rotation_count = 5          # 保留的历史日志文件数
    }
    console {
        enable = true
        level = warning             # 控制台日志级别
    }
}

日志级别从低到高:debuginfonoticewarningerrorcriticalalertemergency

8.4 限速配置

在监听器上配置速率限制:

ini
listeners.tcp.default {
    bind = "0.0.0.0:1883"
    max_connections = 1024000
    max_conn_rate = "1000/s"    # 每秒最大新建连接数
    messages_rate = "1000/s"    # 每客户端每秒最大消息数
    bytes_rate = "1MB/s"        # 每客户端每秒最大字节数
}

9. TLS/SSL 加密连接

9.1 准备证书

需要以下三个文件(PEM 格式):

  • cacert.pem — CA 根证书
  • cert.pem — 服务端证书
  • key.pem — 服务端私钥

将证书文件放置于:mqtt/<节点ID>/etc/ssl/ 目录下。

9.2 启用 TLS 监听

config/mqtt.conf 中取消注释或添加:

ini
listeners.ssl.default {
    bind = "0.0.0.0:8883"
    ssl_options {
        cacertfile = "${ADMQ_ETC_DIR}/ssl/cacert.pem"
        certfile   = "${ADMQ_ETC_DIR}/ssl/cert.pem"
        keyfile    = "${ADMQ_ETC_DIR}/ssl/key.pem"
        versions = [tlsv1.3, tlsv1.2]
        verify = verify_none             # 不验证客户端证书
        # verify = verify_peer           # 验证客户端证书(双向 TLS)
    }
}

9.3 双向 TLS(mTLS)

需要客户端提供证书:

ini
listeners.ssl.default {
    bind = "0.0.0.0:8883"
    ssl_options {
        cacertfile = "${ADMQ_ETC_DIR}/ssl/cacert.pem"
        certfile   = "${ADMQ_ETC_DIR}/ssl/cert.pem"
        keyfile    = "${ADMQ_ETC_DIR}/ssl/key.pem"
        verify = verify_peer
        fail_if_no_peer_cert = true      # 客户端必须提供证书
    }
}

9.4 使用证书字段作为用户名

ini
mqtt {
    peer_cert_as_username = cn   # 使用证书 CN 字段作为用户名
    # peer_cert_as_username = dn # 使用 DN 字段
}

10. 集群部署

10.1 集群概述

ADMQ MQTT 支持多节点集群,实现水平扩展和高可用。集群内各节点共享客户端连接信息、订阅关系和消息路由。

10.2 集群发现方式

方式一:手动(Manual)

ini
cluster {
    name = admq_mqtt_cluster
    discovery_strategy = manual
}

启动后通过命令手动加入:

bash
./bin/admq mqtt cluster join admq@node2.example.com

方式二:静态列表(Static)

ini
cluster {
    name = admq_mqtt_cluster
    discovery_strategy = static
    static {
        seeds = ["admq@node1.example.com", "admq@node2.example.com"]
    }
}

方式三:DNS 发现

ini
cluster {
    name = admq_mqtt_cluster
    discovery_strategy = dns
    dns {
        name = "mqtt.example.com"
        record_type = a
    }
}

方式四:Kubernetes 环境

ini
cluster {
    name = admq_mqtt_cluster
    discovery_strategy = k8s
    k8s {
        apiserver = "https://kubernetes.default.svc:443"
        service_name = admq-mqtt
        address_type = ip
        namespace = default
    }
}

方式五:etcd 发现

ini
cluster {
    name = admq_mqtt_cluster
    discovery_strategy = etcd
    etcd {
        server = "http://etcd1.example.com:2379"
        prefix = "/admq/mqtt"
        node_ttl = "1m"
    }
}

10.3 集群管理命令

bash
# 查看集群状态
./bin/admq mqtt cluster status

# 加入集群
./bin/admq mqtt cluster join admq@node2.example.com

# 离开集群
./bin/admq mqtt cluster leave

# 强制移除节点
./bin/admq mqtt cluster force-leave admq@node3.example.com

11. 日志管理

日志文件默认路径:logs/

文件说明
mqtt-server-<hostname>.log主日志文件
mqtt-server-<hostname>.out标准输出(启动信息等)

调整日志级别(临时生效):

可通过管理控制台 "设置 → 日志" 界面实时调整,无需重启。

日志轮转:

  • 单文件最大:100MB(可配置)
  • 最多保留:5 个历史文件(可配置)

12. 监控与告警

12.1 Prometheus 集成

启用 Prometheus 指标采集:

ini
prometheus {
    enable = true
    push_gateway_server = "http://prometheus-pushgateway:9091"
    interval = "15s"
    headers {}
    job_name = "admq-mqtt"
}

指标拉取端点(无需推送网关):

http://<服务器IP>:18083/api/v5/prometheus/stats

12.2 内置系统监控

ini
sysmon {
    vm {
        process_check_interval = 30s
        process_high_watermark = 0.80     # 进程数占比告警阈值
        process_low_watermark = 0.60
        long_gc = 100ms                   # 垃圾回收耗时告警
        long_schedule = 100ms             # 调度耗时告警
        large_heap = 32MB                 # 堆内存告警阈值
    }
    os {
        cpu_check_interval = 60s
        cpu_high_watermark = 0.80
        cpu_low_watermark = 0.60
        mem_check_interval = 60s
        sysmem_high_watermark = 0.70      # 系统内存告警阈值
        procmem_high_watermark = 0.05     # 进程内存告警阈值
    }
}

12.3 慢订阅检测

识别消息处理缓慢的客户端:

ini
slow_subs {
    enable = true
    threshold = 500ms
    expire_interval = 300s
    top_k_num = 10
    stats_type = whole
}

13. 常见问题

Q1:客户端连接被拒绝,返回 CONNACK 5(未授权)

原因: 用户名密码认证失败或没有匹配的认证规则。

排查步骤:

  1. 检查 config/mqtt.conf 中的 authentication 配置
  2. 通过控制台"认证用户"页面确认账号是否存在
  3. 临时将 no_match 设为 allow 以旁路认证测试连通性

Q2:客户端能连接但无法发布/订阅

原因: ACL 授权规则拒绝了该操作。

排查步骤:

  1. 查看 authorization.no_match 配置
  2. 通过控制台"授权规则"页面检查规则
  3. 临时设置 no_match = allow 测试

Q3:连接数无法超过系统限制

排查:

bash
# 查看系统文件描述符限制
ulimit -n

# 临时调大
ulimit -n 1000000

也可在 Systemd 服务文件中配置:

ini
[Service]
LimitNOFILE=1000000

Q4:消息丢失问题

可能原因:

  • 使用了 QoS 0,网络抖动导致丢失 → 改用 QoS 1 或 QoS 2
  • 离线消息队列满(max_mqueue_len)→ 调大队列长度
  • 消息过期(message_expiry_interval)→ 调整过期时间

Q5:WebSocket 连接失败

检查点:

  1. 确认 listeners.ws.default 已配置且服务已重启
  2. 检查防火墙是否放行 8083 端口
  3. 浏览器 WebSocket 连接时,子协议需指定为 mqtt

如有更多技术问题,请联系 Apusic 技术支持。

金蝶天燕(Apusic)企业级消息中间件套件