Skip to content

ADMQ ActiveMQ 开发集成手册

版本:1.0


目录

  1. 连接配置
  2. Java JMS 客户端
  3. Spring Boot 集成
  4. Spring Boot + JMS 注解模式
  5. Apache Camel 路由集成
  6. Python 客户端(STOMP)
  7. Node.js 客户端(STOMP)
  8. Go 客户端(AMQP 1.0)
  9. REST API 集成
  10. 延迟消息与定时任务
  11. 事务消息
  12. 消息选择器与过滤
  13. 最佳实践

1. 连接配置

1.1 连接 URL 格式

# OpenWire(默认)
tcp://localhost:61616

# 带故障转移(HA 模式)
failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false

# SSL
ssl://localhost:61617

# AMQP 1.0
amqp://localhost:5672

# STOMP
stomp://localhost:61613

# MQTT
mqtt://localhost:1883

1.2 连接参数

参数默认值说明
maximumReconnectAttempts-1(无限)failover 最大重试次数
reconnectDelay10 ms重连初始延迟
maxReconnectDelay30000 ms重连最大延迟
socket.soTimeout0(无超时)Socket 超时
wireFormat.maxInactivityDuration30000 ms心跳超时

2. Java JMS 客户端

2.1 Maven 依赖

xml
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.19.4</version>
</dependency>

2.2 Queue 生产者

java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class QueueProducer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory(
            "admin", "admin",
            "tcp://localhost:61616"
        );

        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 创建目的地(Queue)
            Queue queue = session.createQueue("ORDER.QUEUE");
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);  // 持久化消息
            producer.setTimeToLive(60000);  // TTL: 60秒

            // 发送文本消息
            TextMessage textMessage = session.createTextMessage("Order #12345");
            textMessage.setStringProperty("orderType", "VIP");
            producer.send(textMessage);

            // 发送对象消息
            ObjectMessage objectMessage = session.createObjectMessage(new Order("12345", "VIP"));
            producer.send(objectMessage);

            System.out.println("消息已发送");
        }
    }
}

2.3 Queue 消费者

java
public class QueueConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(
            "admin", "admin", "tcp://localhost:61616"
        );

        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Queue queue = session.createQueue("ORDER.QUEUE");
        MessageConsumer consumer = session.createConsumer(queue);

        // 异步监听
        consumer.setMessageListener(message -> {
            try {
                if (message instanceof TextMessage) {
                    String text = ((TextMessage) message).getText();
                    System.out.println("收到消息: " + text);
                    processOrder(text);
                    message.acknowledge();  // 手动确认(CLIENT_ACKNOWLEDGE 模式)
                }
            } catch (Exception e) {
                System.err.println("处理失败,不确认(将进入重试): " + e.getMessage());
            }
        });

        // 保持运行
        Thread.currentThread().join();
    }
}

2.4 Topic 发布/订阅

java
// 发布者
Topic topic = session.createTopic("ALERTS.TOPIC");
MessageProducer publisher = session.createProducer(topic);
publisher.send(session.createTextMessage("系统告警: 磁盘使用率 > 90%"));

// 持久订阅者(broker 重启后仍能收到离线期间的消息)
connection.setClientID("alert-monitor-1");  // 客户端 ID 必须唯一
Topic topic = session.createTopic("ALERTS.TOPIC");
MessageConsumer durableSubscriber = session.createDurableSubscriber(topic, "alert-subscription");

// 非持久订阅(只收实时消息)
MessageConsumer subscriber = session.createConsumer(topic);

3. Spring Boot 集成

3.1 依赖配置

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 连接池 -->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>

3.2 application.yml

yaml
spring:
  activemq:
    broker-url: failover:(tcp://localhost:61616)?randomize=false
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 50
      idle-timeout: 30000
  jms:
    pub-sub-domain: false   # false = Queue 模式,true = Topic 模式
    template:
      delivery-mode: persistent
      time-to-live: 60000
      receive-timeout: 5000

3.3 JMS 配置类

java
@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(false);  // Queue 模式
        factory.setConcurrency("3-10");  // 3~10 个消费者线程
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);  // Topic 模式
        factory.setSubscriptionDurable(true);  // 持久订阅
        factory.setClientId("my-service-subscriber");
        return factory;
    }
}

