外观
ADMQ ActiveMQ 用户手册
版本:1.0
目录
1. 产品概述
ADMQ ActiveMQ 是 Apusic 推出的企业级 JMS 消息中间件,集成了 Apusic 统一授权体系和监控上报能力,适合以下场景:
- 企业应用集成(EAI):系统间消息传递,替代点对点 API 调用
- JMS 应用迁移:无缝承接 Java EE / Jakarta EE 应用的 JMS 需求
- Apache Camel 集成:利用 Camel 实现复杂路由和 ETL
- 传统企业 ESB:作为轻量 ESB 的消息核心
核心能力
| 能力 | 说明 |
|---|---|
| JMS 1.1/2.0 | 完整的 Java 消息服务规范实现 |
| Queue(点对点) | 消息在队列中积累,至少一个消费者处理 |
| Topic(发布/订阅) | 一对多广播,支持持久订阅 |
| 多协议支持 | OpenWire(默认)、AMQP 1.0、STOMP、MQTT 3.1.1 |
| KahaDB 持久化 | 默认高性能文件数据库 |
| JDBC 持久化 | 支持 MySQL/PostgreSQL/Oracle 等关系数据库 |
| 消息调度器 | 延迟消息、定时消息、循环消息 |
| 死信队列 | 自动捕获处理失败的消息 |
| Apache Camel | 内置 Camel 2.25.4 路由引擎 |
端口一览
| 端口 | 协议 | 用途 | 默认状态 |
|---|---|---|---|
| 61616 | TCP | OpenWire(默认JMS协议) | 启用 |
| 8161 | HTTP | Web 管理控制台 / REST API | 启用 |
| 7072 | TCP | JMX 监控端口 | 启用 |
| 5672 | TCP | AMQP 1.0 | 禁用(可启用) |
| 61613 | TCP | STOMP | 禁用(可启用) |
| 1883 | TCP | MQTT 3.1.1 | 禁用(可启用) |
2. 系统要求
硬件建议
| 配置项 | 最低 | 推荐(生产) |
|---|---|---|
| CPU | 2 核 | 8 核+ |
| 内存 | 2 GB | 16 GB+ |
| 磁盘 | 20 GB | 500 GB SSD |
| 网络 | 100 Mbps | 1 Gbps |
软件要求
- 操作系统:Linux x86_64 / ARM64(CentOS 7+, Ubuntu 18.04+)、macOS(提供原生支持)
- Java:JDK 11+(产品内置,无需手动安装)
注意:ADMQ ActiveMQ 是四款 ADMQ 产品中唯一明确支持 macOS 的(
bin/macosx/目录存在 macOS 启动脚本)。
3. 安装与部署
3.1 目录结构
admq-activemq/
├── bin/ # 启动脚本
│ ├── linux/ # Linux 脚本
│ ├── macosx/ # macOS 脚本(独有)
│ ├── start.sh # ADMQ 统一启动
│ └── stop.sh
├── activemq/ # ActiveMQ 5.19.4 发行包
│ ├── bin/
│ │ ├── activemq # Linux 主控脚本
│ │ └── wrapper.jar # Java Service Wrapper
│ ├── lib/ # ActiveMQ 核心 JAR
│ │ ├── activemq-all-5.19.4.jar
│ │ ├── apusic-license-admq-2.0.jar # ← Apusic 注入
│ │ ├── apusic-license-core-2.0.jar # ← Apusic 注入
│ │ ├── admq-monitor-1.0.0-apusic.jar # ← Apusic 注入
│ │ └── acls-client-1.0.9.jar # ← Apusic 注入
│ ├── conf/
│ │ ├── activemq.xml # ActiveMQ 主配置
│ │ ├── jetty.xml # Web 控制台配置
│ │ ├── users.properties # 用户定义
│ │ ├── groups.properties # 组定义
│ │ ├── log4j2.properties # 日志配置
│ │ └── wrapper.conf # Java Service Wrapper 配置
│ ├── data/ # KahaDB 数据目录
│ └── webapps/ # Jetty Web 应用
│ └── admin/ # Web 控制台
├── config/
│ └── activemq-broker.conf # ADMQ 主配置
├── lib/ # Apusic 工具 JAR(备份)
├── license/ # 授权文件目录
└── logs/3.2 快速安装
bash
# 1. 解压安装包
tar -xzf admq-activemq-1.0.0.tar.gz -C /opt/
cd /opt/admq-activemq
# 2. 放置授权文件
cp your-license.xml license/
# 3. 修改主配置(可选)
vi config/activemq-broker.conf
# 4. 启动
./bin/start.sh3.3 macOS 安装(开发环境)
bash
# macOS 专用启动
./bin/macosx/start.sh
# 或使用原生 ActiveMQ 脚本
./activemq/bin/macosx/activemq start4. 配置说明
4.1 ADMQ 主配置 activemq-broker.conf
properties
# ===== 节点配置 =====
node_id=1
node_name=activemq-broker-1
cluster_name=admq-activemq-cluster
# ===== 网络配置 =====
broker_host=0.0.0.0
openwire_port=61616
web_console_port=8161
# ===== 协议开关 =====
amqp_enabled=false
amqp_port=5672
stomp_enabled=false
stomp_port=61613
mqtt_enabled=false
mqtt_port=1883
# ===== 持久化配置 =====
persistence_type=kahadb # kahadb 或 jdbc
kahadb_dir=/var/lib/admq-activemq/kahadb
# jdbc_url=jdbc:mysql://localhost/activemq
# jdbc_user=activemq
# jdbc_password=activemq
# ===== 监控上报 =====
monitor_role=activemq
monitor_username=__sys__monitor
monitor_password=11111111
manager_addr=http://admq-manager:8080
# ===== JMX =====
jmx_port=70724.2 ActiveMQ 核心配置 activemq/conf/activemq.xml
主配置文件采用 Spring XML 格式:
xml
<beans xmlns="http://www.springframework.org/schema/beans" ...>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}">
<!-- 传输协议配置 -->
<transportConnectors>
<!-- OpenWire(默认启用) -->
<transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<!-- AMQP(默认禁用,取消注释启用) -->
<!-- <transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
<!-- STOMP(默认禁用) -->
<!-- <transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
<!-- MQTT(默认禁用) -->
<!-- <transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
</transportConnectors>
<!-- KahaDB 持久化 -->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!-- 死信队列策略 -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- 系统资源限制 -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
</broker>
</beans>4.3 JVM 调优 activemq/conf/wrapper.conf
properties
# JVM 内存配置
wrapper.java.additional.1=-Xms2G
wrapper.java.additional.2=-Xmx4G
wrapper.java.additional.3=-XX:+UseG1GC
wrapper.java.additional.4=-XX:MaxGCPauseMillis=20
wrapper.java.additional.5=-Dactivemq.data=/var/lib/admq-activemq4.4 Web 控制台配置 jetty.xml
重要 Bug:
jetty.xml中硬编码了生产环境 IP172.20.140.224(Apusic 测试环境地址)。部署时必须修改为实际 IP 或0.0.0.0。
xml
<!-- 需要修改这里的 host 配置 -->
<Set name="host">0.0.0.0</Set> <!-- 原来是 172.20.140.224 -->
<Set name="port">8161</Set>5. 启动与停止
bash
# 启动
./bin/start.sh
# 停止
./bin/stop.sh
# 查看状态
./bin/status.sh
# 查看实时日志
tail -f activemq/data/activemq.log
# 使用 activemq 脚本直接控制
./activemq/bin/activemq start
./activemq/bin/activemq stop
./activemq/bin/activemq status
./activemq/bin/activemq console # 前台运行(调试用)6. 目的地管理(Queue / Topic)
6.1 通过 Web 控制台管理
访问 http://localhost:8161/admin(默认用户名/密码:admin/admin)
- Queues 页面:查看队列列表、消息数量、消费者数量,支持发送测试消息、清空队列、删除队列
- Topics 页面:查看主题列表,查看订阅者数量
- Subscribers 页面:查看持久订阅者状态
6.2 通过命令行管理
ActiveMQ 提供 activemq-admin 工具:
bash
# 查看连接统计
./activemq/bin/activemq-admin query \
--jmxurl service:jmx:rmi:///jndi/rmi://localhost:7072/jmxrmi
# 发送测试消息
./activemq/bin/activemq producer \
--destination queue://TEST.QUEUE \
--message "Hello ActiveMQ" \
--messageCount 10
# 消费消息
./activemq/bin/activemq consumer \
--destination queue://TEST.QUEUE6.3 通过 REST API 管理
ActiveMQ 内置 Jolokia HTTP/JMX 桥接:
bash
# 查看队列统计
curl http://localhost:8161/api/jolokia/read/\
org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,\
destinationName=TEST.QUEUE/QueueSize
# 发送消息
curl -X POST http://localhost:8161/api/message/TEST.QUEUE?type=queue \
-H "Content-Type: application/json" \
-u admin:admin \
-d '{"body":"Hello","headers":{"JMSPriority":4}}'
# 清除队列
curl -X DELETE http://localhost:8161/api/message/TEST.QUEUE?type=queue \
-u admin:admin7. 用户与权限管理
7.1 用户定义 conf/users.properties
properties
# 格式:用户名=密码
admin=admin
user=user
publisher=publisher-secret
subscriber=subscriber-secret7.2 组定义 conf/groups.properties
properties
# 格式:组名=用户1,用户2,...
admins=admin
users=user,publisher,subscriber
publishers=publisher
subscribers=subscriber7.3 授权配置 activemq.xml
xml
<plugins>
<simpleAuthenticationPlugin anonymousAccessAllowed="false">
<users>
<authenticationUser username="admin" password="admin" groups="admins,users"/>
<authenticationUser username="publisher" password="pub-pass" groups="publishers,users"/>
<authenticationUser username="subscriber" password="sub-pass" groups="subscribers,users"/>
</users>
</simpleAuthenticationPlugin>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<!-- admin 拥有所有权限 -->
<authorizationEntry topic=">" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins"/>
<!-- publisher 可以写入 ORDER 开头的队列 -->
<authorizationEntry queue="ORDER.>" read="admins" write="publishers" admin="admins"/>
<!-- subscriber 可以读取 ORDER 队列 -->
<authorizationEntry queue="ORDER.>" read="subscribers" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>8. 消息持久化
8.1 KahaDB(默认)
KahaDB 是 ActiveMQ 的默认高性能文件数据库,基于 B-Tree 索引:
xml
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"
journalMaxFileLength="32mb"
enableJournalDiskSyncs="false"
indexWriteBatchSize="10000"
indexCacheSize="10000"/>
</persistenceAdapter>KahaDB 文件结构:
kahadb/
├── db-1.log # 消息日志(journal),32MB 轮转
├── db-2.log
├── db.data # B-Tree 索引数据
├── db.redo # 崩溃恢复日志
└── lock # 排他锁文件8.2 JDBC 持久化(MySQL)
xml
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.data}"
dataSource="#mysql-ds"/>
</persistenceAdapter>
<!-- 在 beans 段定义数据源 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxTotal" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>8.3 消息调度器(延迟消息)
xml
<!-- 启用调度器插件 -->
<broker schedulerSupport="true" ...>java
// Java 代码发送延迟消息
Message msg = session.createTextMessage("延迟10秒发送");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000); // 10秒后投递
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5000); // 每5秒重复
msg.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3); // 重复3次
producer.send(msg);8.4 死信队列(DLQ)
默认配置下,处理失败(重新投递 6 次后)的消息进入 ActiveMQ.DLQ。
自定义 DLQ 策略:
xml
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 为每个 Queue 创建独立的 DLQ -->
<policyEntry queue="ORDER.>" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>9. 集群与高可用
9.1 Master-Slave(KahaDB 共享存储)
xml
<!-- Master 节点配置 -->
<broker brokerName="master" ...>
<transportConnectors>
<transportConnector uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
<persistenceAdapter>
<kahaDB directory="/shared/nfs/kahadb"/> <!-- NFS 共享目录 -->
</persistenceAdapter>
</broker>Slave 启动时会尝试获取 KahaDB 锁文件,Master 持有锁则 Slave 等待。Master 宕机后,Slave 自动获取锁成为新 Master。
9.2 Network of Brokers(分布式网格)
适合需要高吞吐的分布式场景:
xml
<!-- Broker 1 配置 -->
<networkConnectors>
<networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
name="bridge"
duplex="true"
networkTTL="3"
dynamicOnly="true"/>
</networkConnectors>10. 协议支持
10.1 启用 AMQP 1.0
修改 activemq.xml,取消 AMQP 注释:
xml
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?..."/>AMQP 使用场景:跨语言互操作(Python、.NET、Go 等)
10.2 启用 STOMP
xml
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?..."/>10.3 启用 MQTT
注意:ActiveMQ 的 MQTT 实现是协议适配器(类似 RabbitMQ MQTT 插件),不是原生 MQTT Broker。IoT 大规模场景建议使用 ADMQ MQTT。
xml
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?..."/>10.4 多协议统一端点(可选)
xml
<!-- 单端口自动检测协议 -->
<transportConnector name="auto"
uri="auto://0.0.0.0:61618?wireFormat.allowedFormats=default,STOMP,AMQP,MQTT"/>11. 监控与运维
11.1 Web 控制台
| URL | 说明 |
|---|---|
http://localhost:8161/admin | 主控制台(admin/admin) |
http://localhost:8161/api/jolokia | JMX over HTTP(Jolokia) |
http://localhost:8161/api/message/{dest} | REST 消息 API |
11.2 JMX 监控
bash
# 连接 JMX(需要 JDK 中的 jconsole)
jconsole service:jmx:rmi:///jndi/rmi://localhost:7072/jmxrmi11.3 关键 JMX 指标
| MBean | 属性 | 说明 |
|---|---|---|
Broker | TotalMessageCount | 所有队列消息总数 |
Queue=ORDER.* | QueueSize | 队列积压量 |
Queue=ORDER.* | ConsumerCount | 消费者数 |
Queue=ORDER.* | EnqueueCount | 入队消息数 |
Queue=ORDER.* | DequeueCount | 出队消息数 |
Broker | MemoryPercentUsage | 内存使用百分比 |
Broker | StorePercentUsage | 磁盘使用百分比 |
12. 安全配置
12.1 修改默认密码
properties
# conf/users.properties - 必须修改 admin 密码
admin=YOUR_STRONG_PASSWORD
# conf/jetty-realm.properties - Web 控制台密码
admin: YOUR_STRONG_PASSWORD, admin
user: user, user12.2 SSL/TLS 加密
xml
<!-- ssl+nio 协议 -->
<transportConnector name="ssl"
uri="ssl://0.0.0.0:61617?needClientAuth=false"/>properties
# conf/activemq.xml 中引用 SSL 配置
# 还需在 wrapper.conf 中配置 keystore
wrapper.java.additional.X=-Djavax.net.ssl.keyStore=/etc/activemq/ssl/keystore.jks
wrapper.java.additional.X=-Djavax.net.ssl.keyStorePassword=keystore-pass
wrapper.java.additional.X=-Djavax.net.ssl.trustStore=/etc/activemq/ssl/truststore.jks
wrapper.java.additional.X=-Djavax.net.ssl.trustStorePassword=trust-pass12.3 Jasypt 密码加密
ActiveMQ 5.19.4 集成 Jasypt 可对配置文件中的密码加密:
bash
# 加密密码
./activemq/bin/activemq encrypt --password masterpassword --input db-password
# 输出: ENC(xxxxx)xml
<!-- 在 activemq.xml 中使用加密密码 -->
<property name="password" value="ENC(xxxxx)"/>13. 常见问题
Q:Web 控制台无法访问(连接被拒绝)? A:检查 conf/jetty.xml 中是否存在硬编码 IP 172.20.140.224,修改为 0.0.0.0 或实际 IP。
Q:消息积压但消费者不消费? A:检查消费者是否 Pending Ack(事务未提交),查看 conf/activemq.xml 中的 systemUsage 是否达到内存/磁盘限制(Broker 会自动停止接收新消息)。
Q:KahaDB 日志文件过大? A:调整 journalMaxFileLength,或启用日志压缩(cleanupOnStop="true")。
Q:启动时 License 报错? A:确认 license/ 目录中有有效的 apusic.lic,检查授权文件是否过期,确认系统时间正确。
Q:消息乱序? A:Queue 模式下单消费者保证顺序;多消费者或 Network of Brokers 模式无法保证全局顺序,需要业务层处理。
Q:ActiveMQ 内存占用过高? A:调整 systemUsage/memoryUsage,降低 percentOfJvmHeap(默认 70%);检查消费者是否消费够快;考虑启用消息分页(paging)。