Skip to content

ADMQ ActiveMQ 迁移手册

版本:1.0 | 涵盖从主流消息系统迁移至 ADMQ ActiveMQ 的完整指南


目录

  1. 迁移总览
  2. 从 IBM MQ 迁移
  3. 从 WebSphere MQ / Liberty Messaging 迁移
  4. 从 RabbitMQ 迁移
  5. 从 Kafka 迁移(降级场景)
  6. 从其他 ActiveMQ 版本迁移
  7. 从 Apusic AS 内置 JMS 迁移
  8. 数据迁移工具与方案
  9. 迁移验证
  10. 回滚方案

1. 迁移总览

1.1 ADMQ ActiveMQ 的定位

ADMQ ActiveMQ 最适合以下迁移来源:

来源系统迁移难度主要工作量
IBM MQJMS API 兼容,主要是配置和部署变化
WebSphere MQ同上
自建 ActiveMQ配置文件基本兼容
Spring ActiveMQ(老版本)极低仅需更换 broker-url
RabbitMQ(JMS 场景)中高路由模型差异
Kafka(降级为传统 MQ 场景)架构差异大

1.2 概念映射表

来源系统ADMQ ActiveMQ 概念
IBM MQ Queue ManagerBroker
IBM MQ ChannelTransportConnector
IBM MQ QueueQueue
IBM MQ TopicTopic
IBM MQ Message GroupMessage Selector + 消费者组
RabbitMQ Exchange无直接对应(应用层路由 / Camel)
RabbitMQ QueueQueue
RabbitMQ BindingCamel 路由规则
Kafka TopicTopic(语义不同:消费后不删除)
Kafka ConsumerGroup同一 Queue 多消费者

2. 从 IBM MQ 迁移

2.1 连接配置迁移

IBM MQ 原配置(JMS):

java
MQConnectionFactory factory = new MQConnectionFactory();
factory.setHostName("ibmmq-host");
factory.setPort(1414);
factory.setQueueManager("QMGR01");
factory.setChannel("JAVA.CLIENT.CHANNEL");
factory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);

迁移至 ADMQ ActiveMQ:

java
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
    "admin", "admin",
    "tcp://admq-activemq-host:61616"
);
// 或使用 failover
// "failover:(tcp://host1:61616,tcp://host2:61616)?randomize=false"

2.2 JNDI 配置迁移

IBM MQ 通常通过 JNDI 查找连接工厂和目的地。

原 JNDI 配置(jndi.properties):

properties
java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory
java.naming.provider.url=file:///var/mqm/jndi

迁移至 ActiveMQ JNDI(jndi.properties):

properties
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=admin
java.naming.security.credentials=admin

# 目的地定义
queue.OrderQueue=ORDER.QUEUE
topic.AlertTopic=ALERTS.TOPIC

Java 代码(JNDI 查找,无需改动):

java
Context ctx = new InitialContext();  // 自动读取 jndi.properties
ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
Queue queue = (Queue) ctx.lookup("OrderQueue");
// 其余代码完全不变

2.3 IBM MQ 特性迁移

IBM MQ 特性ADMQ ActiveMQ 替代方案
Persistent MessageDeliveryMode.PERSISTENT(默认)
Message Prioritymsg.setJMSPriority(9)
Message Expiryproducer.setTimeToLive(ms)
Selectorsession.createConsumer(dest, selector)
Message Groupsmsg.setStringProperty("JMSXGroupID", "group1")
Browse-only(浏览不消费)QueueBrowser browser = session.createBrowser(queue)
Dead Letter Queue配置 individualDeadLetterStrategy
Backout Queue等同 DLQ,配置 maxRedeliveries

2.4 MQ 消息格式转换

IBM MQ 使用 MQMessage 格式,需转换为 JMS TextMessage/BytesMessage:

java
// 原 IBM MQ 代码
MQMessage mqMsg = new MQMessage();
mqMsg.format = MQC.MQFMT_STRING;
mqMsg.writeString("Hello IBM MQ");
queue.put(mqMsg);