4. Spring Boot + JMS 注解模式

4.1 发送消息

java
@Service
@RequiredArgsConstructor
public class OrderService {

    private final JmsTemplate jmsTemplate;

    public void createOrder(Order order) {
        // 发送到 Queue
        jmsTemplate.convertAndSend("ORDER.QUEUE", order, message -> {
            message.setStringProperty("orderType", order.getType());
            message.setIntProperty("JMSPriority", order.isVip() ? 9 : 4);
            return message;
        });
    }

    public void broadcastAlert(String alert) {
        // 发送到 Topic(需要临时切换 pub-sub-domain)
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend("ALERTS.TOPIC", alert);
    }
}

4.2 接收消息

java
@Component
@Slf4j
public class OrderListener {

    @JmsListener(
        destination = "ORDER.QUEUE",
        containerFactory = "queueListenerFactory"
    )
    public void processOrder(Order order, Message rawMessage) throws JMSException {
        String orderType = rawMessage.getStringProperty("orderType");
        log.info("处理订单: {} type={}", order.getId(), orderType);

        try {
            orderService.process(order);
            rawMessage.acknowledge();
        } catch (Exception e) {
            log.error("处理失败: {}", e.getMessage());
            // 不确认 → 消息重新投递,最终进入 DLQ
            throw new RuntimeException(e);
        }
    }

    @JmsListener(
        destination = "ALERTS.TOPIC",
        containerFactory = "topicListenerFactory",
        subscription = "alert-subscription-order-service"
    )
    public void handleAlert(String alert) {
        log.warn("收到告警: {}", alert);
    }
}

5. Apache Camel 路由集成

ADMQ ActiveMQ 内置 Camel 2.25.4,可直接用于消息路由和转换。

5.1 依赖

xml
<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>3.21.0</version>
</dependency>
<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-activemq-starter</artifactId>
    <version>3.21.0</version>
</dependency>

5.2 基础路由示例

java
@Component
public class OrderRoutes extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // 错误处理策略
        errorHandler(deadLetterChannel("activemq:queue:ORDER.DLQ")
            .maximumRedeliveries(3)
            .redeliveryDelay(1000)
            .backOffMultiplier(2));

        // 路由1:消息分发(基于属性路由)
        from("activemq:queue:ORDER.INBOX")
            .choice()
                .when(header("orderType").isEqualTo("VIP"))
                    .to("activemq:queue:ORDER.VIP.QUEUE")
                .when(header("orderType").isEqualTo("EXPRESS"))
                    .to("activemq:queue:ORDER.EXPRESS.QUEUE")
                .otherwise()
                    .to("activemq:queue:ORDER.NORMAL.QUEUE")
            .end();

        // 路由2:消息转换(ActiveMQ → 数据库 + HTTP)
        from("activemq:queue:ORDER.NORMAL.QUEUE")
            .unmarshal().json(Order.class)
            .process(exchange -> {
                Order order = exchange.getIn().getBody(Order.class);
                order.setStatus("PROCESSING");
                exchange.getIn().setBody(order);
            })
            .to("jpa:com.example.OrderEntity")                   // 写入数据库
            .marshal().json()
            .to("http://api-gateway/orders?bridgeEndpoint=true"); // 通知下游

        // 路由3:定时汇总
        from("timer:order-summary?period=60000")
            .to("sql:SELECT COUNT(*) FROM orders WHERE created_at > :?since")
            .log("最近1分钟新增订单: ${body}");
    }
}

5.3 消息聚合(Aggregator 模式)

java
from("activemq:queue:ORDER.ITEMS")
    .aggregate(header("orderId"), new GroupedBodyAggregationStrategy())
    .completionSize(10)           // 聚合10条后发出
    .completionTimeout(5000)      // 或等待5秒
    .to("activemq:queue:ORDER.BATCH");

5.4 拆分(Splitter 模式)

java
from("activemq:queue:BULK.ORDERS")
    .unmarshal().json(List.class)
    .split(body())
        .parallelProcessing(true)
        .to("activemq:queue:ORDER.QUEUE");

6. Python 客户端(STOMP)

bash
pip install stomp.py
python
import stomp
import json
import time

