Skip to content

ADMQ Kafka 开发集成手册

版本:1.0


目录

  1. 客户端配置基础
  2. Java 客户端
  3. Spring Boot 集成
  4. Python 客户端
  5. Go 客户端
  6. Node.js 客户端
  7. Kafka Streams 流处理
  8. Kafka Connect 连接器
  9. 事务与精确一次语义
  10. Schema Registry 集成
  11. REST Proxy 接口
  12. 最佳实践

1. 客户端配置基础

1.1 连接参数

Bootstrap Server: localhost:9092

1.2 关键生产者参数

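| 参数 | 默认值(kafka-clients 3.x) | 推荐值(高可靠) | 说明 |
|---|---|---|---|
| acks | all(3.0 之前为 1) | all | 确认级别 |
| retries | 2147483647(2.1 起) | 保持默认 | 重试次数;Kafka 官方建议不显式设置,而用 delivery.timeout.ms(默认 120000)控制重试总时长。设为 3 之类的小值可能在 Leader 切换期间导致发送失败 |
| linger.ms | 0(4.0 起为 5) | 5 | 批量等待时间 |
| batch.size | 16384 | 65536 | 批次大小(字节) |
| compression.type | none | lz4 | 压缩算法 |
| enable.idempotence | true(3.0 之前为 false) | true | 幂等生产者 |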

1.3 关键消费者参数

| 参数 | 默认值 | 推荐值 | 说明 |
|---|---|---|---|
| enable.auto.commit | true | false | 手动提交 offset |
| auto.offset.reset | latest | 按需 | 无初始 offset 时的行为 |
| max.poll.records | 500 | 100~500 | 每次 poll 最大消息数 |
| session.timeout.ms | 45000 | 30000 | 消费者心跳超时 |
| fetch.max.wait.ms | 500 | 500 | 拉取等待时间 |

2. Java 客户端

2.1 Maven 依赖

xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>

2.2 生产者示例

java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sends 100 string records to {@code my-topic} with a high-reliability, batched producer
 * configuration and reports each record's delivery result from an asynchronous callback.
 * Exits with status 1 if any record could not be delivered.
 */
public class AdmqKafkaProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        // High reliability: wait for all in-sync replicas and de-duplicate retried batches.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Keep `retries` at its default (Integer.MAX_VALUE) and bound retrying by time instead.
        // A small fixed count such as 3 can be exhausted during a routine leader election, which
        // fails the send even though the cluster recovers moments later.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

        // Throughput: wait up to 5 ms to fill batches of up to 64 KiB, compressed with LZ4.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // Callbacks generally run on the producer's I/O thread, hence an atomic counter.
        AtomicInteger failures = new AtomicInteger();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "my-topic",                // topic
                    "key-" + i,               // key (selects the partition)
                    "Hello ADMQ Kafka #" + i  // value
                );

                // Asynchronous send; the callback reports the outcome.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        failures.incrementAndGet();
                        System.err.println("发送失败: " + exception);
                    } else {
                        System.out.printf("已发送: topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
            // Block until every record sent above has completed and its callback has run.
            producer.flush();
        }

        if (failures.get() > 0) {
            System.err.println("发送失败条数: " + failures.get());
            System.exit(1);
        }
    }
}

2.3 消费者示例

java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AdmqKafkaConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // 手动提交,精确控制
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
                    // 处理消息...
                }

                // 手动提交(处理完批次后)
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}

2.4 Admin API 示例

java
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class AdmqKafkaAdmin {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // 创建 Topic
            NewTopic newTopic = new NewTopic("new-topic", 3, (short) 2);
            newTopic.configs(Map.of(
                "retention.ms", "86400000",
                "compression.type", "lz4"
            ));
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();

            // 查看 Topic 列表
            Set<String> topics = adminClient.listTopics().names().get();
            System.out.println("Topics: " + topics);

            // 查看消费者组 Lag
            Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
                .listConsumerGroupOffsets("my-group")
                .partitionsToOffsetAndMetadata()
                .get();
            offsets.forEach((tp, offset) ->
                System.out.printf("  %s offset=%d%n", tp, offset.offset()));
        }
    }
}

3. Spring Boot 集成

3.1 依赖配置

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2 application.yml

yaml
# Spring Boot Kafka client settings (producer, consumer, listener container).
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializes record values as JSON.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      # NOTE(review): with idempotence enabled, Kafka recommends leaving `retries` at its
      # default and bounding retries with delivery.timeout.ms. A small fixed count can make
      # sends fail during a routine leader election; consider removing this line.
      retries: 3
      # Raw Kafka producer properties, passed through unchanged.
      properties:
        enable.idempotence: true
        linger.ms: 5
        batch.size: 65536
        compression.type: lz4
    consumer:
      group-id: spring-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        # Packages JsonDeserializer is allowed to instantiate; keep this narrow (avoid "*").
        spring.json.trusted.packages: "com.example.*"
    # NOTE: spring.kafka.listener.* is applied only to Spring Boot's auto-configured container
    # factory. If the application defines its own bean named kafkaListenerContainerFactory,
    # set ack mode / concurrency on that bean instead.
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 3        # number of consumer threads (should not exceed the partition count)

3.3 生产者 Service

java
@Service
@RequiredArgsConstructor
public class MessageProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, String key, Object payload) {
        kafkaTemplate.send(topic, key, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("消息发送失败 topic={}: {}", topic, ex.getMessage());
                } else {
                    log.info("消息已发送 partition={} offset={}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}

3.4 消费者 Listener

java
@Component
@Slf4j
public class OrderEventListener {

    @KafkaListener(
        topics = "order-events",
        groupId = "order-processor",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        try {
            log.info("收到订单事件: {}", record.value());
            // 处理业务逻辑...
            processOrder(record.value());
            ack.acknowledge();  // 手动确认
        } catch (Exception e) {
            log.error("处理失败,不确认: {}", e.getMessage());
            // 不调用 ack.acknowledge(),消息会被重新投递
        }
    }

    // 批量消费
    @KafkaListener(topics = "bulk-events", batch = "true")
    public void handleBatch(List<ConsumerRecord<String, String>> records,
                            Acknowledgment ack) {
        log.info("批量处理 {} 条消息", records.size());
        records.forEach(r -> process(r.value()));
        ack.acknowledge();
    }
}

3.5 KafkaTemplate 配置

java
/**
 * Listener container configuration: manual acknowledgment plus retry-then-dead-letter
 * error handling.
 */
@Configuration
public class KafkaConfig {

    /**
     * Container factory used by {@code @KafkaListener} methods (bean name
     * {@code kafkaListenerContainerFactory}).
     *
     * <p>Defining this bean replaces Spring Boot's auto-configured factory, so the
     * {@code spring.kafka.listener.*} properties are not applied to it. Ack mode and
     * concurrency are therefore set here explicitly.
     *
     * @param consumerFactory consumer factory (e.g. Spring Boot's auto-configured one)
     * @param kafkaTemplate   template used to publish records that exhausted their retries
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
           kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        // Same as application.yml (ack-mode: MANUAL_IMMEDIATE): commit as soon as acknowledge() is called.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Dead-letter handling: retry a failed record 3 more times, 1 s apart, then publish it
        // to the dead-letter topic via the injected template.
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(1000L, 3L)
        ));
        return factory;
    }
}

4. Python 客户端

4.1 安装

bash
pip install kafka-python-ng

4.2 生产者

python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=3,
    linger_ms=5,
    batch_size=65536,
    compression_type='lz4'  # requires the `lz4` Python package to be installed
)

try:
    # Queue all records first so linger_ms/batch_size can actually batch them.
    # Calling future.get() right after each send() would make every send synchronous.
    futures = [
        producer.send('my-topic', key=f'key-{i}', value={'id': i, 'msg': f'Hello #{i}'})
        for i in range(100)
    ]
    producer.flush()

    # Every future is complete after flush(); get() raises if that record failed.
    for future in futures:
        record_metadata = future.get(timeout=10)
        print(f"offset={record_metadata.offset}, partition={record_metadata.partition}")
finally:
    # Release sockets/threads even if a send failed.
    producer.close()

4.3 消费者

python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='python-consumer-group',
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    # Tombstone records carry a None value; keep it as None instead of crashing on None.decode().
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v is not None else None,
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

try:
    for message in consumer:
        print(f"offset={message.offset} key={message.key} value={message.value}")
        # Synchronous commit after processing (at-least-once). One commit per message is
        # simple but slow; for throughput, commit every N messages instead.
        consumer.commit()
finally:
    # Leave the group now (prompt rebalance) instead of after the session timeout.
    consumer.close()

5. Go 客户端

5.1 依赖

bash
go get github.com/IBM/sarama@latest

5.2 生产者

go
package main

import (
    "fmt"
    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 3
    config.Producer.Return.Successes = true
    config.Producer.Compression = sarama.CompressionLZ4

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        msg := &sarama.ProducerMessage{
            Topic: "my-topic",
            Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
            Value: sarama.StringEncoder(fmt.Sprintf("Hello ADMQ Kafka #%d", i)),
        }
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Printf("发送失败: %v\n", err)
        } else {
            fmt.Printf("已发送: partition=%d offset=%d\n", partition, offset)
        }
    }
}

5.3 消费者组

go
package main

import (
    "context"
    "log"
    "github.com/IBM/sarama"
)

type ConsumerGroupHandler struct{}

func (h ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("offset=%d key=%s value=%s", msg.Offset, msg.Key, msg.Value)
        session.MarkMessage(msg, "")
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = false

    cg, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "go-group", config)
    if err != nil {
        panic(err)
    }
    defer cg.Close()

    ctx := context.Background()
    for {
        cg.Consume(ctx, []string{"my-topic"}, ConsumerGroupHandler{})
    }
}

6. Node.js 客户端

6.1 安装

bash
npm install kafkajs

6.2 生产者与消费者

javascript
const { Kafka } = require('kafkajs');

// Shared client configuration for the producer and consumer below.
const kafka = new Kafka({
  clientId: 'admq-node-client',
  brokers: ['localhost:9092']
});

// Producer: sends two JSON messages, then disconnects (also when send() fails).
async function produce() {
  const producer = kafka.producer({
    allowAutoTopicCreation: false,
    idempotent: true,
  });
  await producer.connect();

  try {
    await producer.send({
      topic: 'my-topic',
      messages: [
        { key: 'key-1', value: JSON.stringify({ id: 1, msg: 'Hello' }) },
        { key: 'key-2', value: JSON.stringify({ id: 2, msg: 'World' }) },
      ],
    });
  } finally {
    await producer.disconnect();
  }
}

// Consumer: starts from the beginning when the group has no committed offset, and commits
// each processed message's offset manually.
async function consume() {
  const consumer = kafka.consumer({ groupId: 'node-consumer-group' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        offset: message.offset,
        key: message.key?.toString(),
        value: message.value?.toString(),
      });
      // eachMessage is not given commitOffsetsIfNecessary (that belongs to eachBatch), and with
      // autoCommit disabled nothing is committed unless we do it. The committed offset is the
      // NEXT offset to consume, hence +1; kafkajs offsets are decimal strings.
      await consumer.commitOffsets([
        { topic, partition, offset: (BigInt(message.offset) + 1n).toString() },
      ]);
    },
  });
}

produce().catch(console.error);
consume().catch(console.error);

7. Kafka Streams 流处理

7.1 词频统计示例

java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream("text-input");

        KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\s+")))
            .groupBy((key, word) -> word)
            .count(Materialized.as("word-count-store"));

        wordCounts.toStream().to("word-count-output",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

7.2 流式 Join 示例

java
// orderSerde / customerSerde / enrichedOrderSerde are application-provided Serde instances
// (e.g. JSON or Avro), not shown here. They must be passed explicitly, because the
// application's default serdes cannot (de)serialize these custom types.
// Both input topics must be co-partitioned (same partition count, same key) for the join.
KStream<String, Order> orders = builder.stream("orders",
    Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers = builder.table("customers",
    Consumed.with(Serdes.String(), customerSerde));

// Stream-table inner join: enrich each order with its customer's data.
// Orders whose key has no matching customer row are not emitted.
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(customers,
          (order, customer) -> new EnrichedOrder(order, customer),
          Joined.with(Serdes.String(), orderSerde, customerSerde));

enrichedOrders.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

8. Kafka Connect 连接器

8.1 启动 Connect Worker

bash
# Start a distributed-mode Connect worker using the given worker configuration file.
# NOTE(review): the script path (./kafka/bin) and the config path (config/) look relative to
# different directories; adjust both to your Kafka installation layout.
./kafka/bin/connect-distributed.sh config/connect-distributed.properties

config/connect-distributed.properties

properties
# Kafka cluster used by the worker for connector data and its internal topics.
bootstrap.servers=localhost:9092
# Workers sharing the same group.id form one Connect cluster.
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics for connector offsets, configurations and status.
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# The internal topics default to replication factor 3. On a single-broker development
# cluster uncomment these lines, otherwise the worker cannot create the topics.
#offset.storage.replication.factor=1
#config.storage.replication.factor=1
#status.storage.replication.factor=1
# REST API listener on port 8083 (replaces the deprecated rest.port setting).
listeners=http://0.0.0.0:8083
# Directories/JARs holding connector plugins (e.g. the JDBC connector, libs/connect-file-*.jar).
#plugin.path=/opt/kafka/plugins

8.2 JDBC Source Connector(MySQL → Kafka)

bash
# Register a JDBC source connector named "mysql-source" that reads the MySQL tables
# `orders` and `customers` into topics prefixed with "db-", using the `id` column in
# "incrementing" mode (picks up new rows with a larger id; updates to existing rows are not captured).
# NOTE: this connector is not bundled with Apache Kafka. Install the Confluent JDBC connector
# and the MySQL JDBC driver into the worker's plugin.path first.
# NOTE(review): avoid plaintext credentials outside local testing (e.g. use a ConfigProvider).
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://localhost:3306/mydb",
      "connection.user": "kafka",
      "connection.password": "kafka-pass",
      "table.whitelist": "orders,customers",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "db-"
    }
  }'

8.3 File Sink Connector(Kafka → 文件)

bash
# Register a FileStreamSinkConnector named "file-sink" that writes the records of
# "my-topic" to /tmp/output.txt using a single task.
# NOTE: since Kafka 3.2 the FileStream example connectors are no longer on the worker's
# default classpath; add libs/connect-file-<version>.jar to plugin.path before registering.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "file-sink",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": "1",
      "topics": "my-topic",
      "file": "/tmp/output.txt"
    }
  }'

9. 事务与精确一次语义

9.1 事务生产者

java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A stable transactional.id lets the broker fence zombie instances of this producer.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Registers the transactional.id and completes/aborts any transaction left by a previous instance.
producer.initTransactions();

try {
    producer.beginTransaction();

    // Both records become visible to read_committed consumers together, or not at all.
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Unrecoverable: this producer must not abort or continue; the only option is to close it.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort; the transaction may be retried with the same producer.
    producer.abortTransaction();
}
// Release the producer in every path.
producer.close();

9.2 Read-Process-Write 模式(Exactly-Once)

java
// Consume + process + produce, wrapped in one transaction per poll() batch.
// Assumes: the producer has a transactional.id and initTransactions() was called; the consumer
// uses enable.auto.commit=false and isolation.level=read_committed.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Process and send the result to another topic.
            String result = process(record.value());
            producer.send(new ProducerRecord<>("output-topic", record.key(), result));

            // Commit position = NEXT offset to read, hence +1. Later records of the same
            // partition overwrite earlier entries, leaving the highest offset per partition.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit the consumed offsets in the same transaction as the output (the key step!).
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal: the producer cannot abort or continue; close it and stop processing.
        producer.close();
        throw e;
    } catch (Exception e) {
        producer.abortTransaction();
        // Aborting discards this batch's output and offsets, but poll() already advanced the
        // consumer position past these records. Rewind each partition to the batch's first
        // record, otherwise the batch would be skipped (lost) on the next poll().
        // NOTE: a record that always fails is retried forever; route it to a DLT if needed.
        for (TopicPartition tp : records.partitions()) {
            consumer.seek(tp, records.records(tp).get(0).offset());
        }
    }
}

10. Schema Registry 集成

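使用 Avro 序列化(需要独立部署 Schema Registry 或使用 Confluent Cloud)。注意:kafka-avro-serializer 发布在 Confluent Maven 仓库(https://packages.confluent.io/maven/)而非 Maven Central,需要先在 pom.xml 的 <repositories> 中添加该仓库: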

xml
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
java
// Avro values via Confluent's serializer, which resolves (and by default registers) the
// schema in the Schema Registry at the URL below.
props.put("schema.registry.url", "http://localhost:8081");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

// Avro schema of a "User" record: int `id`, string `name`.
String userSchemaJson = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
    + "{\"name\":\"id\",\"type\":\"int\"},"
    + "{\"name\":\"name\",\"type\":\"string\"}]}";
Schema userSchema = new Schema.Parser().parse(userSchemaJson);

// Build a generic record conforming to the schema and publish it under key "user-1".
GenericRecord alice = new GenericData.Record(userSchema);
alice.put("id", 1);
alice.put("name", "Alice");
producer.send(new ProducerRecord<>("users", "user-1", alice));

11. REST Proxy 接口

使用 Confluent REST Proxy 通过 HTTP 操作 Kafka(无需 Kafka 客户端库):

bash
# Produce one JSON record (key "key1") to my-topic
curl -X POST http://localhost:8082/topics/my-topic \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{"records":[{"key":"key1","value":{"id":1,"msg":"hello"}}]}'

# Create consumer instance "my-consumer" in group "my-group" (JSON format)
curl -X POST http://localhost:8082/consumers/my-group \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"name":"my-consumer","format":"json","auto.offset.reset":"earliest"}'

# Subscribe the instance to my-topic
curl -X POST http://localhost:8082/consumers/my-group/instances/my-consumer/subscription \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"topics":["my-topic"]}'

# Fetch records; the Accept type must match the instance's format (json).
# The first call(s) may return [] while the group join completes; simply retry.
curl http://localhost:8082/consumers/my-group/instances/my-consumer/records \
  -H "Accept: application/vnd.kafka.json.v2+json"

# Delete the instance when done; otherwise it stays on the proxy (and in the group)
# until the instance timeout expires.
curl -X DELETE http://localhost:8082/consumers/my-group/instances/my-consumer \
  -H "Content-Type: application/vnd.kafka.v2+json"

12. 最佳实践

12.1 分区数设计

| 场景 | 建议分区数 | 说明 |
|---|---|---|
| 低吞吐(< 10 MB/s) | 3~6 | 过多分区反而增加开销 |
| 中等吞吐(10–100 MB/s) | 12~24 | 同一消费者组的最大消费并行度 = 分区数 |
| 高吞吐(> 100 MB/s) | 48~96 | 单 Broker 建议 < 4000 分区 |

12.2 消息键设计

java
// Records with the same key go to the same partition, so their relative order is preserved.
// Order-related events: key = orderId
producer.send(new ProducerRecord<>("order-events", orderId, event));

// key = null does NOT broadcast. Within one consumer group every record is still delivered to
// exactly one consumer; the producer merely chooses the partition itself (sticky partitioning
// since Kafka 2.4, not per-record random). To broadcast, give each consumer its own group.id.
producer.send(new ProducerRecord<>("config-updates", null, configChange));

12.3 消费者组扩展

分区数=6,消费者=3 → 每个消费者处理2个分区(最优)
分区数=6,消费者=6 → 每个消费者处理1个分区(最大并行)
分区数=6,消费者=8 → 2个消费者空闲(浪费)

12.4 错误处理模式

java
// Dead-letter-topic pattern: retry, then publish the failed record to "<topic>.DLT".
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    // 3 retries after the first failure, 1 s apart (4 delivery attempts in total).
    FixedBackOff backOff = new FixedBackOff(1000L, 3L);

    // Destination: same partition number in "<topic>.DLT", so the DLT needs at least as many
    // partitions as the source topic.
    DeadLetterPublishingRecoverer dltRecoverer = new DeadLetterPublishingRecoverer(
        kafkaTemplate,
        (failed, cause) -> new TopicPartition(failed.topic() + ".DLT", failed.partition()));

    return new DefaultErrorHandler(dltRecoverer, backOff);
}

12.5 性能基准

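| 场景 | 参考 TPS |
|---|---|
| 单分区顺序写入(无副本) | 500K/s |
| 三副本集群(acks=all) | 200K/s |
| 压缩(lz4)写入 | 150K/s(消息小),400K/s(消息大) |
| 批量消费(生产端以 batch.size=64K 写入的数据;batch.size 是生产者参数) | 300K/s |

以上数值仅供参考,实际结果取决于硬件、网络、消息大小和客户端配置,请以实际压测为准。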

金蝶天燕(Apusic)企业级消息中间件套件