外观
ADMQ RabbitMQ 迁移手册
产品版本: ADMQ RabbitMQ 2.0 日期: 2026-06
目录
1. 迁移概述
1.1 支持接入的源系统
| 源系统 | 迁移难度 | 协议兼容性 | 主要差异 |
|---|---|---|---|
| RabbitMQ(开源版) | ★☆☆☆☆ 极低 | 100% 兼容 | 只需修改连接参数 |
| ActiveMQ Classic | ★★★☆☆ 中等 | STOMP/AMQP 1.0 | 目的地命名语义不同 |
| ActiveMQ Artemis | ★★☆☆☆ 较低 | AMQP 1.0/STOMP | 接近,需适配 vhost |
| Kafka | ★★★★☆ 较高 | 无直接兼容 | 消息模型根本不同 |
| RocketMQ | ★★★★☆ 较高 | 无直接兼容 | 消息模型不同 |
| EMQX(MQTT 场景) | ★★☆☆☆ 较低 | MQTT 完全兼容 | MQTT 协议一致 |
1.2 迁移策略选择
推荐:双写过渡 → 验证 → 切换
1. 部署 ADMQ RabbitMQ,与源系统并行运行
2. 应用层同时向两套系统发消息(双写)
3. 消费者先从 ADMQ 消费,验证无误
4. 切换生产者,停止双写
5. 关停源系统2. 从原生 RabbitMQ 迁移
ADMQ RabbitMQ 完整实现 AMQP 0-9-1 协议,与标准 RabbitMQ 客户端完全兼容,迁移成本极低。
2.1 版本兼容性
| 源版本 | 迁移路径 |
|---|---|
| 3.12.x | 直接迁移,无需修改代码 |
| 3.11.x | 兼容,配置无变化 |
| 3.10.x 及以下 | 建议先升级到 3.12,再迁移 |
2.2 客户端代码无需修改
只需修改连接参数:
java
// 修改前(指向原生 RabbitMQ)
factory.setHost("old-rabbitmq-server");
factory.setUsername("your_user");
factory.setPassword("your_password");
// 修改后(指向 ADMQ RabbitMQ)
factory.setHost("new-admq-rabbitmq-server");
factory.setUsername("migrated_user"); // 迁移后的用户名
factory.setPassword("new_password"); // 迁移后的密码2.3 配置迁移
将原有 rabbitmq.conf 内容复制到 config/rabbitmq.conf,注意以下差异:
| 原生 RabbitMQ 配置 | ADMQ 配置 | 说明 |
|---|---|---|
| 原有账号 | 需手动重建 | 密码 hash 不可直接迁移 |
default_user = guest | default_user = admq | ADMQ 修改了默认用户 |
| 插件启用状态 | 需重新 enable | ADMQ 内置所有插件,只需启用 |
2.4 用户迁移
bash
# 在原 RabbitMQ 上导出用户列表
rabbitmqctl list_users
# 在 ADMQ 上重建用户
bin/admq rabbitmq admin add_user alice password123
bin/admq rabbitmq admin set_user_tags alice management
bin/admq rabbitmq admin set_permissions -p /prod alice ".*" ".*" ".*"2.5 队列/Exchange 定义导出
RabbitMQ 提供 Definition 导出功能,可迁移队列、Exchange、绑定关系、策略(但不含消息):
bash
# 从原 RabbitMQ 导出定义(含队列/Exchange/Binding/策略)
curl -u admin:password http://old-rabbitmq:15672/api/definitions \
-o rabbitmq-definitions.json
# 编辑 rabbitmq-definitions.json,修改用户密码哈希(不能直接迁移)
# 然后导入到 ADMQ
curl -X POST http://localhost:15672/api/definitions \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d @rabbitmq-definitions.json注意: Definition 文件中的
users字段包含 bcrypt 密码哈希,如两端 Erlang 版本不同则不兼容,建议手动重建用户,或使用--reset-password参数。
3. 从 ActiveMQ 迁移
3.1 协议映射
| ActiveMQ 概念 | ADMQ RabbitMQ 对应 |
|---|---|
| Queue | Classic/Quorum Queue |
| Topic | Fanout/Topic Exchange |
| Virtual Topic | Topic Exchange + 独立 Queue |
| Durable Subscriber | Durable Queue + Topic 绑定 |
| Advisory Message | 无直接对应,用 Management API 替代 |
3.2 目的地(Destination)命名迁移
java
// ActiveMQ 目的地命名
"queue://myQueue" → AMQP 默认 Exchange,routingKey = "myQueue"
"topic://news.sports" → amq.topic Exchange,routingKey = "news.sports"
// ActiveMQ 目的地命名示例
session.createQueue("ORDER.PROCESSING") → queueDeclare("ORDER.PROCESSING")
session.createTopic("PRICES.USD.*") → exchange "amq.topic", key "PRICES.USD.#"3.3 代码迁移(JMS → AMQP)
ActiveMQ JMS 代码:
java
// 旧代码(ActiveMQ JMS)
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = cf.createConnection("user", "password");
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("orders");
MessageProducer producer = session.createProducer(queue);
TextMessage msg = session.createTextMessage("Hello");
producer.send(msg);迁移为 AMQP(RabbitMQ):
java
// 新代码(AMQP)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admq");
factory.setPassword("apusic_123");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("orders", true, false, false, null);
channel.basicPublish("", "orders", MessageProperties.PERSISTENT_TEXT_PLAIN,
"Hello".getBytes());3.4 STOMP 协议迁移(最小改动)
如 ActiveMQ 应用使用 STOMP 协议,可以直接切换 Broker 地址,代码基本不变:
python
# 只修改连接地址
# 原:ActiveMQ 61613
conn = stomp.Connection([('old-activemq-host', 61613)])
# 新:ADMQ RabbitMQ 61613
conn = stomp.Connection([('new-admq-rabbitmq-host', 61613)])
conn.connect('admq', 'apusic_123', wait=True)
# STOMP 目的地映射:
# /queue/xxx → RabbitMQ 中的 Queue "xxx"
# /topic/xxx → RabbitMQ 中的 amq.topic Exchange,routingKey = "xxx"3.5 ActiveMQ 不支持的特性
| ActiveMQ 特性 | 替代方案 |
|---|---|
| 消息分组(Message Groups) | Consistent Hash Exchange 插件 |
| 优先级队列 | x-max-priority 参数 |
| 定时消息 | rabbitmq_delayed_message_exchange 插件 |
| 大型消息拆分 | 应用层拆分 + Claim Check 模式 |
| 消息重试计划(Redelivery Policy) | 死信队列 + 延迟重试队列 |
4. 从 Kafka 迁移
Kafka 与 RabbitMQ 消息模型差异较大,需要重新设计消息架构。
4.1 消息模型对比
| 概念 | Kafka | ADMQ RabbitMQ |
|---|---|---|
| 消息存储 | 日志,按 offset 访问,可重放 | 队列,消费即删除(Stream 可重放) |
| 消费组 | Consumer Group,多消费者共享 offset | 多个消费者绑定同一队列竞争消费 |
| 分区 | Partition 保证顺序 | 单队列 FIFO;多 Queue 用 Consistent Hash |
| Topic 保留 | 按时间/大小保留,无需消费 | 消费后删除(Stream 类型可保留) |
| 消费回溯 | 任意 offset | 仅 Stream 队列支持 |
| 吞吐量 | 极高(百万/秒) | 高(十万/秒),适合企业应用 |
4.2 适合迁移的场景
✅ 适合从 Kafka 迁移:
- 消息量 < 100k/秒
- 需要灵活的路由(Topic/Fanout/Direct)
- 需要协议多样性(AMQP/MQTT/STOMP)
- 需要内置的用户权限管理
- 需要 Management UI 和可视化
❌ 不适合从 Kafka 迁移:
- 大量历史数据回溯需求
- 极高吞吐量(> 100k msg/s)
- 严格要求消息流处理(KSQL/Flink)
4.3 Kafka Topic → RabbitMQ Queue 映射
Kafka Topic: "order-events"
└─ Partition 0: 用于 order-service-A
└─ Partition 1: 用于 order-service-B
→ RabbitMQ 方案一(顺序不敏感):
Exchange: "order-events" (topic)
Queue 1: "order-events.service-a" (消费者 A)
Queue 2: "order-events.service-b" (消费者 B)
→ RabbitMQ 方案二(需要分区顺序):
使用 Consistent Hash Exchange
Exchange: "order-events" (x-consistent-hash)
Queue 1: "order-events-0"
Queue 2: "order-events-1"4.4 Kafka Consumer Group → RabbitMQ
python
# Kafka Consumer Group(多消费者竞争消费)
consumer = KafkaConsumer('orders', group_id='order-processor', ...)
# RabbitMQ 等价:多消费者绑定同一队列
# 消费者 1
channel1.basic_consume(queue='order_queue', on_message_callback=handler)
# 消费者 2(自动实现 round-robin 竞争消费)
channel2.basic_consume(queue='order_queue', on_message_callback=handler)4.5 消息回溯需求(使用 Stream)
bash
# 创建 Stream 队列(支持 offset 回溯)
curl -X PUT http://localhost:15672/api/queues/%2F/event-stream \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{
"arguments": {
"x-queue-type": "stream",
"x-max-age": "7D",
"x-stream-max-segment-size-bytes": 536870912
}
}'java
// Stream 消费者(从任意 offset 消费)
Map<String, Object> consumerArgs = new HashMap<>();
consumerArgs.put("x-stream-offset", "first"); // 从头消费
// consumerArgs.put("x-stream-offset", "last"); // 只消费最新
// consumerArgs.put("x-stream-offset", timestamp); // 从时间点消费
channel.basicConsume("event-stream", false, consumerArgs, callback, tag -> {});5. 从 RocketMQ 迁移
5.1 概念映射
| RocketMQ 概念 | ADMQ RabbitMQ 对应 |
|---|---|
| Topic | Exchange + Queue |
| Tag | Routing Key(Topic Exchange) |
| Message Group | Quorum Queue / Consistent Hash |
| Consumer Group | 同一 Queue 多消费者 |
| 顺序消息 | 单队列(不并行消费) |
| 事务消息 | AMQP 事务 / Publisher Confirms |
| 延迟消息 | 死信队列延迟方案 |
| 消息过滤(SQL) | Header Exchange |
5.2 延迟消息替代方案
RocketMQ 支持按延迟级别发送延迟消息,RabbitMQ 通过 TTL + 死信队列实现:
java
// 实现 5 秒延迟队列
// Step 1: 创建延迟队列(TTL=5000ms,过期后发死信)
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "");
args.put("x-dead-letter-routing-key", "actual_queue");
args.put("x-message-ttl", 5000);
channel.queueDeclare("delay_5s", true, false, false, args);
// Step 2: 发送到延迟队列(5 秒后自动转发到 actual_queue)
channel.basicPublish("", "delay_5s", null, message.getBytes());5.3 顺序消息迁移
java
// RocketMQ 顺序消息:相同 MessageGroup 发到同一分区
// RabbitMQ 等价:发到同一队列,只用一个消费者
channel.basicQos(1); // 确保顺序处理,每次处理完再取下一条6. 从 EMQX 迁移(MQTT 场景)
如当前使用 EMQX 或其他 MQTT Broker,迁移到 ADMQ RabbitMQ 的 MQTT 功能。
6.1 协议完全兼容
MQTT 3.1 / 3.1.1 / 5.0 协议标准一致,客户端代码无需修改,只需更改 Broker 地址:
python
# 只修改 Broker 地址
client.connect("new-admq-rabbitmq-host", 1883, 60)
# 或 TLS
client.tls_set(...)
client.connect("new-admq-rabbitmq-host", 8883, 60)6.2 MQTT 与 AMQP 互通优势
迁移到 ADMQ RabbitMQ 后,MQTT 消息可直接被 AMQP 消费者接收,无需额外桥接:
MQTT 设备 → (MQTT 1883) → ADMQ RabbitMQ → amq.topic Exchange
↓
AMQP 消费者(Java/Python/Go)6.3 EMQX 特有功能替代
| EMQX 功能 | ADMQ RabbitMQ 替代方案 |
|---|---|
| 规则引擎(Rule Engine) | Shovel 插件 + 应用层处理 |
| 数据持久化到数据库 | 消费者应用写入数据库 |
| MQTT 认证插件 | rabbitmq_auth_backend_http |
| 保留消息 | 不支持 MQTT 保留消息(Retained Message) |
| Last Will 遗嘱消息 | ✅ 支持 |
| QoS 0/1 | ✅ 支持 |
| QoS 2 | ✅ 支持 |
| 通配符订阅(# / +) | ✅ 支持 |
注意: ADMQ RabbitMQ 的 MQTT 插件不支持 保留消息(Retained Message),如业务依赖此特性,需要在应用层实现保留消息逻辑,或保留 MQTT 专用 Broker 处理此类消息。
7. 用户数据迁移
7.1 批量创建用户脚本
bash
#!/bin/bash
# migrate_users.sh
# 格式:username:password:tags:vhost:conf_perm:write_perm:read_perm
ADMQ_HOME=/path/to/admq-rabbitmq
ADMIN="$ADMQ_HOME/bin/admq rabbitmq admin"
create_user() {
local user=$1 pass=$2 tags=$3 vhost=$4
$ADMIN add_user "$user" "$pass"
$ADMIN set_user_tags "$user" $tags
$ADMIN set_permissions -p "$vhost" "$user" ".*" ".*" ".*"
echo "Created user: $user"
}
# 按需添加
create_user "app_service" "S3cur3P@ss!" "management" "/prod"
create_user "admin_ops" "Adm1nP@ss!" "administrator" "/"
create_user "readonly_mon" "M0n1t0r!" "monitoring" "/"7.2 从 RabbitMQ Definition 文件迁移用户
python
#!/usr/bin/env python3
# reset_passwords.py - 在 definition.json 中重置用户密码
import json, hashlib, base64, os
with open('rabbitmq-definitions.json') as f:
data = json.load(f)
# RabbitMQ 使用 SHA-256 密码哈希(加盐)
for user in data.get('users', []):
print(f"User: {user['name']}, Tags: {user['tags']}")
# 需要手动设置新密码,无法从旧哈希反推
print("\n请使用以下命令重建用户密码:")
for user in data.get('users', []):
print(f"bin/admq rabbitmq admin change_password {user['name']} NEW_PASSWORD")8. 消息数据迁移
8.1 在线迁移(Shovel 插件)
使用 RabbitMQ Shovel 插件将消息从源 Broker 拉取到 ADMQ:
bash
# 配置 Shovel:从源 RabbitMQ 迁移消息到 ADMQ
curl -X PUT http://localhost:15672/api/parameters/shovel/%2F/migration-shovel \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://user:pass@old-rabbitmq-host",
"src-queue": "source_queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://admq:apusic_123@localhost",
"dest-queue": "target_queue",
"ack-mode": "on-confirm",
"delete-after": "queue-length"
}
}'8.2 验证消息迁移完成
bash
# 检查源队列是否清空
curl http://old-rabbitmq:15672/api/queues/%2F/source_queue -u user:pass \
| python3 -c "import json,sys; q=json.load(sys.stdin); print(f'Messages: {q[\"messages\"]}')"
# 检查目标队列消息数
curl http://localhost:15672/api/queues/%2F/target_queue -u admq:apusic_123 \
| python3 -c "import json,sys; q=json.load(sys.stdin); print(f'Messages: {q[\"messages\"]}')"8.3 Federation(联邦)方案
如需长期双向同步(灰度迁移),使用 Federation 插件:
bash
# 配置 Federation upstream
curl -X PUT http://localhost:15672/api/parameters/federation-upstream/%2F/old-rabbitmq \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{"value": {"uri": "amqp://user:pass@old-rabbitmq-host", "expires": 3600000}}'
# 设置 Federation 策略
curl -X PUT http://localhost:15672/api/policies/%2F/federate-queues \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{
"pattern": "^federated\\.",
"definition": {"federation-upstream": "old-rabbitmq"},
"apply-to": "queues"
}'9. 配置映射参考
9.1 ActiveMQ → ADMQ RabbitMQ 配置映射
| ActiveMQ activemq.xml | ADMQ rabbitmq.conf |
|---|---|
<broker brokerName="..."> | cluster_name = ... |
<transportConnector uri="tcp://:61616"> | listeners.tcp.default = 5672 |
<memoryUsage percentOfJvmHeap="70"/> | vm_memory_high_watermark.relative = 0.7 |
<storeUsage limit="100 gb"/> | disk_free_limit.absolute = 10gb |
<authenticationPlugin> | auth_backends.1 = ... |
9.2 RocketMQ → ADMQ RabbitMQ 配置映射
| RocketMQ broker.conf | ADMQ rabbitmq.conf |
|---|---|
brokerClusterName | cluster_name |
listenPort = 10911 | listeners.tcp.default = 5672 |
maxMessageSize = 4194304 | max_message_size = 4194304 |
sendMessageThreadPoolNums | tcp_listen_options.backlog |
10. 迁移验证清单
10.1 功能验证
- [ ] 应用可以成功连接到 ADMQ RabbitMQ
- [ ] 消息可以正常发布到队列/Exchange
- [ ] 消费者可以正常接收并确认消息
- [ ] TLS 连接测试通过
- [ ] 用户权限验证正确
- [ ] 队列/Exchange/Binding 关系正确重建
- [ ] 死信队列工作正常
- [ ] 消息持久化验证(重启后消息未丢失)
10.2 性能验证
bash
# 使用 PerfTest 工具进行基准测试
# (需单独下载 rabbitmq-perf-test 工具)
java -jar perf-test.jar \
--uri amqp://admq:apusic_123@localhost \
--producers 10 \
--consumers 10 \
--queue perf-test \
--rate 1000 \
--duration 6010.3 监控验证
- [ ] Management UI(15672)可正常访问
- [ ] Prometheus 指标端点(15692)正常返回
- [ ] Apusic 管控台 Agent(58181)心跳正常
- [ ] 队列深度、消息速率指标正常
10.4 高可用验证(集群场景)
- [ ] 集群所有节点状态正常
- [ ] 一个节点重启后服务不中断
- [ ] 仲裁队列(Quorum Queue)正常同步
- [ ] 客户端连接自动故障转移
10.5 切换准备清单
- [ ] DNS 切换计划已确认
- [ ] 回滚方案已准备(保留源系统 48 小时)
- [ ] 通知下游团队变更时间窗口
- [ ] 灰度切换比例确定(建议先 10% 流量)
- [ ] 告警阈值已配置
- [ ] 运维值班安排就绪