class OrderListener(stomp.ConnectionListener):
    def on_message(self, frame):
        headers = frame.headers
        body = json.loads(frame.body)
        print(f"收到消息: destination={headers['destination']} body={body}")
        # 手动确认
        self.conn.ack(headers['message-id'], headers['subscription'])

    def on_error(self, frame):
        print(f"错误: {frame.body}")

conn = stomp.Connection([('localhost', 61613)])
listener = OrderListener()
listener.conn = conn
conn.set_listener('', listener)

conn.connect('admin', 'admin', wait=True)

# 订阅
conn.subscribe(destination='/queue/ORDER.QUEUE', id=1, ack='client')

# 发送
conn.send(
    destination='/queue/ORDER.QUEUE',
    body=json.dumps({'orderId': '12345', 'amount': 199.99}),
    headers={'orderType': 'VIP'}
)

time.sleep(10)
conn.disconnect()

7. Node.js 客户端(STOMP)

bash
npm install @stomp/stompjs ws
javascript
const { Client } = require('@stomp/stompjs');
const WebSocket = require('ws');

const client = new Client({
  brokerURL: 'ws://localhost:61614/stomp',  // WebSocket STOMP (需在 activemq.xml 启用)
  connectHeaders: {
    login: 'admin',
    passcode: 'admin',
  },
  reconnectDelay: 5000,
});

client.onConnect = () => {
  console.log('已连接 ADMQ ActiveMQ');

  // 订阅队列
  client.subscribe('/queue/ORDER.QUEUE', (message) => {
    const body = JSON.parse(message.body);
    console.log('收到订单:', body);
    message.ack();  // 手动确认
  }, { ack: 'client' });

  // 发送消息
  client.publish({
    destination: '/queue/ORDER.QUEUE',
    body: JSON.stringify({ orderId: '12345', amount: 99.99 }),
    headers: { orderType: 'VIP' },
  });
};

client.activate();

8. Go 客户端(AMQP 1.0)

bash
go get github.com/Azure/go-amqp@latest
go
package main

import (
    "context"
    "fmt"
    amqp "github.com/Azure/go-amqp"
)

func main() {
    ctx := context.Background()

    // 连接 ADMQ ActiveMQ(AMQP 1.0,需启用 amqp 协议)
    conn, err := amqp.Dial(ctx, "amqp://localhost:5672", &amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain("admin", "admin"),
    })
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    session, err := conn.NewSession(ctx, nil)
    if err != nil {
        panic(err)
    }

    // 发送消息
    sender, err := session.NewSender(ctx, "queue://ORDER.QUEUE", nil)
    if err != nil {
        panic(err)
    }

    err = sender.Send(ctx, amqp.NewMessage([]byte(`{"orderId":"12345"}`)), nil)
    if err != nil {
        panic(err)
    }
    fmt.Println("消息已发送")
    sender.Close(ctx)

    // 接收消息
    receiver, err := session.NewReceiver(ctx, "queue://ORDER.QUEUE", nil)
    if err != nil {
        panic(err)
    }

    msg, err := receiver.Receive(ctx, nil)
    if err != nil {
        panic(err)
    }
    fmt.Printf("收到消息: %s\n", msg.GetData())
    receiver.AcceptMessage(ctx, msg)
}

9. REST API 集成

ActiveMQ 通过 Jolokia 暴露 JMX 为 REST API,通过 REST Message API 支持 HTTP 收发消息。

9.1 发送消息

bash
# 发送到 Queue
curl -X POST http://localhost:8161/api/message/ORDER.QUEUE?type=queue \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d 'Order body text'

# 带自定义属性
curl -X POST "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&property.orderType=VIP" \
  -u admin:admin \
  -d '{"orderId":"12345"}'

9.2 消费消息

bash
# 拉取一条消息
curl "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&clientId=rest-client" \
  -u admin:admin

# 订阅并持续拉取(长轮询)
curl "http://localhost:8161/api/message/ORDER.QUEUE?type=queue&timeout=10000" \
  -u admin:admin

9.3 队列统计(Jolokia)

bash
# 队列积压量
curl "http://localhost:8161/api/jolokia/read/org.apache.activemq:\
type=Broker,brokerName=localhost,destinationType=Queue,\
destinationName=ORDER.QUEUE/QueueSize" \
  -u admin:admin

# 响应示例
{
  "request": {...},
  "value": 42,
  "status": 200
}

