Skip to content

ADMQ Kafka 迁移手册

版本:1.0 | 涵盖从主流消息系统迁移至 ADMQ Kafka 的完整指南


目录

  1. 迁移总览
  2. 从 RocketMQ 迁移
  3. 从 RabbitMQ 迁移
  4. 从 ActiveMQ 迁移
  5. 从自建 Kafka 迁移
  6. 数据迁移工具
  7. 迁移验证
  8. 回滚方案

1. 迁移总览

1.1 概念映射表

源系统ADMQ Kafka 对应概念说明
RocketMQ TopicKafka Topic直接对应
RocketMQ TagKafka Header 或 Topic 分支需改造
RocketMQ ConsumerGroupKafka ConsumerGroup直接对应
RocketMQ OrderMessageKafka 相同 Key 消息分区内有序
RabbitMQ Exchange应用层路由 / Kafka Streams无原生对应
RabbitMQ QueueKafka Topic + ConsumerGroup语义差异大
RabbitMQ Binding客户端 subscribe 逻辑需重构
ActiveMQ QueueKafka Topic(single consumer group)语义近似
ActiveMQ TopicKafka Topic(多 consumer group)直接对应

1.2 迁移挑战

挑战程度解决方案
路由逻辑重建用 Kafka Streams 或应用层路由替代
消息顺序保证使用一致的 Key,利用分区内顺序
延迟消息独立实现延迟队列(RocksDB 时间轮)
事务消息Kafka 原生事务 API
消息积压处理Kafka 天然支持大积压,可配置保留期
消息回放Kafka 原生支持,优于大多数来源系统

2. 从 RocketMQ 迁移

2.1 生产者迁移

RocketMQ 原代码:

java
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);

迁移至 Kafka:

java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Tag 改为 Header
ProducerRecord<String, String> record = new ProducerRecord<>("TopicTest", "Hello Kafka");
record.headers().add("tag", "TagA".getBytes());
producer.send(record);

消费者端过滤(替代 Tag):

java
consumer.subscribe(Collections.singletonList("TopicTest"));

for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
    // 从 Header 读取 tag
    Header tagHeader = record.headers().lastHeader("tag");
    String tag = tagHeader != null ? new String(tagHeader.value()) : "";

    if ("TagA".equals(tag)) {
        // 处理 TagA 消息
    }
}

2.2 顺序消息迁移

RocketMQ 顺序消息:

java
// 顺序发送:同一 orderId 发到同一队列
producer.send(msg, new MessageQueueSelector() {
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int index = (int)(orderId % mqs.size());
        return mqs.get(index);
    }
}, orderId);

Kafka 等价实现:

java
// 相同 key 的消息自动路由到同一分区,分区内有序
producer.send(new ProducerRecord<>("orders", String.valueOf(orderId), orderJson));

2.3 事务消息迁移

RocketMQ 事务消息:

java
TransactionMQProducer producer = new TransactionMQProducer("group");
// Half Message → 本地事务 → Commit/Rollback

Kafka 事务(本地事务 + 消息原子提交):

java
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("topic", key, value));
    // 执行本地事务...
    localDbCommit();
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

注意:Kafka 事务保证 Producer 端原子性,但不支持 RocketMQ 的"先查本地事务状态"回查机制,需应用层自行实现幂等消费。

2.4 延迟消息迁移

RocketMQ 支持固定延迟级别(1s/5s/10s/...2h),Kafka 无原生延迟消息支持。

方案一:独立延迟 Topic

java
// 发送到延迟 topic,带上目标时间戳
ProducerRecord<String, String> record = new ProducerRecord<>("delay-queue", key, value);
record.headers().add("target-time",
    String.valueOf(System.currentTimeMillis() + delayMs).getBytes());

// 延迟服务轮询 delay-queue,时间到再转发到真实 topic

方案二:使用 ADMQ 统一持久化层(见持久化方案文档)


3. 从 RabbitMQ 迁移

3.1 架构差异理解

RabbitMQ 模型:
Producer → Exchange → [Binding Rules] → Queue → Consumer

Kafka 模型:
Producer → Topic(Partition) → Consumer Group

关键差异

  • RabbitMQ Queue 消息消费后即删除;Kafka 消息保留(按时间/大小)
  • RabbitMQ Exchange 实现路由;Kafka 无中间路由,消费端过滤
  • RabbitMQ 支持 DLQ(死信队列);Kafka 需手动实现

3.2 Exchange 类型迁移

RabbitMQ Exchange 类型Kafka 实现方案
Direct(精确路由)一个 routing key → 一个 Topic
Fanout(广播)多个 Consumer Group 消费同一 Topic
Topic(通配路由)应用层过滤 Header,或按规则拆分 Topic
Headers(Header 匹配)Kafka Header + 消费端过滤

