Skip to content

ADMQ RabbitMQ 迁移手册

产品版本: ADMQ RabbitMQ 2.0 日期: 2026-06


目录

  1. 迁移概述
  2. 从原生 RabbitMQ 迁移
  3. 从 ActiveMQ 迁移
  4. 从 Kafka 迁移
  5. 从 RocketMQ 迁移
  6. 从 EMQX 迁移(MQTT 场景)
  7. 用户数据迁移
  8. 消息数据迁移
  9. 配置映射参考
  10. 迁移验证清单

1. 迁移概述

1.1 支持接入的源系统

源系统迁移难度协议兼容性主要差异
RabbitMQ(开源版)★☆☆☆☆ 极低100% 兼容只需修改连接参数
ActiveMQ Classic★★★☆☆ 中等STOMP/AMQP 1.0目的地命名语义不同
ActiveMQ Artemis★★☆☆☆ 较低AMQP 1.0/STOMP接近,需适配 vhost
Kafka★★★★☆ 较高无直接兼容消息模型根本不同
RocketMQ★★★★☆ 较高无直接兼容消息模型不同
EMQX(MQTT 场景)★★☆☆☆ 较低MQTT 完全兼容MQTT 协议一致

1.2 迁移策略选择

推荐:双写过渡 → 验证 → 切换
1. 部署 ADMQ RabbitMQ,与源系统并行运行
2. 应用层同时向两套系统发消息(双写)
3. 消费者先从 ADMQ 消费,验证无误
4. 切换生产者,停止双写
5. 关停源系统

2. 从原生 RabbitMQ 迁移

ADMQ RabbitMQ 完整实现 AMQP 0-9-1 协议,与标准 RabbitMQ 客户端完全兼容,迁移成本极低。

2.1 版本兼容性

源版本迁移路径
3.12.x直接迁移,无需修改代码
3.11.x兼容,配置无变化
3.10.x 及以下建议先升级到 3.12,再迁移

2.2 客户端代码无需修改

只需修改连接参数:

java
// 修改前(指向原生 RabbitMQ)
factory.setHost("old-rabbitmq-server");
factory.setUsername("your_user");
factory.setPassword("your_password");

// 修改后(指向 ADMQ RabbitMQ)
factory.setHost("new-admq-rabbitmq-server");
factory.setUsername("migrated_user");   // 迁移后的用户名
factory.setPassword("new_password");    // 迁移后的密码

2.3 配置迁移

将原有 rabbitmq.conf 内容复制到 config/rabbitmq.conf,注意以下差异:

原生 RabbitMQ 配置ADMQ 配置说明
原有账号需手动重建密码 hash 不可直接迁移
default_user = guestdefault_user = admqADMQ 修改了默认用户
插件启用状态需重新 enableADMQ 内置所有插件,只需启用

2.4 用户迁移

bash
# 在原 RabbitMQ 上导出用户列表
rabbitmqctl list_users

# 在 ADMQ 上重建用户
bin/admq rabbitmq admin add_user alice password123
bin/admq rabbitmq admin set_user_tags alice management
bin/admq rabbitmq admin set_permissions -p /prod alice ".*" ".*" ".*"

2.5 队列/Exchange 定义导出

RabbitMQ 提供 Definition 导出功能,可迁移队列、Exchange、绑定关系、策略(但不含消息):

bash
# 从原 RabbitMQ 导出定义(含队列/Exchange/Binding/策略)
curl -u admin:password http://old-rabbitmq:15672/api/definitions \
  -o rabbitmq-definitions.json

# 编辑 rabbitmq-definitions.json,修改用户密码哈希(不能直接迁移)
# 然后导入到 ADMQ
curl -X POST http://localhost:15672/api/definitions \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d @rabbitmq-definitions.json

注意: Definition 文件中的 users 字段包含 bcrypt 密码哈希,如两端 Erlang 版本不同则不兼容,建议手动重建用户,或使用 --reset-password 参数。


3. 从 ActiveMQ 迁移

3.1 协议映射

ActiveMQ 概念ADMQ RabbitMQ 对应
QueueClassic/Quorum Queue
TopicFanout/Topic Exchange
Virtual TopicTopic Exchange + 独立 Queue
Durable SubscriberDurable Queue + Topic 绑定
Advisory Message无直接对应,用 Management API 替代

3.2 目的地(Destination)命名迁移

java
// ActiveMQ 目的地命名
"queue://myQueue"          → AMQP 默认 Exchange,routingKey = "myQueue"
"topic://news.sports"      → amq.topic Exchange,routingKey = "news.sports"

// ActiveMQ 目的地命名示例
session.createQueue("ORDER.PROCESSING")   → queueDeclare("ORDER.PROCESSING")
session.createTopic("PRICES.USD.*")       → exchange "amq.topic", key "PRICES.USD.#"

