Skip to content

ADMQ ActiveMQ 用户手册

版本:1.0


目录

  1. 产品概述
  2. 系统要求
  3. 安装与部署
  4. 配置说明
  5. 启动与停止
  6. 目的地管理(Queue / Topic)
  7. 用户与权限管理
  8. 消息持久化
  9. 集群与高可用
  10. 协议支持
  11. 监控与运维
  12. 安全配置
  13. 常见问题

1. 产品概述

ADMQ ActiveMQ 是 Apusic 推出的企业级 JMS 消息中间件,集成了 Apusic 统一授权体系和监控上报能力,适合以下场景:

  • 企业应用集成(EAI):系统间消息传递,替代点对点 API 调用
  • JMS 应用迁移:无缝承接 Java EE / Jakarta EE 应用的 JMS 需求
  • Apache Camel 集成:利用 Camel 实现复杂路由和 ETL
  • 传统企业 ESB:作为轻量 ESB 的消息核心

核心能力

能力说明
JMS 1.1/2.0完整的 Java 消息服务规范实现
Queue(点对点)消息在队列中积累,至少一个消费者处理
Topic(发布/订阅)一对多广播,支持持久订阅
多协议支持OpenWire(默认)、AMQP 1.0、STOMP、MQTT 3.1.1
KahaDB 持久化默认高性能文件数据库
JDBC 持久化支持 MySQL/PostgreSQL/Oracle 等关系数据库
消息调度器延迟消息、定时消息、循环消息
死信队列自动捕获处理失败的消息
Apache Camel内置 Camel 2.25.4 路由引擎

端口一览

端口协议用途默认状态
61616TCPOpenWire(默认JMS协议)启用
8161HTTPWeb 管理控制台 / REST API启用
7072TCPJMX 监控端口启用
5672TCPAMQP 1.0禁用(可启用)
61613TCPSTOMP禁用(可启用)
1883TCPMQTT 3.1.1禁用(可启用)

2. 系统要求

硬件建议

配置项最低推荐(生产)
CPU2 核8 核+
内存2 GB16 GB+
磁盘20 GB500 GB SSD
网络100 Mbps1 Gbps

软件要求

  • 操作系统:Linux x86_64 / ARM64(CentOS 7+, Ubuntu 18.04+)、macOS(提供原生支持)
  • Java:JDK 11+(产品内置,无需手动安装)

注意:ADMQ ActiveMQ 是四款 ADMQ 产品中唯一明确支持 macOS 的(bin/macosx/ 目录存在 macOS 启动脚本)。


3. 安装与部署

3.1 目录结构

admq-activemq/
├── bin/                         # 启动脚本
│   ├── linux/                   # Linux 脚本
│   ├── macosx/                  # macOS 脚本(独有)
│   ├── start.sh                 # ADMQ 统一启动
│   └── stop.sh
├── activemq/                    # ActiveMQ 5.19.4 发行包
│   ├── bin/
│   │   ├── activemq             # Linux 主控脚本
│   │   └── wrapper.jar          # Java Service Wrapper
│   ├── lib/                     # ActiveMQ 核心 JAR
│   │   ├── activemq-all-5.19.4.jar
│   │   ├── apusic-license-admq-2.0.jar    # ← Apusic 注入
│   │   ├── apusic-license-core-2.0.jar    # ← Apusic 注入
│   │   ├── admq-monitor-1.0.0-apusic.jar  # ← Apusic 注入
│   │   └── acls-client-1.0.9.jar          # ← Apusic 注入
│   ├── conf/
│   │   ├── activemq.xml         # ActiveMQ 主配置
│   │   ├── jetty.xml            # Web 控制台配置
│   │   ├── users.properties     # 用户定义
│   │   ├── groups.properties    # 组定义
│   │   ├── log4j2.properties    # 日志配置
│   │   └── wrapper.conf         # Java Service Wrapper 配置
│   ├── data/                    # KahaDB 数据目录
│   └── webapps/                 # Jetty Web 应用
│       └── admin/               # Web 控制台
├── config/
│   └── activemq-broker.conf     # ADMQ 主配置
├── lib/                         # Apusic 工具 JAR(备份)
├── license/                     # 授权文件目录
└── logs/

3.2 快速安装

bash
# 1. 解压安装包
tar -xzf admq-activemq-1.0.0.tar.gz -C /opt/
cd /opt/admq-activemq

# 2. 放置授权文件
cp your-license.xml license/

# 3. 修改主配置(可选)
vi config/activemq-broker.conf

# 4. 启动
./bin/start.sh