3.3 生产者迁移示例

RabbitMQ 原代码:

java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare("order-exchange", "direct");
channel.basicPublish("order-exchange", "order.created",
    null, "order body".getBytes());

迁移至 Kafka(Direct 路由):

java
// routing key "order.created" → Topic "order-created"
producer.send(new ProducerRecord<>("order-created", orderId, orderJson));

迁移至 Kafka(Fanout 广播):

java
// 生产者只发一次到 "notifications" topic
producer.send(new ProducerRecord<>("notifications", key, message));

// 多个服务各自创建独立 ConsumerGroup,都能收到全量消息
// email-service 的 group: "email-notification-group"
// sms-service 的 group: "sms-notification-group"

3.4 消费者迁移示例

RabbitMQ 原代码:

java
channel.queueDeclare("order-queue", true, false, false, null);
channel.queueBind("order-queue", "order-exchange", "order.created");

channel.basicConsume("order-queue", false, (tag, delivery) -> {
    String body = new String(delivery.getBody());
    processOrder(body);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});

迁移至 Kafka:

java
consumer.subscribe(Collections.singletonList("order-created"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        processOrder(record.value());
    }
    consumer.commitSync();
}

3.5 DLQ(死信队列)迁移

RabbitMQ DLQ:

java
// 声明队列时绑定 DLX
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
channel.queueDeclare("my-queue", true, false, false, args);

Kafka DLQ 实现:

java
// Spring Kafka 内置 DLT 支持
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);
    // 重试 3 次后发送到 topic.DLT
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}

4. 从 ActiveMQ 迁移

4.1 JMS 概念映射

JMS / ActiveMQKafka 等价
QueueTopic + 单 ConsumerGroup
Topic (Pub/Sub)Topic + 多 ConsumerGroup
Durable SubscriptionConsumerGroup(Kafka 默认持久)
Message Selector消费端过滤 Header
JMSCorrelationIDKafka Header
JMSExpirationKafka record.headers() + 消费端 TTL 检查
Connection FactoryKafkaProducer/Consumer Properties

4.2 Queue 模式迁移

ActiveMQ 原代码:

java
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 点对点队列
Queue queue = session.createQueue("ORDER.QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage msg = session.createTextMessage("order body");
producer.send(msg);

迁移至 Kafka(Queue → 单消费者组):

java
// 生产者
producer.send(new ProducerRecord<>("ORDER.QUEUE", orderId, orderBody));

// 消费者(同一 group 内只有一个实例处理每条消息 → 等价 Queue 语义)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-queue-consumers");
consumer.subscribe(Collections.singletonList("ORDER.QUEUE"));

4.3 Topic 模式迁移

ActiveMQ 持久订阅:

java
Topic topic = session.createTopic("ALERTS");
MessageConsumer consumer = session.createDurableSubscriber(topic, "alert-subscriber-1");

迁移至 Kafka(Topic → 多消费者组):

java
// 消费者组名 = 原订阅者名
props.put(ConsumerConfig.GROUP_ID_CONFIG, "alert-subscriber-1");
consumer.subscribe(Collections.singletonList("ALERTS"));
// Kafka 默认即持久化,无需特殊配置"持久订阅"

4.4 Spring JMS → Spring Kafka

原 Spring JMS:

java
@JmsListener(destination = "ORDER.QUEUE", containerFactory = "jmsListenerContainerFactory")
public void receiveOrder(TextMessage message) {
    processOrder(message.getText());
}

迁移至 Spring Kafka:

java
@KafkaListener(topics = "ORDER.QUEUE", groupId = "order-processors")
public void receiveOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
    processOrder(record.value());
    ack.acknowledge();
}

5. 从自建 Kafka 迁移

5.1 版本升级注意

ADMQ Kafka 支持 Kafka 3.x 协议,如来源系统版本较旧:

来源版本主要变化迁移风险
< 2.0消费者组协调 API 变更高:需升级客户端
2.xProducer API 无破坏性变更
3.0-3.8KRaft 稳定化,ZK 弃用中:可选迁移 KRaft

5.2 配置项变更(3.x → 3.9.1)

废弃配置替代配置
log.message.format.version已移除,无需配置
inter.broker.protocol.version已移除
zookeeper.connect(KRaft 模式)不再需要

5.3 数据目录直接迁移

如目标是从自建 Kafka 迁移到 ADMQ Kafka 的 Kafka 版本相同(3.9.x):

bash
# 停止源 Kafka
src-kafka/bin/kafka-server-stop.sh