// 迁移后 JMS 代码
TextMessage jmsMsg = session.createTextMessage("Hello ActiveMQ");
jmsMsg.setStringProperty("OriginalFormat", "STRING");
producer.send(jmsMsg);

3. 从 WebSphere MQ / Liberty Messaging 迁移

3.1 JEE 应用中的资源定义迁移

原 WebSphere 配置(server.xml):

xml
<jmsConnectionFactory id="jmsQCF" jndiName="jms/myQCF">
    <properties.wasJms remoteServerAddress="localhost:7276:BootstrapBasicMessaging"/>
</jmsConnectionFactory>

<jmsQueue id="myQueue" jndiName="jms/myQueue">
    <properties.wasJms queueName="Default.Queue"/>
</jmsQueue>

迁移至 ADMQ ActiveMQ(Spring Boot):

yaml
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
java
// 原 @Resource 注入方式不变(如果使用 JNDI)
@Resource(lookup = "jms/myQCF")
private ConnectionFactory connectionFactory;

@Resource(lookup = "jms/myQueue")
private Queue myQueue;

3.2 MDB(Message-Driven Bean)迁移

原 MDB(JEE):

java
@MessageDriven(
    activationConfig = {
        @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
        @ActivationConfigProperty(propertyName="destination", propertyValue="ORDER.QUEUE")
    }
)
public class OrderMDB implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // 处理逻辑
    }
}

迁移至 Spring @JmsListener:

java
@Component
public class OrderListener {
    @JmsListener(destination = "ORDER.QUEUE")
    public void onMessage(Message message) {
        // 处理逻辑(完全相同)
    }
}

4. 从 RabbitMQ 迁移

4.1 消息模型差异

RabbitMQ:
Producer → Exchange(Direct/Fanout/Topic/Headers) → Queue → Consumer

ADMQ ActiveMQ:
Producer → Queue/Topic → Consumer
(路由逻辑由 Camel 或应用层实现)

4.2 Exchange 路由迁移至 Camel

RabbitMQ Direct Exchange(精确路由):

java
// RabbitMQ
channel.basicPublish("order-exchange", "order.vip", null, body);
// binding: routingKey="order.vip" → queue="VIP.QUEUE"

ADMQ ActiveMQ + Camel 等价:

java
// 生产者直接发到路由队列
template.send("activemq:queue:ORDER.INBOX",
    exchange -> {
        exchange.getIn().setBody(body);
        exchange.getIn().setHeader("routingKey", "order.vip");
    });

// Camel 路由(替代 Exchange Binding)
from("activemq:queue:ORDER.INBOX")
    .choice()
        .when(header("routingKey").isEqualTo("order.vip"))
            .to("activemq:queue:VIP.QUEUE")
        .when(header("routingKey").startsWith("order."))
            .to("activemq:queue:ORDER.NORMAL.QUEUE")
    .end();

RabbitMQ Fanout Exchange(广播):

java
// RabbitMQ:一个 exchange 绑定多个 queue
// ADMQ ActiveMQ:使用 Topic(每个消费者组创建独立订阅)
java
// 生产者:发到 Topic
MessageProducer producer = session.createProducer(session.createTopic("NOTIFICATIONS"));
producer.send(session.createTextMessage(notification));

// 多个消费者服务:各自订阅(等价 Fanout)
// email-service:
MessageConsumer emailConsumer = session.createConsumer(
    session.createTopic("NOTIFICATIONS"));

// sms-service:
MessageConsumer smsConsumer = session.createConsumer(
    session.createTopic("NOTIFICATIONS"));

4.3 RabbitMQ 消费者 ACK 迁移

java
// RabbitMQ
channel.basicConsume(queueName, false,  // autoAck=false
    (consumerTag, delivery) -> {
        processMessage(delivery.getBody());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    },
    consumerTag -> {});