3.3 macOS 安装(开发环境)

bash
# macOS 专用启动
./bin/macosx/start.sh

# 或使用原生 ActiveMQ 脚本
./activemq/bin/macosx/activemq start

4. 配置说明

4.1 ADMQ 主配置 activemq-broker.conf

properties
# ===== 节点配置 =====
node_id=1
node_name=activemq-broker-1
cluster_name=admq-activemq-cluster

# ===== 网络配置 =====
broker_host=0.0.0.0
openwire_port=61616
web_console_port=8161

# ===== 协议开关 =====
amqp_enabled=false
amqp_port=5672
stomp_enabled=false
stomp_port=61613
mqtt_enabled=false
mqtt_port=1883

# ===== 持久化配置 =====
persistence_type=kahadb               # kahadb 或 jdbc
kahadb_dir=/var/lib/admq-activemq/kahadb
# jdbc_url=jdbc:mysql://localhost/activemq
# jdbc_user=activemq
# jdbc_password=activemq

# ===== 监控上报 =====
monitor_role=activemq
monitor_username=__sys__monitor
monitor_password=11111111
manager_addr=http://admq-manager:8080

# ===== JMX =====
jmx_port=7072

4.2 ActiveMQ 核心配置 activemq/conf/activemq.xml

主配置文件采用 Spring XML 格式:

xml
<beans xmlns="http://www.springframework.org/schema/beans" ...>

  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="localhost"
          dataDirectory="${activemq.data}">

    <!-- 传输协议配置 -->
    <transportConnectors>
      <!-- OpenWire(默认启用) -->
      <transportConnector name="openwire"
          uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

      <!-- AMQP(默认禁用,取消注释启用) -->
      <!-- <transportConnector name="amqp"
          uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->

      <!-- STOMP(默认禁用) -->
      <!-- <transportConnector name="stomp"
          uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->

      <!-- MQTT(默认禁用) -->
      <!-- <transportConnector name="mqtt"
          uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
    </transportConnectors>

    <!-- KahaDB 持久化 -->
    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    <!-- 死信队列策略 -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" >
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!-- 系统资源限制 -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="70"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50 gb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

  </broker>

</beans>

4.3 JVM 调优 activemq/conf/wrapper.conf

properties
# JVM 内存配置
wrapper.java.additional.1=-Xms2G
wrapper.java.additional.2=-Xmx4G
wrapper.java.additional.3=-XX:+UseG1GC
wrapper.java.additional.4=-XX:MaxGCPauseMillis=20
wrapper.java.additional.5=-Dactivemq.data=/var/lib/admq-activemq

4.4 Web 控制台配置 jetty.xml

重要 Bugjetty.xml 中硬编码了生产环境 IP 172.20.140.224(Apusic 测试环境地址)。部署时必须修改为实际 IP 或 0.0.0.0

xml
<!-- 需要修改这里的 host 配置 -->
<Set name="host">0.0.0.0</Set>   <!-- 原来是 172.20.140.224 -->
<Set name="port">8161</Set>

5. 启动与停止

bash
# 启动
./bin/start.sh

# 停止
./bin/stop.sh

# 查看状态
./bin/status.sh

# 查看实时日志
tail -f activemq/data/activemq.log

# 使用 activemq 脚本直接控制
./activemq/bin/activemq start
./activemq/bin/activemq stop
./activemq/bin/activemq status
./activemq/bin/activemq console   # 前台运行(调试用)

6. 目的地管理(Queue / Topic)

6.1 通过 Web 控制台管理

访问 http://localhost:8161/admin(默认用户名/密码:admin/admin

  • Queues 页面:查看队列列表、消息数量、消费者数量,支持发送测试消息、清空队列、删除队列
  • Topics 页面:查看主题列表,查看订阅者数量
  • Subscribers 页面:查看持久订阅者状态

6.2 通过命令行管理

ActiveMQ 提供 activemq-admin 工具:

bash
# 查看连接统计
./activemq/bin/activemq-admin query \
  --jmxurl service:jmx:rmi:///jndi/rmi://localhost:7072/jmxrmi

# 发送测试消息
./activemq/bin/activemq producer \
  --destination queue://TEST.QUEUE \
  --message "Hello ActiveMQ" \
  --messageCount 10

# 消费消息
./activemq/bin/activemq consumer \
  --destination queue://TEST.QUEUE

6.3 通过 REST API 管理

ActiveMQ 内置 Jolokia HTTP/JMX 桥接:

bash
# 查看队列统计
curl http://localhost:8161/api/jolokia/read/\
org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,\
destinationName=TEST.QUEUE/QueueSize

