外观
ADMQ Kafka 开发集成手册
版本:1.0
目录
- 客户端配置基础
- Java 客户端
- Spring Boot 集成
- Python 客户端
- Go 客户端
- Node.js 客户端
- Kafka Streams 流处理
- Kafka Connect 连接器
- 事务与精确一次语义
- Schema Registry 集成
- REST Proxy 接口
- 最佳实践
1. 客户端配置基础
1.1 连接参数
Bootstrap Server: localhost:9092
1.2 关键生产者参数
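| 参数 | 默认值(Java 客户端 3.x) | 推荐值(高可靠) | 说明 |
|---|---|---|---|
| acks | all(3.0 之前为 1) | all | 确认级别 |
| retries | 2147483647(2.1 之前为 0) | 保持默认 | 重试次数;官方建议不显式限制,改用 delivery.timeout.ms(默认 120000 ms)控制总重试时长 |
| linger.ms | 0 | 5 | 批量等待时间(毫秒) |
| batch.size | 16384 | 65536 | 批次大小(字节) |
| compression.type | none | lz4 | 压缩算法 |
| enable.idempotence | true(3.0 之前为 false) | true | 幂等生产者 |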
1.3 关键消费者参数
| 参数 | 默认值 | 推荐值 | 说明 |
|---|---|---|---|
| enable.auto.commit | true | false | 关闭自动提交,改为手动提交 offset |
| auto.offset.reset | latest | 按需 | 无已提交 offset 时从何处开始消费 |
| max.poll.records | 500 | 100~500 | 每次 poll 最大消息数 |
| session.timeout.ms | 45000 | 30000 | 会话超时:超过该时间未收到心跳即判定消费者失效 |
| fetch.max.wait.ms | 500 | 500 | 拉取等待时间 |
2. Java 客户端
2.1 Maven 依赖
xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.1</version>
</dependency>
2.2 生产者示例
java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Reliable producer example: sends 100 String records to {@code my-topic}
 * asynchronously and reports each outcome from the send callback.
 */
public class AdmqKafkaProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Reliability: wait for all in-sync replicas and de-duplicate retried batches.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // `retries` is deliberately left at its default (Integer.MAX_VALUE since Kafka 2.1).
        // The Kafka producer docs recommend bounding retries with delivery.timeout.ms
        // instead; capping retries at a small value such as 3 makes sends fail during
        // routine leader elections, which works against a "high reliability" setup.
        // 120000 ms is also the default; it is set here only to make the bound explicit.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        // Throughput: wait up to 5 ms to fill 64 KiB batches, compress with lz4.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "my-topic",               // topic
                        "key-" + i,               // key (selects the partition)
                        "Hello ADMQ Kafka #" + i  // value
                );
                // Asynchronous send. Per the KafkaProducer Javadoc the callback runs on the
                // producer's I/O thread, so it must stay short and non-blocking.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // Print the exception itself rather than only getMessage(): the
                        // exception class (e.g. a TimeoutException) is often the key detail.
                        System.err.println("发送失败: " + exception);
                    } else {
                        System.out.printf("已发送: topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
            // Block until every buffered record has been sent (close() would also wait).
            producer.flush();
        }
    }
}
2.3 消费者示例
java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Consumer example with manual offset commits: polls {@code my-topic}, prints each
 * record and commits synchronously after every non-empty batch (at-least-once).
 * Ctrl+C / SIGTERM triggers a clean shutdown so the consumer leaves its group.
 */
public class AdmqKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual commit: offsets are committed only after a batch has been processed.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The poll loop below never ends on its own, so shutdown is driven by a JVM
            // shutdown hook: wakeup() makes a blocked poll()/commitSync() throw
            // WakeupException, and join() keeps the JVM alive until close() has run.
            final Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));

            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                    // Process the message...
                }
                // Commit only after the whole batch has been processed.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException ignored) {
            // Expected on shutdown. try-with-resources has already closed the consumer,
            // which leaves the group so its partitions are reassigned promptly.
        }
    }
}
2.4 Admin API 示例
java
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Admin API example: creates {@code new-topic} (tolerating "already exists"), lists the
 * topics, and prints committed offset, end offset and lag for consumer group
 * {@code my-group}.
 */
public class AdmqKafkaAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Create the topic: 3 partitions, replication factor 2
            // (the cluster needs at least 2 brokers for this to succeed).
            NewTopic newTopic = new NewTopic("new-topic", 3, (short) 2);
            newTopic.configs(Map.of(
                    "retention.ms", "86400000",   // 1 day
                    "compression.type", "lz4"
            ));
            try {
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            } catch (ExecutionException e) {
                // Keep the example re-runnable: an existing topic is not an error here.
                if (!(e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException)) {
                    throw e;
                }
                System.out.println("Topic already exists: " + newTopic.name());
            }

            // List topics
            Set<String> topics = adminClient.listTopics().names().get();
            System.out.println("Topics: " + topics);

            // Consumer group lag = partition end offset - committed offset.
            // TopicPartition and OffsetAndMetadata are not covered by the admin.* import,
            // hence the fully-qualified names.
            Map<org.apache.kafka.common.TopicPartition,
                    org.apache.kafka.clients.consumer.OffsetAndMetadata> committed = adminClient
                    .listConsumerGroupOffsets("my-group")
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Values may be null when the group has no committed offset for a partition;
            // only ask for end offsets of partitions that do have one.
            Map<org.apache.kafka.common.TopicPartition, OffsetSpec> latestRequest = new HashMap<>();
            committed.forEach((tp, offset) -> {
                if (offset != null) {
                    latestRequest.put(tp, OffsetSpec.latest());
                }
            });
            Map<org.apache.kafka.common.TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    latestRequest.isEmpty()
                            ? Collections.emptyMap()
                            : adminClient.listOffsets(latestRequest).all().get();

            committed.forEach((tp, offset) -> {
                if (offset == null) {
                    System.out.printf("  %s offset=<none>%n", tp);
                    return;
                }
                long end = endOffsets.get(tp).offset();
                System.out.printf("  %s offset=%d end=%d lag=%d%n",
                        tp, offset.offset(), end, end - offset.offset());
            });
        }
    }
}
3. Spring Boot 集成
3.1 依赖配置
xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2 application.yml
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
linger.ms: 5
batch.size: 65536
compression.type: lz4
consumer:
group-id: spring-consumer-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
properties:
spring.json.trusted.packages: "com.example.*"
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 3 # 消费者线程数(≤ 分区数)
3.3 生产者 Service
java
/**
 * Sends payloads to Kafka through the injected {@link KafkaTemplate} and logs the
 * outcome asynchronously.
 *
 * <p>{@code @Slf4j} (Lombok) generates the {@code log} field used below; without it
 * this class does not compile.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends {@code payload} to {@code topic} under {@code key} without blocking the caller.
     * Assumes spring-kafka 3.x, where {@code send} returns a {@code CompletableFuture}
     * (this code relies on {@code whenComplete}).
     */
    public void sendMessage(String topic, String key, Object payload) {
        kafkaTemplate.send(topic, key, payload)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        // The trailing Throwable argument makes SLF4J log the stack trace too.
                        log.error("消息发送失败 topic={}: {}", topic, ex.getMessage(), ex);
                    } else {
                        log.info("消息已发送 partition={} offset={}",
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
                });
    }
}
3.4 消费者 Listener
java
/**
 * Kafka listeners: single-record order events and batched bulk events, both using
 * manual acknowledgment.
 */
@Component
@Slf4j
public class OrderEventListener {

    /**
     * Processes one order event and acknowledges it only on success.
     *
     * <p>On failure the exception is rethrown so the container's error handler (e.g. the
     * DefaultErrorHandler + DeadLetterPublishingRecoverer configured for the factory) can
     * seek back, retry and finally dead-letter the record. Merely not calling
     * {@code ack.acknowledge()} does not cause redelivery: the consumer position has
     * already moved past this record, and the next successful acknowledge() commits a
     * later offset, silently skipping it.
     */
    @KafkaListener(
            topics = "order-events",
            groupId = "order-processor",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        try {
            log.info("收到订单事件: {}", record.value());
            // Business logic...
            processOrder(record.value());
            ack.acknowledge(); // manual ack, only after successful processing
        } catch (Exception e) {
            log.error("处理失败,不确认: {}", e.getMessage(), e);
            throw new IllegalStateException(
                    "Failed to process order event at offset " + record.offset(), e);
        }
    }

    /**
     * Batch consumption: the whole batch is acknowledged only after every record has been
     * processed; an exception from {@code process} propagates without acknowledging.
     */
    @KafkaListener(topics = "bulk-events", batch = "true")
    public void handleBatch(List<ConsumerRecord<String, String>> records,
                            Acknowledgment ack) {
        log.info("批量处理 {} 条消息", records.size());
        records.forEach(r -> process(r.value()));
        ack.acknowledge();
    }
}
3.5 KafkaTemplate 配置
java
/**
 * Listener container factory with manual acks, 3 consumer threads and a dead-letter
 * error handler.
 *
 * <p>NOTE: a bean named {@code kafkaListenerContainerFactory} replaces Spring Boot's
 * auto-configured factory, so {@code spring.kafka.listener.*} properties from
 * application.yml (e.g. ack-mode, concurrency) are not applied to it; the values set
 * here take effect instead.
 */
@Configuration
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            kafkaListenerContainerFactory(
                    ConsumerFactory<String, Object> consumerFactory,
                    KafkaTemplate<String, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // Dead-letter handling: retry a failed record 3 more times at 1 s intervals, then
        // publish it to "<topic>.DLT". The template is injected as a method parameter
        // because this class does not declare any kafkaTemplate() bean method.
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),
                new FixedBackOff(1000L, 3L)
        ));
        return factory;
    }
}
4. Python 客户端
4.1 安装
bash
pip install kafka-python-ng lz4  # compression_type='lz4' 需要额外安装 lz4 库
4.2 生产者
python
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3,
linger_ms=5,
batch_size=65536,
compression_type='lz4'
)
for i in range(100):
future = producer.send(
'my-topic',
key=f'key-{i}',
value={'id': i, 'msg': f'Hello #{i}'}
)
record_metadata = future.get(timeout=10)
print(f"offset={record_metadata.offset}, partition={record_metadata.partition}")
producer.flush()
producer.close()4.3 消费者
python
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='python-consumer-group',
key_deserializer=lambda k: k.decode('utf-8') if k else None,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False
)
for message in consumer:
print(f"offset={message.offset} key={message.key} value={message.value}")
consumer.commit()5. Go 客户端
5.1 依赖
bash
go get github.com/IBM/sarama@latest
5.2 生产者
go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 3
config.Producer.Return.Successes = true
config.Producer.Compression = sarama.CompressionLZ4
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
for i := 0; i < 10; i++ {
msg := &sarama.ProducerMessage{
Topic: "my-topic",
Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("Hello ADMQ Kafka #%d", i)),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Printf("发送失败: %v\n", err)
} else {
fmt.Printf("已发送: partition=%d offset=%d\n", partition, offset)
}
}
}5.3 消费者组
go
package main
import (
"context"
"log"
"github.com/IBM/sarama"
)
type ConsumerGroupHandler struct{}
func (h ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("offset=%d key=%s value=%s", msg.Offset, msg.Key, msg.Value)
session.MarkMessage(msg, "")
}
return nil
}
func main() {
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false
cg, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "go-group", config)
if err != nil {
panic(err)
}
defer cg.Close()
ctx := context.Background()
for {
cg.Consume(ctx, []string{"my-topic"}, ConsumerGroupHandler{})
}
}6. Node.js 客户端
6.1 安装
bash
npm install kafkajs
6.2 生产者与消费者
javascript
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'admq-node-client',
brokers: ['localhost:9092']
});
// 生产者
async function produce() {
const producer = kafka.producer({
allowAutoTopicCreation: false,
idempotent: true,
});
await producer.connect();
await producer.send({
topic: 'my-topic',
messages: [
{ key: 'key-1', value: JSON.stringify({ id: 1, msg: 'Hello' }) },
{ key: 'key-2', value: JSON.stringify({ id: 2, msg: 'World' }) },
],
});
await producer.disconnect();
}
// 消费者
async function consume() {
const consumer = kafka.consumer({ groupId: 'node-consumer-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message, heartbeat, commitOffsetsIfNecessary }) => {
console.log({
offset: message.offset,
key: message.key?.toString(),
value: message.value?.toString(),
});
await commitOffsetsIfNecessary();
},
});
}
produce().catch(console.error);
consume().catch(console.error);7. Kafka Streams 流处理
7.1 词频统计示例
java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;
/**
 * Word-count topology: reads lines from {@code text-input}, splits them into lower-cased
 * words, counts each word in the {@code word-count-store} state store and writes the
 * running counts to {@code word-count-output}.
 */
public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serdes lives in org.apache.kafka.common.serialization, which the streams.* /
        // kstream.* imports do not cover; hence the fully-qualified name.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                org.apache.kafka.common.serialization.Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                org.apache.kafka.common.serialization.Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("text-input");
        KTable<String, Long> wordCounts = textLines
                // Locale.ROOT: locale-independent lower-casing (e.g. Turkish dotless i).
                .flatMapValues(line -> Arrays.asList(
                        line.toLowerCase(java.util.Locale.ROOT).split("\\s+")))
                // split() yields a leading "" for lines that start with whitespace (and
                // for empty lines); do not count it as a word.
                .filter((key, word) -> !word.isEmpty())
                .groupBy((key, word) -> word)
                .count(Materialized.as("word-count-store"));
        wordCounts.toStream().to("word-count-output",
                Produced.with(org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close the Streams instance when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
7.2 流式 Join 示例
java
// orderSerde / customerSerde / enrichedOrderSerde are assumed to be Serde<Order>,
// Serde<Customer> and Serde<EnrichedOrder> built elsewhere (e.g. JSON or Avro serdes).
// The default String serdes cannot (de)serialize these types, so pass them explicitly.
KStream<String, Order> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers = builder.table("customers",
        Consumed.with(Serdes.String(), customerSerde));
// Stream-table join: enrich each order with its customer. The join is on the record
// key, so "orders" must be keyed by the customer id and both topics co-partitioned
// (same partition count) — confirm against the producers of these topics.
KStream<String, EnrichedOrder> enrichedOrders = orders
        .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderSerde, customerSerde));
enrichedOrders.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));
8. Kafka Connect 连接器
8.1 启动 Connect Worker
bash
./kafka/bin/connect-distributed.sh config/connect-distributed.properties
config/connect-distributed.properties:
properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# 内部 Topic 副本数默认为 3;Broker 少于 3 个的开发环境需改为 1,否则 Worker 无法启动
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
rest.port=8083
8.2 JDBC Source Connector(MySQL → Kafka)
bash
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "kafka",
"connection.password": "kafka-pass",
"table.whitelist": "orders,customers",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-"
}
}'
8.3 File Sink Connector(Kafka → 文件)
bash
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-topic",
"file": "/tmp/output.txt"
}
}'
9. 事务与精确一次语义
9.1 事务生产者
java
// Transactional producer: the two sends below become visible to read_committed
// consumers atomically (both or neither).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Must be unique per producer instance and stable across restarts, so a restarted
// instance fences off its previous ("zombie") incarnation.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: the producer cannot recover, so the only option is to close it.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort the transaction; it may then be retried.
    producer.abortTransaction();
}
// Always release the producer; calling close() again after the fatal branch is safe.
producer.close();
9.2 Read-Process-Write 模式(Exactly-Once)
java
// Consume + process + produce inside ONE transaction per poll() batch, so the output
// records and the consumed offsets are committed atomically.
// Assumes (configured elsewhere): the consumer has enable.auto.commit=false (offsets go
// through the transaction) and typically isolation.level=read_committed; the producer has
// a transactional.id and initTransactions() has been called. Run this inside the poll loop.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new java.util.HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Process and send to another topic
            String result = process(record.value());
            producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            // The committed offset is the NEXT offset to consume, hence +1. Records of a
            // partition arrive in order, so the last put per partition wins.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit the consumed offsets inside the same transaction (the key step!).
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal errors: the transaction cannot be aborted; the caller must close the producer.
        throw e;
    } catch (Exception e) {
        producer.abortTransaction();
        // The consumer position has already moved past this batch. Rewind every assigned
        // partition to its last committed offset so the aborted records are consumed and
        // processed again instead of being silently skipped.
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata last = committed.get(tp);
            if (last != null) {
                consumer.seek(tp, last.offset());
            } else {
                consumer.seekToBeginning(java.util.Collections.singleton(tp));
            }
        }
    }
}
10. Schema Registry 集成
使用 Avro 序列化(需要独立部署 Schema Registry 或使用 Confluent Cloud):
xml
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
java
// Serialize record values as Avro via Confluent's serializer, which needs the Schema
// Registry address below.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Inline Avro schema: record "User" with an int "id" and a string "name".
final String userSchemaJson =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"int\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(userSchemaJson);
// Build a record conforming to the schema and send it.
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");
// NOTE(review): the String key "user-1" requires a String key serializer on this producer — confirm.
producer.send(new ProducerRecord<>("users", "user-1", user));
11. REST Proxy 接口
使用 Confluent REST Proxy 通过 HTTP 操作 Kafka(无需 Kafka 客户端库):
bash
# 发送消息
curl -X POST http://localhost:8082/topics/my-topic \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-d '{"records":[{"key":"key1","value":{"id":1,"msg":"hello"}}]}'
# 创建消费者
curl -X POST http://localhost:8082/consumers/my-group \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{"name":"my-consumer","format":"json","auto.offset.reset":"earliest"}'
# 订阅 Topic
curl -X POST http://localhost:8082/consumers/my-group/instances/my-consumer/subscription \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{"topics":["my-topic"]}'
# 拉取消息
curl http://localhost:8082/consumers/my-group/instances/my-consumer/records \
-H "Accept: application/vnd.kafka.json.v2+json"
12. 最佳实践
12.1 分区数设计
| 场景 | 建议分区数 | 说明 |
|---|---|---|
| 低吞吐(< 10 MB/s) | 3~6 | 过多分区反而增加开销 |
| 中等吞吐(10-100 MB/s) | 12~24 | 同一消费者组的最大并行度 = 分区数 |
| 高吞吐(> 100 MB/s) | 48~96 | 单 Broker 建议 < 4000 分区 |
12.2 消息键设计
java
// Records with the same key go to the same partition (as long as the topic's partition
// count does not change), so their relative order is preserved.
// Order events: key = orderId
producer.send(new ProducerRecord<>("order-events", orderId, event));
// key = null does NOT broadcast. Keyless records are merely spread across partitions
// (sticky partitioning since Kafka 2.4), and within one consumer group each record is
// still delivered to only one consumer. For "every instance receives every message",
// give each consumer instance its own group.id.
producer.send(new ProducerRecord<>("config-updates", null, configChange));
12.3 消费者组扩展
分区数=6,消费者=3 → 每个消费者处理2个分区(最优)
分区数=6,消费者=6 → 每个消费者处理1个分区(最大并行)
分区数=6,消费者=8 → 2个消费者空闲(浪费)
12.4 错误处理模式
java
// Dead-letter queue pattern: a record that still fails after the back-off retries is
// published to "<original topic>.DLT" on the same partition number as the original,
// so the DLT topic needs at least as many partitions as the source topic.
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    FixedBackOff retryBackOff = new FixedBackOff(1000L, 3L);
    DeadLetterPublishingRecoverer deadLetterRecoverer = new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (failed, cause) -> new TopicPartition(failed.topic() + ".DLT", failed.partition()));
    return new DefaultErrorHandler(deadLetterRecoverer, retryBackOff);
}
12.5 性能基准
| 场景 | 参考 TPS |
|---|---|
| 单分区顺序写入(无副本) | 500K/s |
| 三副本集群(acks=all) | 200K/s |
| 压缩(lz4)写入 | 150K/s(消息小),400K/s(消息大) |
| 批量消费(batch.size=64K) | 300K/s |