外观
ADMQ ActiveMQ 迁移手册
版本:1.0 | 涵盖从主流消息系统迁移至 ADMQ ActiveMQ 的完整指南
目录
- 迁移总览
- 从 IBM MQ 迁移
- 从 WebSphere MQ / Liberty Messaging 迁移
- 从 RabbitMQ 迁移
- 从 Kafka 迁移(降级场景)
- 从其他 ActiveMQ 版本迁移
- 从 Apusic AS 内置 JMS 迁移
- 数据迁移工具与方案
- 迁移验证
- 回滚方案
1. 迁移总览
1.1 ADMQ ActiveMQ 的定位
ADMQ ActiveMQ 最适合以下迁移来源:
| 来源系统 | 迁移难度 | 主要工作量 |
|---|---|---|
| IBM MQ | 中 | JMS API 兼容,主要是配置和部署变化 |
| WebSphere MQ | 中 | 同上 |
| 自建 ActiveMQ | 低 | 配置文件基本兼容 |
| Spring ActiveMQ(老版本) | 极低 | 仅需更换 broker-url |
| RabbitMQ(JMS 场景) | 中高 | 路由模型差异 |
| Kafka(降级为传统 MQ 场景) | 高 | 架构差异大 |
1.2 概念映射表
| 来源系统 | ADMQ ActiveMQ 概念 |
|---|---|
| IBM MQ Queue Manager | Broker |
| IBM MQ Channel | TransportConnector |
| IBM MQ Queue | Queue |
| IBM MQ Topic | Topic |
| IBM MQ Message Group | Message Selector + 消费者组 |
| RabbitMQ Exchange | 无直接对应(应用层路由 / Camel) |
| RabbitMQ Queue | Queue |
| RabbitMQ Binding | Camel 路由规则 |
| Kafka Topic | Topic(语义不同:消费后不删除) |
| Kafka ConsumerGroup | 同一 Queue 多消费者 |
2. 从 IBM MQ 迁移
2.1 连接配置迁移
IBM MQ 原配置(JMS):
java
MQConnectionFactory factory = new MQConnectionFactory();
factory.setHostName("ibmmq-host");
factory.setPort(1414);
factory.setQueueManager("QMGR01");
factory.setChannel("JAVA.CLIENT.CHANNEL");
factory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);迁移至 ADMQ ActiveMQ:
java
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"admin", "admin",
"tcp://admq-activemq-host:61616"
);
// 或使用 failover
// "failover:(tcp://host1:61616,tcp://host2:61616)?randomize=false"2.2 JNDI 配置迁移
IBM MQ 通常通过 JNDI 查找连接工厂和目的地。
原 JNDI 配置(jndi.properties):
properties
java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
java.naming.provider.url=file:///var/mqm/jndi迁移至 ActiveMQ JNDI(jndi.properties):
properties
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=admin
java.naming.security.credentials=admin
# 目的地定义
queue.OrderQueue=ORDER.QUEUE
topic.AlertTopic=ALERTS.TOPICJava 代码(JNDI 查找,无需改动):
java
Context ctx = new InitialContext(); // 自动读取 jndi.properties
ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
Queue queue = (Queue) ctx.lookup("OrderQueue");
// 其余代码完全不变2.3 IBM MQ 特性迁移
| IBM MQ 特性 | ADMQ ActiveMQ 替代方案 |
|---|---|
| Persistent Message | DeliveryMode.PERSISTENT(默认) |
| Message Priority | msg.setJMSPriority(9) |
| Message Expiry | producer.setTimeToLive(ms) |
| Selector | session.createConsumer(dest, selector) |
| Message Groups | msg.setStringProperty("JMSXGroupID", "group1") |
| Browse-only(浏览不消费) | QueueBrowser browser = session.createBrowser(queue) |
| Dead Letter Queue | 配置 individualDeadLetterStrategy |
| Backout Queue | 等同 DLQ,配置 maxRedeliveries |
2.4 MQ 消息格式转换
IBM MQ 使用 MQMessage 格式,需转换为 JMS TextMessage/BytesMessage:
java
// 原 IBM MQ 代码
MQMessage mqMsg = new MQMessage();
mqMsg.format = MQC.MQFMT_STRING;
mqMsg.writeString("Hello IBM MQ");
queue.put(mqMsg);
// 迁移后 JMS 代码
TextMessage jmsMsg = session.createTextMessage("Hello ActiveMQ");
jmsMsg.setStringProperty("OriginalFormat", "STRING");
producer.send(jmsMsg);3. 从 WebSphere MQ / Liberty Messaging 迁移
3.1 JEE 应用中的资源定义迁移
原 WebSphere 配置(server.xml):
xml
<jmsConnectionFactory id="jmsQCF" jndiName="jms/myQCF">
<properties.wasJms remoteServerAddress="localhost:7276:BootstrapBasicMessaging"/>
</jmsConnectionFactory>
<jmsQueue id="myQueue" jndiName="jms/myQueue">
<properties.wasJms queueName="Default.Queue"/>
</jmsQueue>迁移至 ADMQ ActiveMQ(Spring Boot):
yaml
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: adminjava
// 原 @Resource 注入方式不变(如果使用 JNDI)
@Resource(lookup = "jms/myQCF")
private ConnectionFactory connectionFactory;
@Resource(lookup = "jms/myQueue")
private Queue myQueue;3.2 MDB(Message-Driven Bean)迁移
原 MDB(JEE):
java
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="ORDER.QUEUE")
}
)
public class OrderMDB implements MessageListener {
@Override
public void onMessage(Message message) {
// 处理逻辑
}
}迁移至 Spring @JmsListener:
java
@Component
public class OrderListener {
@JmsListener(destination = "ORDER.QUEUE")
public void onMessage(Message message) {
// 处理逻辑(完全相同)
}
}4. 从 RabbitMQ 迁移
4.1 消息模型差异
RabbitMQ:
Producer → Exchange(Direct/Fanout/Topic/Headers) → Queue → Consumer
ADMQ ActiveMQ:
Producer → Queue/Topic → Consumer
(路由逻辑由 Camel 或应用层实现)4.2 Exchange 路由迁移至 Camel
RabbitMQ Direct Exchange(精确路由):
java
// RabbitMQ
channel.basicPublish("order-exchange", "order.vip", null, body);
// binding: routingKey="order.vip" → queue="VIP.QUEUE"ADMQ ActiveMQ + Camel 等价:
java
// 生产者直接发到路由队列
template.send("activemq:queue:ORDER.INBOX",
exchange -> {
exchange.getIn().setBody(body);
exchange.getIn().setHeader("routingKey", "order.vip");
});
// Camel 路由(替代 Exchange Binding)
from("activemq:queue:ORDER.INBOX")
.choice()
.when(header("routingKey").isEqualTo("order.vip"))
.to("activemq:queue:VIP.QUEUE")
.when(header("routingKey").startsWith("order."))
.to("activemq:queue:ORDER.NORMAL.QUEUE")
.end();RabbitMQ Fanout Exchange(广播):
java
// RabbitMQ:一个 exchange 绑定多个 queue
// ADMQ ActiveMQ:使用 Topic(每个消费者组创建独立订阅)java
// 生产者:发到 Topic
MessageProducer producer = session.createProducer(session.createTopic("NOTIFICATIONS"));
producer.send(session.createTextMessage(notification));
// 多个消费者服务:各自订阅(等价 Fanout)
// email-service:
MessageConsumer emailConsumer = session.createConsumer(
session.createTopic("NOTIFICATIONS"));
// sms-service:
MessageConsumer smsConsumer = session.createConsumer(
session.createTopic("NOTIFICATIONS"));4.3 RabbitMQ 消费者 ACK 迁移
java
// RabbitMQ
channel.basicConsume(queueName, false, // autoAck=false
(consumerTag, delivery) -> {
processMessage(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> {});
// ADMQ ActiveMQ(CLIENT_ACKNOWLEDGE)
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
processMessage(((BytesMessage) message).readBytes(...));
message.acknowledge(); // 等价 basicAck
} catch (Exception e) {
// 不 acknowledge → 消息重新投递(等价 basicNack/basicReject)
}
});5. 从 Kafka 迁移(降级场景)
5.1 适用场景
从 Kafka 迁移到 ActiveMQ 通常是降级,适用于:
- 消息量小,不需要高吞吐
- 需要传统 JMS API 兼容(如遗留 Java EE 应用)
- 需要延迟消息、消息优先级等 ActiveMQ 特性
- 运维团队更熟悉 ActiveMQ
5.2 主要差异
| 特性 | Kafka | ADMQ ActiveMQ |
|---|---|---|
| 消息保留 | 按时间/大小保留(可重放) | 消费后删除 |
| 消费者组 | 分区均分,独立 offset | 竞争消费,无分区概念 |
| 消息回放 | ✅ | ❌ |
| 吞吐量 | 极高(百万 TPS) | 中等(万级 TPS) |
| 延迟消息 | ❌(需外部) | ✅(内置 Scheduler) |
| 消息优先级 | ❌ | ✅(0-9 级) |
5.3 生产者迁移
java
// Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("ORDER.TOPIC", orderId, orderJson));
// 迁移至 ActiveMQ
MessageProducer producer = session.createProducer(session.createQueue("ORDER.QUEUE"));
TextMessage msg = session.createTextMessage(orderJson);
msg.setStringProperty("orderId", orderId);
producer.send(msg);5.4 消费者组迁移
java
// Kafka:ConsumerGroup 自动负载均衡
consumer.subscribe(Collections.singletonList("ORDER.TOPIC"));
// ActiveMQ:同一 Queue 多个消费者实例 = 竞争消费(等价 Kafka ConsumerGroup)
// 只需启动多个 MessageConsumer 连接同一 Queue 即可
MessageConsumer consumer1 = session1.createConsumer(orderQueue);
MessageConsumer consumer2 = session2.createConsumer(orderQueue);
// broker 自动在多个消费者间分发消息6. 从其他 ActiveMQ 版本迁移
6.1 从 ActiveMQ 5.x 旧版本升级
ADMQ ActiveMQ 基于 5.19.4,与 5.x 系列配置基本兼容。
主要变化:
| 变化点 | 旧版本 | 5.19.4 |
|---|---|---|
| 日志框架 | log4j 1.x | log4j2 |
| MQTT 实现 | 独立插件 | 内置 |
| AMQP 支持 | 需要额外 JAR | 内置 |
| Web 控制台 | Jetty 8/9 | Jetty 9.4+ |
数据目录直接兼容:
bash
# KahaDB 格式向后兼容
cp -r /old-activemq/data/kahadb/ /var/lib/admq-activemq/kahadb/6.2 从 ActiveMQ Artemis 迁移
ActiveMQ Artemis(下一代 ActiveMQ)与 ActiveMQ 5.x 配置格式差异较大:
| 方面 | ActiveMQ 5.x(ADMQ) | ActiveMQ Artemis |
|---|---|---|
| 配置文件 | Spring XML activemq.xml | Artemis XML broker.xml |
| 持久化 | KahaDB / JDBC | Journal(AOF 格式) |
| 高可用 | Master-Slave / Network | Live-Backup Pair |
| 地址模型 | Queue/Topic | Address + Queue(更灵活) |
建议:Artemis 到 5.x 的迁移比较少见,通常是反向(5.x → Artemis 升级)。如有此需求请联系 Apusic 技术支持。
7. 从 Apusic AS 内置 JMS 迁移
Apusic Application Server 内置了一个轻量级 JMS 实现。迁移到 ADMQ ActiveMQ 主要变化:
7.1 连接工厂 JNDI 名称变更
原:jms/QueueConnectionFactory(Apusic AS 内置)
新:配置 ActiveMQ JNDI,或直接使用 ActiveMQConnectionFactory7.2 @Resource 注入无需改动
java
// 原代码(JEE 标准,不依赖具体实现)
@Resource(mappedName = "jms/myQCF")
private ConnectionFactory connectionFactory;
// 在 Apusic AS 的资源配置中,将 jms/myQCF 指向 ADMQ ActiveMQ
// 应用代码无需修改8. 数据迁移工具与方案
8.1 Apache Camel 作为迁移桥接
Camel 可以同时连接两个 Broker,实现消息平滑迁移:
java
// 迁移桥:从旧 IBM MQ 抽取消息,写入 ADMQ ActiveMQ
@Component
public class MigrationBridge extends RouteBuilder {
@Override
public void configure() {
// IBM MQ 连接
from("wmq:queue:OLD.ORDER.QUEUE?targetClient=1")
.log("迁移消息: ${body}")
.to("activemq:queue:ORDER.QUEUE"); // ADMQ ActiveMQ
// 监控迁移进度
from("timer:migration-stats?period=60000")
.to("wmq:queue:OLD.ORDER.QUEUE?operation=browse")
.log("剩余未迁移消息数: ${body.size()}");
}
}8.2 ActiveMQ 命令行迁移工具
bash
# 从远程 ActiveMQ 复制队列消息
./activemq/bin/activemq browse \
--amqurl tcp://old-activemq:61616 \
--queue ORDER.QUEUE > messages.txt
# 重新导入(简单文本格式)
./activemq/bin/activemq producer \
--amqurl tcp://admq-activemq:61616 \
--destination queue://ORDER.QUEUE \
--file messages.txt8.3 双写迁移方案
java
@Service
public class DualWriteOrderService {
private final JmsTemplate admqTemplate;
private final JmsTemplate legacyTemplate; // 原系统
private volatile boolean migrationMode = true; // 可动态切换
public void send(String queue, Object message) {
admqTemplate.convertAndSend(queue, message);
if (migrationMode) {
// 同时写入旧系统(确保旧系统消费者不漏消息)
try {
legacyTemplate.convertAndSend(queue, message);
} catch (Exception e) {
log.warn("旧系统写入失败(可接受): {}", e.getMessage());
}
}
}
}9. 迁移验证
9.1 功能验证清单
✅ 基础消息收发(Queue)
✅ 发布/订阅(Topic)
✅ 持久订阅(Durable Subscription)
✅ 消息优先级
✅ 消息选择器(Selector)
✅ 延迟消息(Scheduler)
✅ 死信队列(DLQ)
✅ 事务消息
✅ SSL/TLS 连接
✅ Web 控制台访问
✅ JMX 监控
✅ failover 重连
✅ 高可用切换(如有 Master-Slave)9.2 消息数量验证
bash
# 通过 JMX REST API 查看 Queue 积压
curl "http://localhost:8161/api/jolokia/read/\
org.apache.activemq:type=Broker,brokerName=localhost,\
destinationType=Queue,destinationName=ORDER.QUEUE/QueueSize" \
-u admin:admin9.3 性能基准测试
bash
# 生产者性能
./activemq/bin/activemq producer \
--messageSize 1024 \
--messageCount 10000 \
--parallelThreads 10 \
--destination queue://PERF.QUEUE
# 消费者性能
./activemq/bin/activemq consumer \
--messageCount 10000 \
--parallelThreads 10 \
--destination queue://PERF.QUEUE10. 回滚方案
10.1 快速回滚(< 5 分钟)
1. 将负载均衡/服务发现指向旧系统
2. 等待 ADMQ ActiveMQ 队列中的消息消费完(或手动迁回)
3. 停止向 ADMQ ActiveMQ 写入
4. 旧系统恢复提供服务10.2 数据回迁(如有积压)
java
// Camel 反向迁移:ADMQ → 旧系统
from("activemq:queue:ORDER.QUEUE")
.to("oldSystem:queue:ORDER.QUEUE");10.3 灰度切换建议
Week 1:10% 流量 → ADMQ ActiveMQ,观察错误率和延迟
Week 2:50% 流量 → ADMQ ActiveMQ
Week 3:100% 流量 → ADMQ ActiveMQ,保留旧系统热备
Week 4:下线旧系统