10. 延迟消息与定时任务

java
// 需要在 activemq.xml 中启用:<broker schedulerSupport="true">

// 延迟 30 分钟后发送
TextMessage msg = session.createTextMessage("30分钟后提醒");
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 30 * 60 * 1000L);
producer.send(msg);

// 定时重复发送(每天 9:00 AM)
TextMessage dailyMsg = session.createTextMessage("日报");
msg.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 9 * * ?");
producer.send(dailyMsg);

// 指定次数的定时消息(每5分钟,重复10次)
TextMessage periodic = session.createTextMessage("周期消息");
periodic.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
periodic.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 60 * 1000L);
periodic.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 10);
producer.send(periodic);

11. 事务消息

11.1 JMS 本地事务

java
// 创建事务 Session
Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = txSession.createProducer(txSession.createQueue("ORDER.QUEUE"));

try {
    // 批量发送
    for (Order order : orders) {
        TextMessage msg = txSession.createTextMessage(toJson(order));
        producer.send(msg);
    }

    // 同时更新数据库
    dbService.batchUpdateOrders(orders);

    // 提交事务(JMS + DB 分别提交,非原子)
    txSession.commit();
} catch (Exception e) {
    txSession.rollback();
    throw e;
}

11.2 XA 分布式事务(JMS + DB 原子提交)

java
// 使用 XA ConnectionFactory(需要 XA 驱动)
XAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory("tcp://localhost:61616");
XAConnection xaConn = xaFactory.createXAConnection("admin", "admin");
XASession xaSession = xaConn.createXASession();
XAResource jmsXA = xaSession.getXAResource();

// 由事务管理器(如 Atomikos)协调 JMS XA + JDBC XA
// Spring 中配合 @Transactional + JTA 使用

12. 消息选择器与过滤

java
// 只消费 orderType='VIP' 且 amount>100 的消息
String selector = "orderType = 'VIP' AND amount > 100";
MessageConsumer consumer = session.createConsumer(queue, selector);

// 发送带属性的消息
TextMessage msg = session.createTextMessage("VIP大额订单");
msg.setStringProperty("orderType", "VIP");
msg.setDoubleProperty("amount", 299.99);
producer.send(msg);

消息选择器语法遵循 SQL 92 的子集,支持:

  • 比较运算符:=, <>, <, >, <=, >=
  • 逻辑运算符:AND, OR, NOT
  • 范围:BETWEEN ... AND ...
  • 列表:IN (...)
  • 模式:LIKE 'pattern%'
  • 空值:IS NULL, IS NOT NULL

13. 最佳实践

13.1 连接管理

java
// 生产环境必须使用连接池
@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "admin", "admin",
        "failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false"
    );

    JmsPoolConnectionFactory pool = new JmsPoolConnectionFactory();
    pool.setConnectionFactory(factory);
    pool.setMaxConnections(10);        // 连接数(不是 Session 数)
    pool.setMaxSessionsPerConnection(100);
    pool.setIdleTimeout(30000);
    return pool;
}

13.2 消息大小

推荐:< 1 MB 每条消息
最大:102.4 MB(wireFormat.maxFrameSize 配置)
大文件:在消息中传递文件引用(URL/路径),不直接传文件内容

13.3 确认模式选择

确认模式性能可靠性适用场景
AUTO_ACKNOWLEDGE低(可能丢失)日志、统计等可丢失场景
CLIENT_ACKNOWLEDGE大多数业务场景
DUPS_OK_ACKNOWLEDGE最高最低允许重复,只求性能
SESSION_TRANSACTED最高金融、订单等关键业务

13.4 消费者幂等性

java
// ActiveMQ 消息可能重复投递(At-least-once),消费者必须幂等
@JmsListener(destination = "ORDER.QUEUE")
public void processOrder(Order order, Message msg) throws JMSException {
    String messageId = msg.getJMSMessageID();

    // 幂等检查(Redis 或 数据库 unique key)
    if (processedMessages.contains(messageId)) {
        log.info("重复消息,跳过: {}", messageId);
        msg.acknowledge();
        return;
    }

    // 处理业务
    orderService.process(order);
    processedMessages.add(messageId);
    msg.acknowledge();
}

金蝶天燕(Apusic)企业级消息中间件套件