3.3 代码迁移(JMS → AMQP)

ActiveMQ JMS 代码:

java
// 旧代码(ActiveMQ JMS)
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = cf.createConnection("user", "password");
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("orders");
MessageProducer producer = session.createProducer(queue);
TextMessage msg = session.createTextMessage("Hello");
producer.send(msg);

迁移为 AMQP(RabbitMQ):

java
// 新代码(AMQP)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admq");
factory.setPassword("apusic_123");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("orders", true, false, false, null);
channel.basicPublish("", "orders", MessageProperties.PERSISTENT_TEXT_PLAIN,
    "Hello".getBytes());

3.4 STOMP 协议迁移(最小改动)

如 ActiveMQ 应用使用 STOMP 协议,可以直接切换 Broker 地址,代码基本不变:

python
# 只修改连接地址
# 原:ActiveMQ 61613
conn = stomp.Connection([('old-activemq-host', 61613)])

# 新:ADMQ RabbitMQ 61613
conn = stomp.Connection([('new-admq-rabbitmq-host', 61613)])
conn.connect('admq', 'apusic_123', wait=True)

# STOMP 目的地映射:
# /queue/xxx → RabbitMQ 中的 Queue "xxx"
# /topic/xxx → RabbitMQ 中的 amq.topic Exchange,routingKey = "xxx"

3.5 ActiveMQ 不支持的特性

ActiveMQ 特性替代方案
消息分组(Message Groups)Consistent Hash Exchange 插件
优先级队列x-max-priority 参数
定时消息rabbitmq_delayed_message_exchange 插件
大型消息拆分应用层拆分 + Claim Check 模式
消息重试计划(Redelivery Policy)死信队列 + 延迟重试队列

4. 从 Kafka 迁移

Kafka 与 RabbitMQ 消息模型差异较大,需要重新设计消息架构。

4.1 消息模型对比

概念KafkaADMQ RabbitMQ
消息存储日志,按 offset 访问,可重放队列,消费即删除(Stream 可重放)
消费组Consumer Group,多消费者共享 offset多个消费者绑定同一队列竞争消费
分区Partition 保证顺序单队列 FIFO;多 Queue 用 Consistent Hash
Topic 保留按时间/大小保留,无需消费消费后删除(Stream 类型可保留)
消费回溯任意 offset仅 Stream 队列支持
吞吐量极高(百万/秒)高(十万/秒),适合企业应用

4.2 适合迁移的场景

适合从 Kafka 迁移:

  • 消息量 < 100k/秒
  • 需要灵活的路由(Topic/Fanout/Direct)
  • 需要协议多样性(AMQP/MQTT/STOMP)
  • 需要内置的用户权限管理
  • 需要 Management UI 和可视化

不适合从 Kafka 迁移:

  • 大量历史数据回溯需求
  • 极高吞吐量(> 100k msg/s)
  • 严格要求消息流处理(KSQL/Flink)

4.3 Kafka Topic → RabbitMQ Queue 映射

Kafka Topic: "order-events"
  └─ Partition 0: 用于 order-service-A
  └─ Partition 1: 用于 order-service-B

→ RabbitMQ 方案一(顺序不敏感):
  Exchange: "order-events" (topic)
  Queue 1: "order-events.service-a" (消费者 A)
  Queue 2: "order-events.service-b" (消费者 B)

→ RabbitMQ 方案二(需要分区顺序):
  使用 Consistent Hash Exchange
  Exchange: "order-events" (x-consistent-hash)
  Queue 1: "order-events-0"
  Queue 2: "order-events-1"

4.4 Kafka Consumer Group → RabbitMQ

python
# Kafka Consumer Group(多消费者竞争消费)
consumer = KafkaConsumer('orders', group_id='order-processor', ...)

# RabbitMQ 等价:多消费者绑定同一队列
# 消费者 1
channel1.basic_consume(queue='order_queue', on_message_callback=handler)
# 消费者 2(自动实现 round-robin 竞争消费)
channel2.basic_consume(queue='order_queue', on_message_callback=handler)

4.5 消息回溯需求(使用 Stream)

bash
# 创建 Stream 队列(支持 offset 回溯)
curl -X PUT http://localhost:15672/api/queues/%2F/event-stream \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{
    "arguments": {
      "x-queue-type": "stream",
      "x-max-age": "7D",
      "x-stream-max-segment-size-bytes": 536870912
    }
  }'
java
// Stream 消费者(从任意 offset 消费)
Map<String, Object> consumerArgs = new HashMap<>();
consumerArgs.put("x-stream-offset", "first");         // 从头消费
// consumerArgs.put("x-stream-offset", "last");       // 只消费最新
// consumerArgs.put("x-stream-offset", timestamp);     // 从时间点消费