// ADMQ ActiveMQ(CLIENT_ACKNOWLEDGE)
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
    try {
        processMessage(((BytesMessage) message).readBytes(...));
        message.acknowledge();  // 等价 basicAck
    } catch (Exception e) {
        // 不 acknowledge → 消息重新投递(等价 basicNack/basicReject)
    }
});

5. 从 Kafka 迁移(降级场景)

5.1 适用场景

从 Kafka 迁移到 ActiveMQ 通常是降级,适用于:

  • 消息量小,不需要高吞吐
  • 需要传统 JMS API 兼容(如遗留 Java EE 应用)
  • 需要延迟消息、消息优先级等 ActiveMQ 特性
  • 运维团队更熟悉 ActiveMQ

5.2 主要差异

特性KafkaADMQ ActiveMQ
消息保留按时间/大小保留(可重放)消费后删除
消费者组分区均分,独立 offset竞争消费,无分区概念
消息回放
吞吐量极高(百万 TPS)中等(万级 TPS)
延迟消息❌(需外部)✅(内置 Scheduler)
消息优先级✅(0-9 级)

5.3 生产者迁移

java
// Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("ORDER.TOPIC", orderId, orderJson));

// 迁移至 ActiveMQ
MessageProducer producer = session.createProducer(session.createQueue("ORDER.QUEUE"));
TextMessage msg = session.createTextMessage(orderJson);
msg.setStringProperty("orderId", orderId);
producer.send(msg);

5.4 消费者组迁移

java
// Kafka:ConsumerGroup 自动负载均衡
consumer.subscribe(Collections.singletonList("ORDER.TOPIC"));

// ActiveMQ:同一 Queue 多个消费者实例 = 竞争消费(等价 Kafka ConsumerGroup)
// 只需启动多个 MessageConsumer 连接同一 Queue 即可
MessageConsumer consumer1 = session1.createConsumer(orderQueue);
MessageConsumer consumer2 = session2.createConsumer(orderQueue);
// broker 自动在多个消费者间分发消息

6. 从其他 ActiveMQ 版本迁移

6.1 从 ActiveMQ 5.x 旧版本升级

ADMQ ActiveMQ 基于 5.19.4,与 5.x 系列配置基本兼容。

主要变化:

变化点旧版本5.19.4
日志框架log4j 1.xlog4j2
MQTT 实现独立插件内置
AMQP 支持需要额外 JAR内置
Web 控制台Jetty 8/9Jetty 9.4+

数据目录直接兼容:

bash
# KahaDB 格式向后兼容
cp -r /old-activemq/data/kahadb/ /var/lib/admq-activemq/kahadb/

6.2 从 ActiveMQ Artemis 迁移

ActiveMQ Artemis(下一代 ActiveMQ)与 ActiveMQ 5.x 配置格式差异较大:

方面ActiveMQ 5.x(ADMQ)ActiveMQ Artemis
配置文件Spring XML activemq.xmlArtemis XML broker.xml
持久化KahaDB / JDBCJournal(AOF 格式)
高可用Master-Slave / NetworkLive-Backup Pair
地址模型Queue/TopicAddress + Queue(更灵活)

建议:Artemis 到 5.x 的迁移比较少见,通常是反向(5.x → Artemis 升级)。如有此需求请联系 Apusic 技术支持。


7. 从 Apusic AS 内置 JMS 迁移

Apusic Application Server 内置了一个轻量级 JMS 实现。迁移到 ADMQ ActiveMQ 主要变化:

7.1 连接工厂 JNDI 名称变更

原:jms/QueueConnectionFactory(Apusic AS 内置)
新:配置 ActiveMQ JNDI,或直接使用 ActiveMQConnectionFactory

7.2 @Resource 注入无需改动

java
// 原代码(JEE 标准,不依赖具体实现)
@Resource(mappedName = "jms/myQCF")
private ConnectionFactory connectionFactory;

// 在 Apusic AS 的资源配置中,将 jms/myQCF 指向 ADMQ ActiveMQ
// 应用代码无需修改

8. 数据迁移工具与方案

8.1 Apache Camel 作为迁移桥接

