外观
ADMQ Kafka 用户手册
版本:1.0
目录
1. 产品概述
ADMQ Kafka 是 Apusic 推出的企业级消息流平台,集成了 Apusic 统一授权体系、监控上报能力和简化的部署脚本,适合以下场景:
- 大数据管道:海量日志采集、ETL 数据流
- 事件驱动架构:微服务解耦、事件溯源
- 流式计算:实时数据分析(结合 Kafka Streams)
- 消息队列替代:高吞吐、低延迟的异步通信
核心能力
| 能力 | 说明 |
|---|---|
| 高吞吐 | 单节点百万级 TPS,磁盘顺序写入 |
| 消息回放 | 支持按 offset 重消费,保留期可配置 |
| 分区并行 | Topic 多分区支持水平扩展 |
| 消费者组 | 自动负载均衡,Rebalance 协议 |
| 精确一次 | Exactly-Once Semantics (EOS) 支持 |
| 流处理 | 内置 Kafka Streams API |
| 连接器 | Kafka Connect 框架(Source/Sink) |
端口一览
| 端口 | 协议 | 用途 |
|---|---|---|
| 9092 | TCP | Broker 客户端通信(PLAINTEXT) |
| 9093 | TCP | Broker SSL(如启用) |
| 2181 | TCP | ZooKeeper 客户端连接 |
| 7071 | HTTP | JMX Prometheus 指标导出 |
| 9308 | HTTP | kafka_exporter Prometheus 指标 |
2. 系统要求
硬件建议
| 配置项 | 最低 | 推荐(生产) |
|---|---|---|
| CPU | 4 核 | 16 核+ |
| 内存 | 4 GB | 32 GB+ |
| 磁盘 | 50 GB HDD | 1 TB+ SSD(XFS 格式) |
| 网络 | 1 Gbps | 10 Gbps |
注:systemd 服务配置
MemoryMax=6G,CPUQuota=400%(4 核),生产环境建议适当调大。
软件要求
- 操作系统:Linux x86_64 / ARM64(CentOS 7+, Ubuntu 18.04+, RHEL 7+)
- Java:JDK 17+(产品内置,无需手动安装)
- ZooKeeper:3.8.6(已内置,仅传统模式需要)
3. 安装与部署
3.1 目录结构
admq-kafka/
├── bin/ # 启动脚本
│ ├── kafka/ # Kafka 原生脚本
│ │ ├── broker/ # Broker 启动脚本
│ │ └── ...
│ └── start.sh / stop.sh # 统一启停脚本
├── config/ # 配置文件
│ ├── kafka-broker.conf # ADMQ 主配置
│ ├── server.properties # Kafka 原生配置(自动生成)
│ ├── zookeeper.properties
│ └── jmx-exporter.yml # JMX 指标配置
├── kafka/ # Kafka 3.9.1 发行包
│ ├── bin/
│ ├── libs/
│ └── config/
├── zookeeper/ # ZooKeeper 3.8.6 发行包
├── exporter/ # kafka_exporter Go 二进制
│ ├── amd64/
│ └── arm64/
├── lib/ # Apusic 附加 JAR
│ ├── admq-monitor-1.0.0-apusic.jar
│ ├── apusic-license-admq-2.0.jar
│ └── apusic-license-core-2.0.jar
├── license/ # 授权文件
└── logs/ # 日志目录3.2 快速安装
bash
# 1. 解压安装包
tar -xzf admq-kafka-1.0.0.tar.gz -C /opt/
cd /opt/admq-kafka
# 2. 放置授权文件
cp your-license.xml license/
# 3. 修改主配置
vi config/kafka-broker.conf
# 4. 启动
./bin/start.sh3.3 授权文件
将 Apusic 提供的授权文件放置到 license/ 目录,支持以下格式:
apusic.lic(LOCAL 模式,XML 格式)- KBC 模式(USB Key,需硬件支持)
- ACLS 模式(在线授权中心)
4. 配置说明
4.1 主配置文件 kafka-broker.conf
properties
# ===== 节点配置 =====
node_id=1 # 节点 ID(集群中唯一)
node_name=kafka-broker-1 # 节点名称
cluster_name=admq-kafka-cluster # 集群名称
# ===== 网络配置 =====
broker_host=0.0.0.0 # 监听地址
broker_port=9092 # 客户端端口
# ===== ZooKeeper 配置(传统模式)=====
zookeeper_host=localhost
zookeeper_port=2181
zookeeper_data_dir=/var/lib/admq-kafka/zookeeper
# ===== 数据存储 =====
log_dirs=/var/lib/admq-kafka/data # 消息存储目录
log_retention_hours=168 # 消息保留时长(小时,默认 7 天)
log_retention_bytes=-1 # 消息保留大小(-1 不限)
log_segment_bytes=1073741824 # 单 Segment 大小(1 GB)
# ===== 性能调优 =====
num_io_threads=8 # IO 线程数
num_network_threads=3 # 网络线程数
socket_send_buffer_bytes=102400
socket_receive_buffer_bytes=102400
socket_request_max_bytes=104857600 # 单消息最大 100 MB
# ===== 副本与分区 =====
default_replication_factor=1 # 默认副本数(生产建议 3)
num_partitions=1 # 默认分区数
# ===== 监控上报 =====
monitor_role=kafka-broker
monitor_username=__sys__monitor
monitor_password=11111111
manager_addr=http://admq-manager:8080
# ===== JMX 指标 =====
jmx_port=7071
kafka_exporter_port=93084.2 JVM 调优
编辑 bin/kafka/broker/kafka-server-start.sh:
bash
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true"4.3 操作系统优化
bash
# 增大文件描述符限制
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf
# 关闭 swap
swapoff -a
# 调整虚拟内存
echo "vm.swappiness=1" >> /etc/sysctl.conf
echo "vm.dirty_background_ratio=5" >> /etc/sysctl.conf
echo "vm.dirty_ratio=60" >> /etc/sysctl.conf
sysctl -p5. 启动与停止
bash
# 启动全部组件(ZooKeeper + Kafka + Exporter)
./bin/start.sh
# 仅启动 Kafka Broker
./bin/start.sh kafka
# 停止所有组件
./bin/stop.sh
# 查看状态
./bin/status.sh日志位置
| 组件 | 日志路径 |
|---|---|
| Kafka Broker | logs/kafka/server.log |
| ZooKeeper | logs/zookeeper/zookeeper.log |
| kafka_exporter | logs/exporter/exporter.log |
| ADMQ Monitor | logs/monitor/monitor.log |
6. Topic 管理
6.1 创建 Topic
bash
# 创建 Topic(3 分区,2 副本)
./kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# 查看所有 Topic
./kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092
# 查看 Topic 详情
./kafka/bin/kafka-topics.sh \
--describe \
--bootstrap-server localhost:9092 \
--topic my-topic6.2 修改 Topic
bash
# 增加分区数(只能增加,不能减少)
./kafka/bin/kafka-topics.sh \
--alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6
# 修改保留时间(毫秒)
./kafka/bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=864000006.3 删除 Topic
bash
./kafka/bin/kafka-topics.sh \
--delete \
--bootstrap-server localhost:9092 \
--topic my-topic7. 生产者与消费者
7.1 命令行生产
bash
./kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
# 输入消息,每行一条,Ctrl+C 退出7.2 命令行消费
bash
# 从最新消息开始
./kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
# 从头开始(重放)
./kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning
# 消费者组
./kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--group my-consumer-group7.3 消费者组管理
bash
# 查看所有消费者组
./kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# 查看消费者组详情(lag 信息)
./kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-consumer-group
# 重置 offset(从头开始)
./kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute8. 集群部署
8.1 三节点集群示例
节点 1 (kafka-broker.conf):
properties
node_id=1
broker_host=192.168.1.101
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.103节点 2 (kafka-broker.conf):
properties
node_id=2
broker_host=192.168.1.102
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.103节点 3 (kafka-broker.conf):
properties
node_id=3
broker_host=192.168.1.103
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.1038.2 ZooKeeper 集群配置
config/zookeeper.properties:
properties
dataDir=/var/lib/admq-kafka/zookeeper
clientPort=2181
maxClientCnxns=60
tickTime=2000
initLimit=10
syncLimit=5
# 集群成员(myid 需与 server.X 中的 X 一致)
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888每个节点创建 myid 文件:
bash
# 节点1
echo "1" > /var/lib/admq-kafka/zookeeper/myid
# 节点2
echo "2" > /var/lib/admq-kafka/zookeeper/myid
# 节点3
echo "3" > /var/lib/admq-kafka/zookeeper/myid9. KRaft 模式(无 ZooKeeper)
KRaft 模式使用 Raft 协议替代 ZooKeeper,简化部署架构。
9.1 生成 Cluster UUID
bash
./kafka/bin/kafka-storage.sh random-uuid
# 输出示例:MkU3OEVBNTcwNTJENDM2Qg9.2 配置 KRaft
config/kraft/server.properties:
properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
log.dirs=/var/lib/admq-kafka/data9.3 初始化并启动
bash
# 格式化存储(使用上面生成的 UUID)
./kafka/bin/kafka-storage.sh format \
-t MkU3OEVBNTcwNTJENDM2Qg \
-c config/kraft/server.properties
# 启动
./kafka/bin/kafka-server-start.sh config/kraft/server.properties10. 监控与运维
10.1 Prometheus + Grafana
ADMQ Kafka 内置两个 Prometheus 指标端点:
| 端点 | 地址 | 说明 |
|---|---|---|
| JMX Exporter | http://host:7071/metrics | Broker JVM + Kafka JMX 指标 |
| kafka_exporter | http://host:9308/metrics | Topic/消费者组/Lag 指标 |
Prometheus 配置示例:
yaml
scrape_configs:
- job_name: 'admq-kafka-jmx'
static_configs:
- targets: ['kafka-host:7071']
- job_name: 'admq-kafka-exporter'
static_configs:
- targets: ['kafka-host:9308']10.2 关键监控指标
| 指标 | 含义 | 告警阈值建议 |
|---|---|---|
kafka_consumer_lag | 消费者积压量 | > 10000 |
kafka_server_BrokerTopicMetrics_MessagesInPerSec | 消息写入速率 | 根据容量规划 |
kafka_server_ReplicaManager_UnderReplicatedPartitions | 副本不足分区数 | > 0 |
kafka_controller_KafkaController_ActiveControllerCount | 活跃 Controller 数 | ≠ 1 告警 |
jvm_memory_used_bytes | JVM 内存使用 | > 80% 堆大小 |
10.3 ADMQ 监控上报
ADMQ Monitor 每 60 秒向管控台上报 Broker 统计信息,包括:
- 消息写入/读取 TPS
- Topic 数、分区数
- 消费者组数量
- 磁盘使用量
11. 安全配置
11.1 启用 SASL/PLAIN 认证
config/server.properties(追加):
properties
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS 配置
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";11.2 启用 TLS/SSL
properties
listeners=SSL://:9093
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required11.3 ACL 授权
bash
# 允许用户 alice 向 topic 写入
./kafka/bin/kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal User:alice \
--operation Write \
--topic my-topic
# 允许消费者组读取
./kafka/bin/kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal User:alice \
--operation Read \
--topic my-topic \
--group my-group12. 常见问题
Q:启动后连接被拒绝? A:检查 advertised.listeners 配置是否为客户端可访问的 IP(非 0.0.0.0)。
Q:消费者 Lag 持续增大? A:增加消费者实例数量(不超过 Topic 分区数),或增加分区数和消费者并行度。
Q:消息丢失? A:生产者设置 acks=all,消费者设置 enable.auto.commit=false 手动提交 offset。
Q:Broker 间副本同步延迟? A:检查网络带宽,调整 replica.fetch.max.bytes 和 replica.lag.time.max.ms。
Q:授权验证失败? A:确认 license/ 目录中授权文件存在,检查系统时间与授权服务器一致性(NTP)。
Q:ZooKeeper 连接超时? A:检查 ZooKeeper 节点状态,确认防火墙开放 2181、2888、3888 端口。