channel.basicConsume("event-stream", false, consumerArgs, callback, tag -> {});

5. 从 RocketMQ 迁移

5.1 概念映射

RocketMQ 概念ADMQ RabbitMQ 对应
TopicExchange + Queue
TagRouting Key(Topic Exchange)
Message GroupQuorum Queue / Consistent Hash
Consumer Group同一 Queue 多消费者
顺序消息单队列(不并行消费)
事务消息AMQP 事务 / Publisher Confirms
延迟消息死信队列延迟方案
消息过滤(SQL)Header Exchange

5.2 延迟消息替代方案

RocketMQ 支持按延迟级别发送延迟消息,RabbitMQ 通过 TTL + 死信队列实现:

java
// 实现 5 秒延迟队列
// Step 1: 创建延迟队列(TTL=5000ms,过期后发死信)
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "");
args.put("x-dead-letter-routing-key", "actual_queue");
args.put("x-message-ttl", 5000);
channel.queueDeclare("delay_5s", true, false, false, args);

// Step 2: 发送到延迟队列(5 秒后自动转发到 actual_queue)
channel.basicPublish("", "delay_5s", null, message.getBytes());

5.3 顺序消息迁移

java
// RocketMQ 顺序消息:相同 MessageGroup 发到同一分区
// RabbitMQ 等价:发到同一队列,只用一个消费者
channel.basicQos(1); // 确保顺序处理,每次处理完再取下一条

6. 从 EMQX 迁移(MQTT 场景)

如当前使用 EMQX 或其他 MQTT Broker,迁移到 ADMQ RabbitMQ 的 MQTT 功能。

6.1 协议完全兼容

MQTT 3.1 / 3.1.1 / 5.0 协议标准一致,客户端代码无需修改,只需更改 Broker 地址:

python
# 只修改 Broker 地址
client.connect("new-admq-rabbitmq-host", 1883, 60)
# 或 TLS
client.tls_set(...)
client.connect("new-admq-rabbitmq-host", 8883, 60)

6.2 MQTT 与 AMQP 互通优势

迁移到 ADMQ RabbitMQ 后,MQTT 消息可直接被 AMQP 消费者接收,无需额外桥接:

MQTT 设备 → (MQTT 1883) → ADMQ RabbitMQ → amq.topic Exchange

                                       AMQP 消费者(Java/Python/Go)

6.3 EMQX 特有功能替代

EMQX 功能ADMQ RabbitMQ 替代方案
规则引擎(Rule Engine)Shovel 插件 + 应用层处理
数据持久化到数据库消费者应用写入数据库
MQTT 认证插件rabbitmq_auth_backend_http
保留消息不支持 MQTT 保留消息(Retained Message)
Last Will 遗嘱消息✅ 支持
QoS 0/1✅ 支持
QoS 2✅ 支持
通配符订阅(# / +)✅ 支持

注意: ADMQ RabbitMQ 的 MQTT 插件不支持 保留消息(Retained Message),如业务依赖此特性,需要在应用层实现保留消息逻辑,或保留 MQTT 专用 Broker 处理此类消息。


7. 用户数据迁移

7.1 批量创建用户脚本

bash
#!/bin/bash
# migrate_users.sh
# 格式:username:password:tags:vhost:conf_perm:write_perm:read_perm

ADMQ_HOME=/path/to/admq-rabbitmq
ADMIN="$ADMQ_HOME/bin/admq rabbitmq admin"

create_user() {
    local user=$1 pass=$2 tags=$3 vhost=$4
    $ADMIN add_user "$user" "$pass"
    $ADMIN set_user_tags "$user" $tags
    $ADMIN set_permissions -p "$vhost" "$user" ".*" ".*" ".*"
    echo "Created user: $user"
}

# 按需添加
create_user "app_service"   "S3cur3P@ss!" "management"    "/prod"
create_user "admin_ops"     "Adm1nP@ss!" "administrator"  "/"
create_user "readonly_mon"  "M0n1t0r!"   "monitoring"     "/"

7.2 从 RabbitMQ Definition 文件迁移用户

python
#!/usr/bin/env python3
# reset_passwords.py - 在 definition.json 中重置用户密码
import json, hashlib, base64, os

with open('rabbitmq-definitions.json') as f:
    data = json.load(f)

# RabbitMQ 使用 SHA-256 密码哈希(加盐)
for user in data.get('users', []):
    print(f"User: {user['name']}, Tags: {user['tags']}")
    # 需要手动设置新密码,无法从旧哈希反推

print("\n请使用以下命令重建用户密码:")
for user in data.get('users', []):
    print(f"bin/admq rabbitmq admin change_password {user['name']} NEW_PASSWORD")

8. 消息数据迁移

8.1 在线迁移(Shovel 插件)

使用 RabbitMQ Shovel 插件将消息从源 Broker 拉取到 ADMQ:

bash
# 配置 Shovel:从源 RabbitMQ 迁移消息到 ADMQ
curl -X PUT http://localhost:15672/api/parameters/shovel/%2F/migration-shovel \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{
    "value": {
      "src-protocol": "amqp091",
      "src-uri": "amqp://user:pass@old-rabbitmq-host",
      "src-queue": "source_queue",
      "dest-protocol": "amqp091",
      "dest-uri": "amqp://admq:apusic_123@localhost",
      "dest-queue": "target_queue",
      "ack-mode": "on-confirm",
      "delete-after": "queue-length"
    }
  }'