Camel 可以同时连接两个 Broker,实现消息平滑迁移:

java
// 迁移桥:从旧 IBM MQ 抽取消息,写入 ADMQ ActiveMQ
@Component
public class MigrationBridge extends RouteBuilder {

    @Override
    public void configure() {
        // IBM MQ 连接
        from("wmq:queue:OLD.ORDER.QUEUE?targetClient=1")
            .log("迁移消息: ${body}")
            .to("activemq:queue:ORDER.QUEUE");  // ADMQ ActiveMQ

        // 监控迁移进度
        from("timer:migration-stats?period=60000")
            .to("wmq:queue:OLD.ORDER.QUEUE?operation=browse")
            .log("剩余未迁移消息数: ${body.size()}");
    }
}

8.2 ActiveMQ 命令行迁移工具

bash
# 从远程 ActiveMQ 复制队列消息
./activemq/bin/activemq browse \
  --amqurl tcp://old-activemq:61616 \
  --queue ORDER.QUEUE > messages.txt

# 重新导入(简单文本格式)
./activemq/bin/activemq producer \
  --amqurl tcp://admq-activemq:61616 \
  --destination queue://ORDER.QUEUE \
  --file messages.txt

8.3 双写迁移方案

java
@Service
public class DualWriteOrderService {

    private final JmsTemplate admqTemplate;
    private final JmsTemplate legacyTemplate;  // 原系统

    private volatile boolean migrationMode = true;  // 可动态切换

    public void send(String queue, Object message) {
        admqTemplate.convertAndSend(queue, message);

        if (migrationMode) {
            // 同时写入旧系统(确保旧系统消费者不漏消息)
            try {
                legacyTemplate.convertAndSend(queue, message);
            } catch (Exception e) {
                log.warn("旧系统写入失败(可接受): {}", e.getMessage());
            }
        }
    }
}

9. 迁移验证

9.1 功能验证清单

✅ 基础消息收发(Queue)
✅ 发布/订阅(Topic)
✅ 持久订阅(Durable Subscription)
✅ 消息优先级
✅ 消息选择器(Selector)
✅ 延迟消息(Scheduler)
✅ 死信队列(DLQ)
✅ 事务消息
✅ SSL/TLS 连接
✅ Web 控制台访问
✅ JMX 监控
✅ failover 重连
✅ 高可用切换(如有 Master-Slave)

9.2 消息数量验证

bash
# 通过 JMX REST API 查看 Queue 积压
curl "http://localhost:8161/api/jolokia/read/\
org.apache.activemq:type=Broker,brokerName=localhost,\
destinationType=Queue,destinationName=ORDER.QUEUE/QueueSize" \
  -u admin:admin

9.3 性能基准测试

bash
# 生产者性能
./activemq/bin/activemq producer \
  --messageSize 1024 \
  --messageCount 10000 \
  --parallelThreads 10 \
  --destination queue://PERF.QUEUE

# 消费者性能
./activemq/bin/activemq consumer \
  --messageCount 10000 \
  --parallelThreads 10 \
  --destination queue://PERF.QUEUE

10. 回滚方案

10.1 快速回滚(< 5 分钟)

1. 将负载均衡/服务发现指向旧系统
2. 等待 ADMQ ActiveMQ 队列中的消息消费完(或手动迁回)
3. 停止向 ADMQ ActiveMQ 写入
4. 旧系统恢复提供服务

10.2 数据回迁(如有积压)

java
// Camel 反向迁移:ADMQ → 旧系统
from("activemq:queue:ORDER.QUEUE")
    .to("oldSystem:queue:ORDER.QUEUE");

10.3 灰度切换建议

Week 1:10% 流量 → ADMQ ActiveMQ,观察错误率和延迟
Week 2:50% 流量 → ADMQ ActiveMQ
Week 3:100% 流量 → ADMQ ActiveMQ,保留旧系统热备
Week 4:下线旧系统

金蝶天燕(Apusic)企业级消息中间件套件