外观
ADMQ Kafka 迁移手册
版本:1.0 | 涵盖从主流消息系统迁移至 ADMQ Kafka 的完整指南
目录
1. 迁移总览
1.1 概念映射表
| 源系统 | ADMQ Kafka 对应概念 | 说明 |
|---|---|---|
| RocketMQ Topic | Kafka Topic | 直接对应 |
| RocketMQ Tag | Kafka Header 或 Topic 分支 | 需改造 |
| RocketMQ ConsumerGroup | Kafka ConsumerGroup | 直接对应 |
| RocketMQ OrderMessage | Kafka 相同 Key 消息 | 分区内有序 |
| RabbitMQ Exchange | 应用层路由 / Kafka Streams | 无原生对应 |
| RabbitMQ Queue | Kafka Topic + ConsumerGroup | 语义差异大 |
| RabbitMQ Binding | 客户端 subscribe 逻辑 | 需重构 |
| ActiveMQ Queue | Kafka Topic(single consumer group) | 语义近似 |
| ActiveMQ Topic | Kafka Topic(多 consumer group) | 直接对应 |
1.2 迁移挑战
| 挑战 | 程度 | 解决方案 |
|---|---|---|
| 路由逻辑重建 | 高 | 用 Kafka Streams 或应用层路由替代 |
| 消息顺序保证 | 中 | 使用一致的 Key,利用分区内顺序 |
| 延迟消息 | 高 | 独立实现延迟队列(RocksDB 时间轮) |
| 事务消息 | 中 | Kafka 原生事务 API |
| 消息积压处理 | 低 | Kafka 天然支持大积压,可配置保留期 |
| 消息回放 | 低 | Kafka 原生支持,优于大多数来源系统 |
2. 从 RocketMQ 迁移
2.1 生产者迁移
RocketMQ 原代码:
java
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);迁移至 Kafka:
java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Tag 改为 Header
ProducerRecord<String, String> record = new ProducerRecord<>("TopicTest", "Hello Kafka");
record.headers().add("tag", "TagA".getBytes());
producer.send(record);消费者端过滤(替代 Tag):
java
consumer.subscribe(Collections.singletonList("TopicTest"));
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
// 从 Header 读取 tag
Header tagHeader = record.headers().lastHeader("tag");
String tag = tagHeader != null ? new String(tagHeader.value()) : "";
if ("TagA".equals(tag)) {
// 处理 TagA 消息
}
}2.2 顺序消息迁移
RocketMQ 顺序消息:
java
// 顺序发送:同一 orderId 发到同一队列
producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = (int)(orderId % mqs.size());
return mqs.get(index);
}
}, orderId);Kafka 等价实现:
java
// 相同 key 的消息自动路由到同一分区,分区内有序
producer.send(new ProducerRecord<>("orders", String.valueOf(orderId), orderJson));2.3 事务消息迁移
RocketMQ 事务消息:
java
TransactionMQProducer producer = new TransactionMQProducer("group");
// Half Message → 本地事务 → Commit/RollbackKafka 事务(本地事务 + 消息原子提交):
java
producer.initTransactions();
producer.beginTransaction();
try {
producer.send(new ProducerRecord<>("topic", key, value));
// 执行本地事务...
localDbCommit();
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}注意:Kafka 事务保证 Producer 端原子性,但不支持 RocketMQ 的"先查本地事务状态"回查机制,需应用层自行实现幂等消费。
2.4 延迟消息迁移
RocketMQ 支持固定延迟级别(1s/5s/10s/...2h),Kafka 无原生延迟消息支持。
方案一:独立延迟 Topic
java
// 发送到延迟 topic,带上目标时间戳
ProducerRecord<String, String> record = new ProducerRecord<>("delay-queue", key, value);
record.headers().add("target-time",
String.valueOf(System.currentTimeMillis() + delayMs).getBytes());
// 延迟服务轮询 delay-queue,时间到再转发到真实 topic方案二:使用 ADMQ 统一持久化层(见持久化方案文档)
3. 从 RabbitMQ 迁移
3.1 架构差异理解
RabbitMQ 模型:
Producer → Exchange → [Binding Rules] → Queue → Consumer
Kafka 模型:
Producer → Topic(Partition) → Consumer Group关键差异:
- RabbitMQ Queue 消息消费后即删除;Kafka 消息保留(按时间/大小)
- RabbitMQ Exchange 实现路由;Kafka 无中间路由,消费端过滤
- RabbitMQ 支持 DLQ(死信队列);Kafka 需手动实现
3.2 Exchange 类型迁移
| RabbitMQ Exchange 类型 | Kafka 实现方案 |
|---|---|
| Direct(精确路由) | 一个 routing key → 一个 Topic |
| Fanout(广播) | 多个 Consumer Group 消费同一 Topic |
| Topic(通配路由) | 应用层过滤 Header,或按规则拆分 Topic |
| Headers(Header 匹配) | Kafka Header + 消费端过滤 |
3.3 生产者迁移示例
RabbitMQ 原代码:
java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("order-exchange", "direct");
channel.basicPublish("order-exchange", "order.created",
null, "order body".getBytes());迁移至 Kafka(Direct 路由):
java
// routing key "order.created" → Topic "order-created"
producer.send(new ProducerRecord<>("order-created", orderId, orderJson));迁移至 Kafka(Fanout 广播):
java
// 生产者只发一次到 "notifications" topic
producer.send(new ProducerRecord<>("notifications", key, message));
// 多个服务各自创建独立 ConsumerGroup,都能收到全量消息
// email-service 的 group: "email-notification-group"
// sms-service 的 group: "sms-notification-group"3.4 消费者迁移示例
RabbitMQ 原代码:
java
channel.queueDeclare("order-queue", true, false, false, null);
channel.queueBind("order-queue", "order-exchange", "order.created");
channel.basicConsume("order-queue", false, (tag, delivery) -> {
String body = new String(delivery.getBody());
processOrder(body);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});迁移至 Kafka:
java
consumer.subscribe(Collections.singletonList("order-created"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value());
}
consumer.commitSync();
}3.5 DLQ(死信队列)迁移
RabbitMQ DLQ:
java
// 声明队列时绑定 DLX
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
channel.queueDeclare("my-queue", true, false, false, args);Kafka DLQ 实现:
java
// Spring Kafka 内置 DLT 支持
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate);
// 重试 3 次后发送到 topic.DLT
return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}4. 从 ActiveMQ 迁移
4.1 JMS 概念映射
| JMS / ActiveMQ | Kafka 等价 |
|---|---|
| Queue | Topic + 单 ConsumerGroup |
| Topic (Pub/Sub) | Topic + 多 ConsumerGroup |
| Durable Subscription | ConsumerGroup(Kafka 默认持久) |
| Message Selector | 消费端过滤 Header |
| JMSCorrelationID | Kafka Header |
| JMSExpiration | Kafka record.headers() + 消费端 TTL 检查 |
| Connection Factory | KafkaProducer/Consumer Properties |
4.2 Queue 模式迁移
ActiveMQ 原代码:
java
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 点对点队列
Queue queue = session.createQueue("ORDER.QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage msg = session.createTextMessage("order body");
producer.send(msg);迁移至 Kafka(Queue → 单消费者组):
java
// 生产者
producer.send(new ProducerRecord<>("ORDER.QUEUE", orderId, orderBody));
// 消费者(同一 group 内只有一个实例处理每条消息 → 等价 Queue 语义)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-queue-consumers");
consumer.subscribe(Collections.singletonList("ORDER.QUEUE"));4.3 Topic 模式迁移
ActiveMQ 持久订阅:
java
Topic topic = session.createTopic("ALERTS");
MessageConsumer consumer = session.createDurableSubscriber(topic, "alert-subscriber-1");迁移至 Kafka(Topic → 多消费者组):
java
// 消费者组名 = 原订阅者名
props.put(ConsumerConfig.GROUP_ID_CONFIG, "alert-subscriber-1");
consumer.subscribe(Collections.singletonList("ALERTS"));
// Kafka 默认即持久化,无需特殊配置"持久订阅"4.4 Spring JMS → Spring Kafka
原 Spring JMS:
java
@JmsListener(destination = "ORDER.QUEUE", containerFactory = "jmsListenerContainerFactory")
public void receiveOrder(TextMessage message) {
processOrder(message.getText());
}迁移至 Spring Kafka:
java
@KafkaListener(topics = "ORDER.QUEUE", groupId = "order-processors")
public void receiveOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
processOrder(record.value());
ack.acknowledge();
}5. 从自建 Kafka 迁移
5.1 版本升级注意
ADMQ Kafka 支持 Kafka 3.x 协议,如来源系统版本较旧:
| 来源版本 | 主要变化 | 迁移风险 |
|---|---|---|
| < 2.0 | 消费者组协调 API 变更 | 高:需升级客户端 |
| 2.x | Producer API 无破坏性变更 | 低 |
| 3.0-3.8 | KRaft 稳定化,ZK 弃用 | 中:可选迁移 KRaft |
5.2 配置项变更(3.x → 3.9.1)
| 废弃配置 | 替代配置 |
|---|---|
log.message.format.version | 已移除,无需配置 |
inter.broker.protocol.version | 已移除 |
zookeeper.connect(KRaft 模式) | 不再需要 |
5.3 数据目录直接迁移
如目标是从自建 Kafka 迁移到 ADMQ Kafka 的 Kafka 版本相同(3.9.x):
bash
# 停止源 Kafka
src-kafka/bin/kafka-server-stop.sh
# 直接复制数据目录(离线迁移)
rsync -avz /data/kafka-logs/ admq-host:/var/lib/admq-kafka/data/
# 启动 ADMQ Kafka(使用相同 cluster.id)6. 数据迁移工具
6.1 MirrorMaker 2(在线镜像)
MirrorMaker 2 可以将源 Kafka 集群的消息实时同步到 ADMQ Kafka,实现零停机迁移。
配置文件 mm2.properties:
properties
# 集群别名
clusters=source, target
# 源集群连接
source.bootstrap.servers=source-kafka:9092
# 目标集群连接(ADMQ Kafka)
target.bootstrap.servers=admq-kafka-host:9092
# 同步规则
source->target.enabled=true
source->target.topics=.* # 同步所有 topic
source->target.groups=.* # 同步消费者组 offset
# topic 重命名(去掉 "source." 前缀)
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy启动 MirrorMaker 2:
bash
./kafka/bin/connect-mirror-maker.sh mm2.properties流量切换流程:
1. 启动 MirrorMaker 2,开始同步历史数据
2. 等待 Lag 降为 0(监控 mirror_maker_producer_record_error_rate)
3. 将生产者配置指向 ADMQ Kafka
4. 等待消费者消费完 ADMQ Kafka 中的消息
5. 关闭 MirrorMaker 2,下线源集群6.2 Kafka Connect JDBC(数据库消息迁移)
用于将其他 MQ 的消息先落库再导入 Kafka:
bash
# 部署 JDBC Source Connector,从过渡数据库读取消息导入 Kafka
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "migration-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://migration-db:3306/mq_migration",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "messages",
"topic.prefix": "migrated-"
}
}'6.3 脚本批量迁移(小数据量)
python
import pika
from kafka import KafkaProducer
import json
# 从 RabbitMQ 拉取消息
rabbitmq_conn = pika.BlockingConnection(pika.ConnectionParameters('source-rabbitmq'))
channel = rabbitmq_conn.channel()
# 写入 ADMQ Kafka
kafka_producer = KafkaProducer(
bootstrap_servers=['admq-kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
method, properties, body = channel.basic_get('source-queue', auto_ack=False)
while method:
kafka_producer.send('target-topic', value={'body': body.decode(), 'props': str(properties)})
channel.basic_ack(method.delivery_tag)
method, properties, body = channel.basic_get('source-queue', auto_ack=False)
kafka_producer.flush()
print("迁移完成")7. 迁移验证
7.1 消息数量验证
bash
# 检查 Topic 总消息数
./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server localhost:9092 \
--topic migrated-topic \
--time -1 # 最新 offset(=总消息数)7.2 消费者 Lag 验证
bash
./kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-consumer-group
# LAG 列应为 0 表示已消费完毕7.3 消息内容抽样对比
python
# 对比源系统和 ADMQ Kafka 的消息内容
sample_from_source = get_sample_from_rabbitmq(count=100)
sample_from_kafka = consume_n_messages('admq-kafka:9092', 'migrated-topic', count=100)
mismatches = []
for s, k in zip(sample_from_source, sample_from_kafka):
if s['body'] != k['value']['body']:
mismatches.append({'source': s, 'kafka': k})
print(f"抽样验证: {len(mismatches)} 条不一致(共 100 条)")7.4 性能基准对比
bash
# 生产者性能测试
./kafka/bin/kafka-producer-perf-test.sh \
--topic perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092 acks=all
# 消费者性能测试
./kafka/bin/kafka-consumer-perf-test.sh \
--bootstrap-server localhost:9092 \
--topic perf-test \
--messages 10000008. 回滚方案
8.1 灰度切换策略
阶段一(1周): 生产者 10% 流量 → ADMQ Kafka,90% → 原系统
阶段二(2周): 生产者 50% 流量 → ADMQ Kafka
阶段三(3周): 生产者 100% 流量 → ADMQ Kafka,保持原系统消费者在线
阶段四(观察期):停止原系统消费者,下线原系统8.2 双写方案(最安全)
java
// 同时写入新旧系统,切换消费者
public void sendMessage(String topic, String key, String value) {
// 写入原系统
legacyProducer.send(topic, key, value);
// 同时写入 ADMQ Kafka
kafkaProducer.send(new ProducerRecord<>(topic, key, value));
}8.3 快速回滚
如出现问题,将消费者 bootstrap.servers 切回原集群,同时将 MirrorMaker 2 改为反向同步(target→source),即可快速回滚。