# 直接复制数据目录(离线迁移)
rsync -avz /data/kafka-logs/ admq-host:/var/lib/admq-kafka/data/

# 启动 ADMQ Kafka(使用相同 cluster.id)

6. 数据迁移工具

6.1 MirrorMaker 2(在线镜像)

MirrorMaker 2 可以将源 Kafka 集群的消息实时同步到 ADMQ Kafka,实现零停机迁移。

配置文件 mm2.properties

properties
# 集群别名
clusters=source, target

# 源集群连接
source.bootstrap.servers=source-kafka:9092

# 目标集群连接(ADMQ Kafka)
target.bootstrap.servers=admq-kafka-host:9092

# 同步规则
source->target.enabled=true
source->target.topics=.*          # 同步所有 topic
source->target.groups=.*          # 同步消费者组 offset

# topic 重命名(去掉 "source." 前缀)
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

启动 MirrorMaker 2:

bash
./kafka/bin/connect-mirror-maker.sh mm2.properties

流量切换流程:

1. 启动 MirrorMaker 2,开始同步历史数据
2. 等待 Lag 降为 0(监控 mirror_maker_producer_record_error_rate)
3. 将生产者配置指向 ADMQ Kafka
4. 等待消费者消费完 ADMQ Kafka 中的消息
5. 关闭 MirrorMaker 2,下线源集群

6.2 Kafka Connect JDBC(数据库消息迁移)

用于将其他 MQ 的消息先落库再导入 Kafka:

bash
# 部署 JDBC Source Connector,从过渡数据库读取消息导入 Kafka
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "migration-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://migration-db:3306/mq_migration",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "table.whitelist": "messages",
      "topic.prefix": "migrated-"
    }
  }'

6.3 脚本批量迁移(小数据量)

python
import pika
from kafka import KafkaProducer
import json

# 从 RabbitMQ 拉取消息
rabbitmq_conn = pika.BlockingConnection(pika.ConnectionParameters('source-rabbitmq'))
channel = rabbitmq_conn.channel()

# 写入 ADMQ Kafka
kafka_producer = KafkaProducer(
    bootstrap_servers=['admq-kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

method, properties, body = channel.basic_get('source-queue', auto_ack=False)
while method:
    kafka_producer.send('target-topic', value={'body': body.decode(), 'props': str(properties)})
    channel.basic_ack(method.delivery_tag)
    method, properties, body = channel.basic_get('source-queue', auto_ack=False)

kafka_producer.flush()
print("迁移完成")

7. 迁移验证

7.1 消息数量验证

bash
# 检查 Topic 总消息数
./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --bootstrap-server localhost:9092 \
  --topic migrated-topic \
  --time -1  # 最新 offset(=总消息数)

7.2 消费者 Lag 验证

bash
./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-consumer-group
# LAG 列应为 0 表示已消费完毕

7.3 消息内容抽样对比

python
# 对比源系统和 ADMQ Kafka 的消息内容
sample_from_source = get_sample_from_rabbitmq(count=100)
sample_from_kafka = consume_n_messages('admq-kafka:9092', 'migrated-topic', count=100)

mismatches = []
for s, k in zip(sample_from_source, sample_from_kafka):
    if s['body'] != k['value']['body']:
        mismatches.append({'source': s, 'kafka': k})

print(f"抽样验证: {len(mismatches)} 条不一致(共 100 条)")

7.4 性能基准对比

bash
# 生产者性能测试
./kafka/bin/kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=all

# 消费者性能测试
./kafka/bin/kafka-consumer-perf-test.sh \
  --bootstrap-server localhost:9092 \
  --topic perf-test \
  --messages 1000000

8. 回滚方案

8.1 灰度切换策略

阶段一(1周):  生产者 10% 流量 → ADMQ Kafka,90% → 原系统
阶段二(2周):  生产者 50% 流量 → ADMQ Kafka
阶段三(3周):  生产者 100% 流量 → ADMQ Kafka,保持原系统消费者在线
阶段四(观察期):停止原系统消费者,下线原系统

8.2 双写方案(最安全)

java
// 同时写入新旧系统,切换消费者
public void sendMessage(String topic, String key, String value) {
    // 写入原系统
    legacyProducer.send(topic, key, value);

    // 同时写入 ADMQ Kafka
    kafkaProducer.send(new ProducerRecord<>(topic, key, value));
}

8.3 快速回滚

如出现问题,将消费者 bootstrap.servers 切回原集群,同时将 MirrorMaker 2 改为反向同步(target→source),即可快速回滚。

金蝶天燕(Apusic)企业级消息中间件套件