# 发送消息
curl -X POST http://localhost:8161/api/message/TEST.QUEUE?type=queue \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{"body":"Hello","headers":{"JMSPriority":4}}'

# 清除队列
curl -X DELETE http://localhost:8161/api/message/TEST.QUEUE?type=queue \
  -u admin:admin

7. 用户与权限管理

7.1 用户定义 conf/users.properties

properties
# 格式:用户名=密码
admin=admin
user=user
publisher=publisher-secret
subscriber=subscriber-secret

7.2 组定义 conf/groups.properties

properties
# 格式:组名=用户1,用户2,...
admins=admin
users=user,publisher,subscriber
publishers=publisher
subscribers=subscriber

7.3 授权配置 activemq.xml

xml
<plugins>
  <simpleAuthenticationPlugin anonymousAccessAllowed="false">
    <users>
      <authenticationUser username="admin" password="admin" groups="admins,users"/>
      <authenticationUser username="publisher" password="pub-pass" groups="publishers,users"/>
      <authenticationUser username="subscriber" password="sub-pass" groups="subscribers,users"/>
    </users>
  </simpleAuthenticationPlugin>

  <authorizationPlugin>
    <map>
      <authorizationMap>
        <authorizationEntries>
          <!-- admin 拥有所有权限 -->
          <authorizationEntry topic=">" read="admins" write="admins" admin="admins"/>
          <authorizationEntry queue=">" read="admins" write="admins" admin="admins"/>

          <!-- publisher 可以写入 ORDER 开头的队列 -->
          <authorizationEntry queue="ORDER.>" read="admins" write="publishers" admin="admins"/>

          <!-- subscriber 可以读取 ORDER 队列 -->
          <authorizationEntry queue="ORDER.>" read="subscribers" write="admins" admin="admins"/>
        </authorizationEntries>
      </authorizationMap>
    </map>
  </authorizationPlugin>
</plugins>

8. 消息持久化

8.1 KahaDB(默认)

KahaDB 是 ActiveMQ 的默认高性能文件数据库,基于 B-Tree 索引:

xml
<persistenceAdapter>
  <kahaDB directory="${activemq.data}/kahadb"
          journalMaxFileLength="32mb"
          enableJournalDiskSyncs="false"
          indexWriteBatchSize="10000"
          indexCacheSize="10000"/>
</persistenceAdapter>

KahaDB 文件结构:

kahadb/
├── db-1.log        # 消息日志(journal),32MB 轮转
├── db-2.log
├── db.data         # B-Tree 索引数据
├── db.redo         # 崩溃恢复日志
└── lock            # 排他锁文件

8.2 JDBC 持久化(MySQL)

xml
<persistenceAdapter>
  <jdbcPersistenceAdapter dataDirectory="${activemq.data}"
                          dataSource="#mysql-ds"/>
</persistenceAdapter>

<!-- 在 beans 段定义数据源 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource"
      destroy-method="close">
  <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
  <property name="username" value="activemq"/>
  <property name="password" value="activemq"/>
  <property name="maxTotal" value="200"/>
  <property name="poolPreparedStatements" value="true"/>
</bean>

8.3 消息调度器(延迟消息)

xml
<!-- 启用调度器插件 -->
<broker schedulerSupport="true" ...>
java
// Java 代码发送延迟消息
Message msg = session.createTextMessage("延迟10秒发送");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);  // 10秒后投递
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5000);  // 每5秒重复
msg.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);      // 重复3次
producer.send(msg);

8.4 死信队列(DLQ)

默认配置下,处理失败(重新投递 6 次后)的消息进入 ActiveMQ.DLQ

自定义 DLQ 策略:

xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- 为每个 Queue 创建独立的 DLQ -->
      <policyEntry queue="ORDER.>" >
        <deadLetterStrategy>
          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
        </deadLetterStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

9. 集群与高可用

9.1 Master-Slave(KahaDB 共享存储)

xml
<!-- Master 节点配置 -->
<broker brokerName="master" ...>
  <transportConnectors>
    <transportConnector uri="tcp://0.0.0.0:61616"/>
  </transportConnectors>
  <persistenceAdapter>
    <kahaDB directory="/shared/nfs/kahadb"/>  <!-- NFS 共享目录 -->
  </persistenceAdapter>
</broker>

Slave 启动时会尝试获取 KahaDB 锁文件,Master 持有锁则 Slave 等待。Master 宕机后,Slave 自动获取锁成为新 Master。

9.2 Network of Brokers(分布式网格)