8.2 验证消息迁移完成

bash
# 检查源队列是否清空
curl http://old-rabbitmq:15672/api/queues/%2F/source_queue -u user:pass \
  | python3 -c "import json,sys; q=json.load(sys.stdin); print(f'Messages: {q[\"messages\"]}')"

# 检查目标队列消息数
curl http://localhost:15672/api/queues/%2F/target_queue -u admq:apusic_123 \
  | python3 -c "import json,sys; q=json.load(sys.stdin); print(f'Messages: {q[\"messages\"]}')"

8.3 Federation(联邦)方案

如需长期双向同步(灰度迁移),使用 Federation 插件:

bash
# 配置 Federation upstream
curl -X PUT http://localhost:15672/api/parameters/federation-upstream/%2F/old-rabbitmq \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{"value": {"uri": "amqp://user:pass@old-rabbitmq-host", "expires": 3600000}}'

# 设置 Federation 策略
curl -X PUT http://localhost:15672/api/policies/%2F/federate-queues \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{
    "pattern": "^federated\\.",
    "definition": {"federation-upstream": "old-rabbitmq"},
    "apply-to": "queues"
  }'

9. 配置映射参考

9.1 ActiveMQ → ADMQ RabbitMQ 配置映射

ActiveMQ activemq.xmlADMQ rabbitmq.conf
<broker brokerName="...">cluster_name = ...
<transportConnector uri="tcp://:61616">listeners.tcp.default = 5672
<memoryUsage percentOfJvmHeap="70"/>vm_memory_high_watermark.relative = 0.7
<storeUsage limit="100 gb"/>disk_free_limit.absolute = 10gb
<authenticationPlugin>auth_backends.1 = ...

9.2 RocketMQ → ADMQ RabbitMQ 配置映射

RocketMQ broker.confADMQ rabbitmq.conf
brokerClusterNamecluster_name
listenPort = 10911listeners.tcp.default = 5672
maxMessageSize = 4194304max_message_size = 4194304
sendMessageThreadPoolNumstcp_listen_options.backlog

10. 迁移验证清单

10.1 功能验证

  • [ ] 应用可以成功连接到 ADMQ RabbitMQ
  • [ ] 消息可以正常发布到队列/Exchange
  • [ ] 消费者可以正常接收并确认消息
  • [ ] TLS 连接测试通过
  • [ ] 用户权限验证正确
  • [ ] 队列/Exchange/Binding 关系正确重建
  • [ ] 死信队列工作正常
  • [ ] 消息持久化验证(重启后消息未丢失)

10.2 性能验证

bash
# 使用 PerfTest 工具进行基准测试
# (需单独下载 rabbitmq-perf-test 工具)
java -jar perf-test.jar \
  --uri amqp://admq:apusic_123@localhost \
  --producers 10 \
  --consumers 10 \
  --queue perf-test \
  --rate 1000 \
  --duration 60

10.3 监控验证

  • [ ] Management UI(15672)可正常访问
  • [ ] Prometheus 指标端点(15692)正常返回
  • [ ] Apusic 管控台 Agent(58181)心跳正常
  • [ ] 队列深度、消息速率指标正常

10.4 高可用验证(集群场景)

  • [ ] 集群所有节点状态正常
  • [ ] 一个节点重启后服务不中断
  • [ ] 仲裁队列(Quorum Queue)正常同步
  • [ ] 客户端连接自动故障转移

10.5 切换准备清单

  • [ ] DNS 切换计划已确认
  • [ ] 回滚方案已准备(保留源系统 48 小时)
  • [ ] 通知下游团队变更时间窗口
  • [ ] 灰度切换比例确定(建议先 10% 流量)
  • [ ] 告警阈值已配置
  • [ ] 运维值班安排就绪

金蝶天燕(Apusic)企业级消息中间件套件