外观
ADMQ ActiveMQ 开发集成手册
版本:1.0
目录
- 连接配置
- Java JMS 客户端
- Spring Boot 集成
- Spring Boot + JMS 注解模式
- Apache Camel 路由集成
- Python 客户端(STOMP)
- Node.js 客户端(STOMP)
- Go 客户端(AMQP 1.0)
- REST API 集成
- 延迟消息与定时任务
- 事务消息
- 消息选择器与过滤
- 最佳实践
1. 连接配置
1.1 连接 URL 格式
# OpenWire(默认)
tcp://localhost:61616
# 带故障转移(HA 模式)
failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false
# SSL
ssl://localhost:61617
# AMQP 1.0
amqp://localhost:5672
# STOMP
stomp://localhost:61613
# MQTT
mqtt://localhost:1883
1.2 连接参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| maxReconnectAttempts | -1(无限) | failover 最大重试次数 |
| initialReconnectDelay | 10 ms | 重连初始延迟 |
| maxReconnectDelay | 30000 ms | 重连最大延迟 |
| socket.soTimeout | 0(无超时) | Socket 超时 |
| wireFormat.maxInactivityDuration | 30000 ms | 心跳超时 |
2. Java JMS 客户端
2.1 Maven 依赖
xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.19.4</version>
</dependency>
2.2 Queue 生产者
java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueProducer {
/**
 * Sends one text message and one object message to ORDER.QUEUE.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or sending fails
 */
public static void main(String[] args) throws Exception {
    // Broker credentials and OpenWire endpoint used by this example.
    ConnectionFactory factory =
            new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");

    // try-with-resources closes the connection when the block exits.
    try (Connection connection = factory.createConnection()) {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Producer bound to the ORDER.QUEUE destination.
        MessageProducer producer = session.createProducer(session.createQueue("ORDER.QUEUE"));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); // persistent delivery
        producer.setTimeToLive(60000); // TTL: 60 seconds

        // Text message carrying a custom "orderType" property.
        TextMessage text = session.createTextMessage("Order #12345");
        text.setStringProperty("orderType", "VIP");
        producer.send(text);

        // Object message wrapping an Order instance.
        producer.send(session.createObjectMessage(new Order("12345", "VIP")));

        System.out.println("消息已发送");
    }
}
}
2.3 Queue 消费者
java
public class QueueConsumer {
/**
 * Starts an asynchronous consumer on ORDER.QUEUE using CLIENT_ACKNOWLEDGE.
 *
 * <p>A text message is acknowledged only after {@code processOrder} returns
 * normally. When processing fails, the session is recovered so the
 * unacknowledged message is redelivered.
 *
 * @param args unused
 * @throws Exception if the connection, session or consumer cannot be created
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory(
            "admin", "admin", "tcp://localhost:61616"
    );
    Connection connection = factory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue queue = session.createQueue("ORDER.QUEUE");
    MessageConsumer consumer = session.createConsumer(queue);
    // Asynchronous listener
    consumer.setMessageListener(message -> {
        try {
            if (message instanceof TextMessage) {
                String text = ((TextMessage) message).getText();
                System.out.println("收到消息: " + text);
                processOrder(text);
                message.acknowledge(); // manual ack (CLIENT_ACKNOWLEDGE mode)
            }
        } catch (Exception e) {
            System.err.println("处理失败,不确认(将进入重试): " + e.getMessage());
            // Withholding the ack is not enough: in CLIENT_ACKNOWLEDGE mode an
            // acknowledge() on a later message also acknowledges this one.
            // recover() restarts delivery from the first unacknowledged message.
            try {
                session.recover();
            } catch (JMSException recoverFailure) {
                System.err.println("session.recover() failed: " + recoverFailure.getMessage());
            }
        }
    });
    // Block the main thread so the listener keeps running.
    Thread.currentThread().join();
}
}
2.4 Topic 发布/订阅
java
// 发布者
Topic topic = session.createTopic("ALERTS.TOPIC");
MessageProducer publisher = session.createProducer(topic);
publisher.send(session.createTextMessage("系统告警: 磁盘使用率 > 90%"));
// 持久订阅者(broker 重启后仍能收到离线期间的消息)
connection.setClientID("alert-monitor-1"); // 客户端 ID 必须唯一
Topic topic = session.createTopic("ALERTS.TOPIC");
MessageConsumer durableSubscriber = session.createDurableSubscriber(topic, "alert-subscription");
// 非持久订阅(只收实时消息)
MessageConsumer subscriber = session.createConsumer(topic);
3. Spring Boot 集成
3.1 依赖配置
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
3.2 application.yml
yaml
spring:
  activemq:
    broker-url: failover:(tcp://localhost:61616)?randomize=false
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 50
      idle-timeout: 30000
  jms:
    pub-sub-domain: false # false = Queue 模式,true = Topic 模式
    template:
      delivery-mode: persistent
      time-to-live: 60000
      receive-timeout: 5000
3.3 JMS 配置类
java
@Configuration
@EnableJms
public class JmsConfig {
/**
 * Listener container factory for queue destinations.
 *
 * @param connectionFactory JMS connection factory handed to the configurer
 * @param configurer        Spring Boot configurer applied before the explicit settings
 * @return factory in queue mode with "3-10" concurrency and CLIENT_ACKNOWLEDGE sessions
 */
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory queueFactory = new DefaultJmsListenerContainerFactory();
    // Run the configurer first; the explicit setters below are applied afterwards.
    configurer.configure(queueFactory, connectionFactory);
    queueFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    queueFactory.setConcurrency("3-10"); // 3 to 10 consumer threads
    queueFactory.setPubSubDomain(false); // queue mode
    return queueFactory;
}
/**
 * Listener container factory for durable topic subscriptions.
 *
 * @param connectionFactory JMS connection factory handed to the configurer
 * @param configurer        Spring Boot configurer applied before the explicit settings
 * @return factory in topic mode with durable subscriptions and a fixed client id
 */
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
    // Run the configurer first; the explicit setters below are applied afterwards.
    configurer.configure(topicFactory, connectionFactory);
    topicFactory.setClientId("my-service-subscriber");
    topicFactory.setSubscriptionDurable(true); // durable subscription
    topicFactory.setPubSubDomain(true); // topic mode
    return topicFactory;
}
}
4. Spring Boot + JMS 注解模式
4.1 发送消息
java
@Service
@RequiredArgsConstructor
public class OrderService {
private final JmsTemplate jmsTemplate;
/**
 * Converts the order to a JMS message and sends it to ORDER.QUEUE,
 * tagging it with an "orderType" property and a "JMSPriority" value.
 *
 * @param order the order to publish
 */
public void createOrder(Order order) {
    jmsTemplate.convertAndSend("ORDER.QUEUE", order, outgoing -> {
        outgoing.setStringProperty("orderType", order.getType());
        // NOTE(review): JMS providers normally overwrite the priority header at
        // send time, so this value may have no effect unless the JmsTemplate has
        // explicit QoS enabled with a priority configured. Confirm.
        int priority;
        if (order.isVip()) {
            priority = 9;
        } else {
            priority = 4;
        }
        outgoing.setIntProperty("JMSPriority", priority);
        return outgoing;
    });
}
/**
 * Publishes an alert to the ALERTS.TOPIC topic.
 *
 * @param alert alert text to broadcast
 */
public void broadcastAlert(String alert) {
    // Send to an explicit Topic destination rather than calling
    // jmsTemplate.setPubSubDomain(true). The JmsTemplate instance is shared, so
    // flipping that flag would make every later send by destination name
    // (including createOrder's "ORDER.QUEUE") resolve to a topic.
    jmsTemplate.convertAndSend(
            new org.apache.activemq.command.ActiveMQTopic("ALERTS.TOPIC"), alert);
}
}
4.2 接收消息
java
@Component
@Slf4j
public class OrderListener {
/**
 * Handles messages from ORDER.QUEUE via the "queueListenerFactory" container.
 * Acknowledges the raw message after the order has been processed; on failure
 * the exception is rethrown so the message stays unacknowledged.
 *
 * <p>NOTE(review): {@code orderService} is not declared in this class as shown;
 * it has to be injected (for example as a constructor-injected final field).
 *
 * @param order      payload converted from the message body
 * @param rawMessage underlying JMS message, used for properties and the ack
 * @throws JMSException if reading the "orderType" property fails
 */
@JmsListener(
        destination = "ORDER.QUEUE",
        containerFactory = "queueListenerFactory"
)
public void processOrder(Order order, Message rawMessage) throws JMSException {
    String orderType = rawMessage.getStringProperty("orderType");
    log.info("处理订单: {} type={}", order.getId(), orderType);
    try {
        orderService.process(order);
        rawMessage.acknowledge();
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is logged too.
        log.error("处理失败: {}", e.getMessage(), e);
        // No ack: the message is redelivered and eventually goes to the DLQ.
        throw new RuntimeException(e);
    }
}
@JmsListener(
destination = "ALERTS.TOPIC",
containerFactory = "topicListenerFactory",
subscription = "alert-subscription-order-service"
)
public void handleAlert(String alert) {
log.warn("收到告警: {}", alert);
}
}
5. Apache Camel 路由集成
ADMQ ActiveMQ 内置 Camel 2.25.4,可直接用于消息路由和转换。
5.1 依赖
xml
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>3.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-activemq-starter</artifactId>
<version>3.21.0</version>
</dependency>
5.2 基础路由示例
java
@Component
public class OrderRoutes extends RouteBuilder {
/**
 * Defines three routes: property-based dispatch from ORDER.INBOX, a
 * transform-and-forward route from ORDER.NORMAL.QUEUE, and a timer-driven
 * order count. Failed exchanges go to ORDER.DLQ after the configured
 * redeliveries.
 *
 * @throws Exception if route definition fails
 */
@Override
public void configure() throws Exception {
    // Error handling: up to 3 redeliveries, starting at 1000 ms with a
    // back-off multiplier of 2, then dead-letter to ORDER.DLQ.
    errorHandler(deadLetterChannel("activemq:queue:ORDER.DLQ")
            .maximumRedeliveries(3)
            .redeliveryDelay(1000)
            .backOffMultiplier(2));

    // Route 1: dispatch by the "orderType" header.
    from("activemq:queue:ORDER.INBOX")
            .choice()
                .when(header("orderType").isEqualTo("VIP"))
                    .to("activemq:queue:ORDER.VIP.QUEUE")
                .when(header("orderType").isEqualTo("EXPRESS"))
                    .to("activemq:queue:ORDER.EXPRESS.QUEUE")
                .otherwise()
                    .to("activemq:queue:ORDER.NORMAL.QUEUE")
            .end();

    // Route 2: transform (ActiveMQ -> database + HTTP).
    from("activemq:queue:ORDER.NORMAL.QUEUE")
            .unmarshal().json(Order.class)
            .process(exchange -> {
                Order order = exchange.getIn().getBody(Order.class);
                order.setStatus("PROCESSING");
                exchange.getIn().setBody(order);
            })
            .to("jpa:com.example.OrderEntity") // write to the database
            .marshal().json()
            .to("http://api-gateway/orders?bridgeEndpoint=true"); // notify downstream

    // Route 3: periodic summary.
    // Camel's SQL component binds named parameters written as ":#name".
    // The "since" value is supplied as a header holding the timestamp one
    // minute before the timer fired.
    from("timer:order-summary?period=60000")
            .setHeader("since", simple("${date:now-1m}"))
            .to("sql:SELECT COUNT(*) FROM orders WHERE created_at > :#since")
            .log("最近1分钟新增订单: ${body}");
}
}
5.3 消息聚合(Aggregator 模式)
java
from("activemq:queue:ORDER.ITEMS")
.aggregate(header("orderId"), new GroupedBodyAggregationStrategy())
.completionSize(10) // 聚合10条后发出
.completionTimeout(5000) // 或等待5秒
.to("activemq:queue:ORDER.BATCH");
5.4 拆分(Splitter 模式)
java
from("activemq:queue:BULK.ORDERS")
.unmarshal().json(List.class)
.split(body())
.parallelProcessing(true)
.to("activemq:queue:ORDER.QUEUE");
6. Python 客户端(STOMP)
bash
pip install stomp.py
python
import stomp
import json
import time