适合需要高吞吐的分布式场景:

xml
<!-- Broker 1 配置 -->
<networkConnectors>
  <networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
                   name="bridge"
                   duplex="true"
                   networkTTL="3"
                   dynamicOnly="true"/>
</networkConnectors>

10. 协议支持

10.1 启用 AMQP 1.0

修改 activemq.xml,取消 AMQP 注释:

xml
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?..."/>

AMQP 使用场景:跨语言互操作(Python、.NET、Go 等)

10.2 启用 STOMP

xml
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?..."/>

10.3 启用 MQTT

注意:ActiveMQ 的 MQTT 实现是协议适配器(类似 RabbitMQ MQTT 插件),不是原生 MQTT Broker。IoT 大规模场景建议使用 ADMQ MQTT。

xml
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?..."/>

10.4 多协议统一端点(可选)

xml
<!-- 单端口自动检测协议 -->
<transportConnector name="auto"
    uri="auto://0.0.0.0:61618?wireFormat.allowedFormats=default,STOMP,AMQP,MQTT"/>

11. 监控与运维

11.1 Web 控制台

URL说明
http://localhost:8161/admin主控制台(admin/admin)
http://localhost:8161/api/jolokiaJMX over HTTP(Jolokia)
http://localhost:8161/api/message/{dest}REST 消息 API

11.2 JMX 监控

bash
# 连接 JMX(需要 JDK 中的 jconsole)
jconsole service:jmx:rmi:///jndi/rmi://localhost:7072/jmxrmi

11.3 关键 JMX 指标

MBean属性说明
BrokerTotalMessageCount所有队列消息总数
Queue=ORDER.*QueueSize队列积压量
Queue=ORDER.*ConsumerCount消费者数
Queue=ORDER.*EnqueueCount入队消息数
Queue=ORDER.*DequeueCount出队消息数
BrokerMemoryPercentUsage内存使用百分比
BrokerStorePercentUsage磁盘使用百分比

12. 安全配置

12.1 修改默认密码

properties
# conf/users.properties - 必须修改 admin 密码
admin=YOUR_STRONG_PASSWORD

# conf/jetty-realm.properties - Web 控制台密码
admin: YOUR_STRONG_PASSWORD, admin
user: user, user

12.2 SSL/TLS 加密

xml
<!-- ssl+nio 协议 -->
<transportConnector name="ssl"
    uri="ssl://0.0.0.0:61617?needClientAuth=false"/>
properties
# conf/activemq.xml 中引用 SSL 配置
# 还需在 wrapper.conf 中配置 keystore
wrapper.java.additional.X=-Djavax.net.ssl.keyStore=/etc/activemq/ssl/keystore.jks
wrapper.java.additional.X=-Djavax.net.ssl.keyStorePassword=keystore-pass
wrapper.java.additional.X=-Djavax.net.ssl.trustStore=/etc/activemq/ssl/truststore.jks
wrapper.java.additional.X=-Djavax.net.ssl.trustStorePassword=trust-pass

12.3 Jasypt 密码加密

ActiveMQ 5.19.4 集成 Jasypt 可对配置文件中的密码加密:

bash
# 加密密码
./activemq/bin/activemq encrypt --password masterpassword --input db-password
# 输出: ENC(xxxxx)
xml
<!-- 在 activemq.xml 中使用加密密码 -->
<property name="password" value="ENC(xxxxx)"/>

13. 常见问题

Q:Web 控制台无法访问(连接被拒绝)? A:检查 conf/jetty.xml 中是否存在硬编码 IP 172.20.140.224,修改为 0.0.0.0 或实际 IP。

Q:消息积压但消费者不消费? A:检查消费者是否 Pending Ack(事务未提交),查看 conf/activemq.xml 中的 systemUsage 是否达到内存/磁盘限制(Broker 会自动停止接收新消息)。

Q:KahaDB 日志文件过大? A:调整 journalMaxFileLength,或启用日志压缩(cleanupOnStop="true")。

Q:启动时 License 报错? A:确认 license/ 目录中有有效的 apusic.lic,检查授权文件是否过期,确认系统时间正确。

Q:消息乱序? A:Queue 模式下单消费者保证顺序;多消费者或 Network of Brokers 模式无法保证全局顺序,需要业务层处理。

Q:ActiveMQ 内存占用过高? A:调整 systemUsage/memoryUsage,降低 percentOfJvmHeap(默认 70%);检查消费者是否消费够快;考虑启用消息分页(paging)。

金蝶天燕(Apusic)企业级消息中间件套件