class OrderListener(stomp.ConnectionListener):
    """Prints each received message and acknowledges it manually.

    The ``conn`` attribute must be assigned before messages arrive; it is the
    connection used to send the ack.
    """

    def on_message(self, frame):
        """Decode the JSON body, print it, then ack the message."""
        headers = frame.headers
        body = json.loads(frame.body)
        print(f"收到消息: destination={headers['destination']} body={body}")
        # Manual ack (the subscription below uses ack='client')
        self.conn.ack(headers['message-id'], headers['subscription'])

    def on_error(self, frame):
        """Print the body of an error frame."""
        print(f"错误: {frame.body}")


conn = stomp.Connection([('localhost', 61613)])
listener = OrderListener()
listener.conn = conn  # give the listener access to the connection for acks
conn.set_listener('', listener)
conn.connect('admin', 'admin', wait=True)

# Subscribe
conn.subscribe(destination='/queue/ORDER.QUEUE', id=1, ack='client')

# Send
conn.send(
    destination='/queue/ORDER.QUEUE',
    body=json.dumps({'orderId': '12345', 'amount': 199.99}),
    headers={'orderType': 'VIP'}
)

# Keep the script alive for a while so the listener can receive the message.
time.sleep(10)
conn.disconnect()
7. Node.js 客户端(STOMP)
bash
npm install @stomp/stompjs ws
javascript
const { Client } = require('@stomp/stompjs');
const WebSocket = require('ws');

// @stomp/stompjs opens brokerURL through a global WebSocket constructor.
// Node.js versions without a built-in one need the 'ws' implementation
// registered globally, otherwise the import above is never used.
Object.assign(global, { WebSocket });

// STOMP client configuration; reconnects 5 seconds after a dropped connection.
const client = new Client({
  brokerURL: 'ws://localhost:61614/stomp', // WebSocket STOMP (must be enabled in activemq.xml)
  connectHeaders: {
    login: 'admin',
    passcode: 'admin',
  },
  reconnectDelay: 5000,
});
client.onConnect = () => {
console.log('已连接 ADMQ ActiveMQ');
// 订阅队列
client.subscribe('/queue/ORDER.QUEUE', (message) => {
const body = JSON.parse(message.body);
console.log('收到订单:', body);
message.ack(); // 手动确认
}, { ack: 'client' });
// 发送消息
client.publish({
destination: '/queue/ORDER.QUEUE',
body: JSON.stringify({ orderId: '12345', amount: 99.99 }),
headers: { orderType: 'VIP' },
});
};
client.activate();
8. Go 客户端(AMQP 1.0)
bash
go get github.com/Azure/go-amqp@latest
go
package main
import (
"context"
"fmt"
amqp "github.com/Azure/go-amqp"
)
func main() {
ctx := context.Background()
// 连接 ADMQ ActiveMQ(AMQP 1.0,需启用 amqp 协议)
conn, err := amqp.Dial(ctx, "amqp://localhost:5672", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("admin", "admin"),
})
if err != nil {
panic(err)
}
defer conn.Close()
session, err := conn.NewSession(ctx, nil)
if err != nil {
panic(err)
}
// 发送消息
sender, err := session.NewSender(ctx, "queue://ORDER.QUEUE", nil)
if err != nil {
panic(err)
}
err = sender.Send(ctx, amqp.NewMessage([]byte(`{"orderId":"12345"}`)), nil)
if err != nil {
panic(err)
}
fmt.Println("消息已发送")
sender.Close(ctx)
// 接收消息
receiver, err := session.NewReceiver(ctx, "queue://ORDER.QUEUE", nil)
if err != nil {
panic(err)
}
msg, err := receiver.Receive(ctx, nil)
if err != nil {
panic(err)
}
fmt.Printf("收到消息: %s\n", msg.GetData())
receiver.AcceptMessage(ctx, msg)
}
9. REST API 集成
ActiveMQ 通过 Jolokia 暴露 JMX 为 REST API,通过 REST Message API 支持 HTTP 收发消息。
9.1 发送消息
bash
# Send to a queue. The URL is quoted so the shell does not treat '?' as a
# glob character; the body is plain text, so it is declared as text/plain.
curl -X POST "http://localhost:8161/api/message/ORDER.QUEUE?type=queue" \
-H "Content-Type: text/plain" \
-u admin:admin \
-d 'Order body text'
# 带自定义属性
curl -X POST "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&property.orderType=VIP" \
-u admin:admin \
-d '{"orderId":"12345"}'
9.2 消费消息
bash
# 拉取一条消息
curl "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&clientId=rest-client" \
-u admin:admin
# 订阅并持续拉取(长轮询)
curl "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&timeout=10000" \
-u admin:admin
9.3 队列统计(Jolokia)
bash
# 队列积压量
curl "http://localhost:8161/api/jolokia/read/org.apache.activemq:\
type=Broker,brokerName=localhost,destinationType=Queue,\
destinationName=ORDER.QUEUE/QueueSize" \
-u admin:admin
# 响应示例
{
"request": {...},
"value": 42,
"status": 200
}
10. 延迟消息与定时任务
java
// 需要在 activemq.xml 中启用:<broker schedulerSupport="true">
// 延迟 30 分钟后发送
TextMessage msg = session.createTextMessage("30分钟后提醒");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 30 * 60 * 1000L);
producer.send(msg);
// Recurring delivery every day at 09:00.
// The cron property is set on dailyMsg (the message actually sent).
// AMQ_SCHEDULED_CRON takes a 5-field Unix cron expression
// (minute hour day-of-month month day-of-week), not a Quartz expression.
TextMessage dailyMsg = session.createTextMessage("日报");
dailyMsg.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 9 * * *");
producer.send(dailyMsg);
// 指定次数的定时消息(每5分钟,重复10次)
TextMessage periodic = session.createTextMessage("周期消息");
periodic.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
periodic.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 60 * 1000L);
periodic.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 10);
producer.send(periodic);
11. 事务消息
11.1 JMS 本地事务
java
// 创建事务 Session
Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = txSession.createProducer(txSession.createQueue("ORDER.QUEUE"));
try {
// 批量发送
for (Order order : orders) {
TextMessage msg = txSession.createTextMessage(toJson(order));
producer.send(msg);
}
// 同时更新数据库
dbService.batchUpdateOrders(orders);
// 提交事务(JMS + DB 分别提交,非原子)
txSession.commit();
} catch (Exception e) {
txSession.rollback();
throw e;
}
11.2 XA 分布式事务(JMS + DB 原子提交)
java
// 使用 XA ConnectionFactory(需要 XA 驱动)
XAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory("tcp://localhost:61616");
XAConnection xaConn = xaFactory.createXAConnection("admin", "admin");
XASession xaSession = xaConn.createXASession();
XAResource jmsXA = xaSession.getXAResource();
// 由事务管理器(如 Atomikos)协调 JMS XA + JDBC XA
// Spring 中配合 @Transactional + JTA 使用
12. 消息选择器与过滤
java
// 只消费 orderType='VIP' 且 amount>100 的消息
String selector = "orderType = 'VIP' AND amount > 100";
MessageConsumer consumer = session.createConsumer(queue, selector);
// 发送带属性的消息
TextMessage msg = session.createTextMessage("VIP大额订单");
msg.setStringProperty("orderType", "VIP");
msg.setDoubleProperty("amount", 299.99);
producer.send(msg);
消息选择器语法遵循 SQL 92 的子集,支持:
- 比较运算符:=,<>,<,>,<=,>=
- 逻辑运算符:AND,OR,NOT
- 范围:BETWEEN ... AND ...
- 列表:IN (...)
- 模式:LIKE 'pattern%'
- 空值:IS NULL,IS NOT NULL
13. 最佳实践
13.1 连接管理
java
// 生产环境必须使用连接池
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"admin", "admin",
"failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false"
);
JmsPoolConnectionFactory pool = new JmsPoolConnectionFactory();
pool.setConnectionFactory(factory);
pool.setMaxConnections(10); // 连接数(不是 Session 数)
pool.setMaxSessionsPerConnection(100);
pool.setIdleTimeout(30000);
return pool;
}
13.2 消息大小
推荐:< 1 MB 每条消息
最大:102.4 MB(wireFormat.maxFrameSize 配置)
大文件:在消息中传递文件引用(URL/路径),不直接传文件内容
13.3 确认模式选择
| 确认模式 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|
| AUTO_ACKNOWLEDGE | 高 | 低(可能丢失) | 日志、统计等可丢失场景 |
| CLIENT_ACKNOWLEDGE | 中 | 高 | 大多数业务场景 |
| DUPS_OK_ACKNOWLEDGE | 最高 | 最低 | 允许重复,只求性能 |
| SESSION_TRANSACTED | 低 | 最高 | 金融、订单等关键业务 |
13.4 消费者幂等性
java
// ActiveMQ 消息可能重复投递(At-least-once),消费者必须幂等
/**
 * Idempotent ORDER.QUEUE listener: a message whose JMS message id is already
 * recorded in {@code processedMessages} is skipped; otherwise the order is
 * processed and the id recorded. The message is acknowledged in both cases.
 *
 * @param order payload converted from the message body
 * @param msg   underlying JMS message, used for its id and the ack
 * @throws JMSException if reading the message id or acknowledging fails
 */
@JmsListener(destination = "ORDER.QUEUE")
public void processOrder(Order order, Message msg) throws JMSException {
    final String id = msg.getJMSMessageID();
    // Idempotency check (backed by Redis or a database unique key)
    boolean alreadyHandled = processedMessages.contains(id);
    if (!alreadyHandled) {
        // First delivery: run the business logic, then remember the id.
        orderService.process(order);
        processedMessages.add(id);
    } else {
        log.info("重复消息,跳过: {}", id);
    }
    msg.acknowledge();
}