Skip to content

ADMQ Kafka 用户手册

版本:1.0


目录

  1. 产品概述
  2. 系统要求
  3. 安装与部署
  4. 配置说明
  5. 启动与停止
  6. Topic 管理
  7. 生产者与消费者
  8. 集群部署
  9. KRaft 模式(无 ZooKeeper)
  10. 监控与运维
  11. 安全配置
  12. 常见问题

1. 产品概述

ADMQ Kafka 是 Apusic 推出的企业级消息流平台,集成了 Apusic 统一授权体系、监控上报能力和简化的部署脚本,适合以下场景:

  • 大数据管道:海量日志采集、ETL 数据流
  • 事件驱动架构:微服务解耦、事件溯源
  • 流式计算:实时数据分析(结合 Kafka Streams)
  • 消息队列替代:高吞吐、低延迟的异步通信

核心能力

能力说明
高吞吐单节点百万级 TPS,磁盘顺序写入
消息回放支持按 offset 重消费,保留期可配置
分区并行Topic 多分区支持水平扩展
消费者组自动负载均衡,Rebalance 协议
精确一次Exactly-Once Semantics (EOS) 支持
流处理内置 Kafka Streams API
连接器Kafka Connect 框架(Source/Sink)

端口一览

端口协议用途
9092TCPBroker 客户端通信(PLAINTEXT)
9093TCPBroker SSL(如启用)
2181TCPZooKeeper 客户端连接
7071HTTPJMX Prometheus 指标导出
9308HTTPkafka_exporter Prometheus 指标

2. 系统要求

硬件建议

配置项最低推荐(生产)
CPU4 核16 核+
内存4 GB32 GB+
磁盘50 GB HDD1 TB+ SSD(XFS 格式)
网络1 Gbps10 Gbps

:systemd 服务配置 MemoryMax=6GCPUQuota=400%(4 核),生产环境建议适当调大。

软件要求

  • 操作系统:Linux x86_64 / ARM64(CentOS 7+, Ubuntu 18.04+, RHEL 7+)
  • Java:JDK 17+(产品内置,无需手动安装)
  • ZooKeeper:3.8.6(已内置,仅传统模式需要)

3. 安装与部署

3.1 目录结构

admq-kafka/
├── bin/                    # 启动脚本
│   ├── kafka/              # Kafka 原生脚本
│   │   ├── broker/         # Broker 启动脚本
│   │   └── ...
│   └── start.sh / stop.sh  # 统一启停脚本
├── config/                 # 配置文件
│   ├── kafka-broker.conf   # ADMQ 主配置
│   ├── server.properties   # Kafka 原生配置(自动生成)
│   ├── zookeeper.properties
│   └── jmx-exporter.yml    # JMX 指标配置
├── kafka/                  # Kafka 3.9.1 发行包
│   ├── bin/
│   ├── libs/
│   └── config/
├── zookeeper/              # ZooKeeper 3.8.6 发行包
├── exporter/               # kafka_exporter Go 二进制
│   ├── amd64/
│   └── arm64/
├── lib/                    # Apusic 附加 JAR
│   ├── admq-monitor-1.0.0-apusic.jar
│   ├── apusic-license-admq-2.0.jar
│   └── apusic-license-core-2.0.jar
├── license/                # 授权文件
└── logs/                   # 日志目录

3.2 快速安装

bash
# 1. 解压安装包
tar -xzf admq-kafka-1.0.0.tar.gz -C /opt/
cd /opt/admq-kafka

# 2. 放置授权文件
cp your-license.xml license/

# 3. 修改主配置
vi config/kafka-broker.conf

# 4. 启动
./bin/start.sh

3.3 授权文件

将 Apusic 提供的授权文件放置到 license/ 目录,支持以下格式:

  • apusic.lic(LOCAL 模式,XML 格式)
  • KBC 模式(USB Key,需硬件支持)
  • ACLS 模式(在线授权中心)

4. 配置说明

4.1 主配置文件 kafka-broker.conf

properties
# ===== 节点配置 =====
node_id=1                           # 节点 ID(集群中唯一)
node_name=kafka-broker-1            # 节点名称
cluster_name=admq-kafka-cluster     # 集群名称

# ===== 网络配置 =====
broker_host=0.0.0.0                 # 监听地址
broker_port=9092                    # 客户端端口

# ===== ZooKeeper 配置(传统模式)=====
zookeeper_host=localhost
zookeeper_port=2181
zookeeper_data_dir=/var/lib/admq-kafka/zookeeper

# ===== 数据存储 =====
log_dirs=/var/lib/admq-kafka/data   # 消息存储目录
log_retention_hours=168             # 消息保留时长(小时,默认 7 天)
log_retention_bytes=-1              # 消息保留大小(-1 不限)
log_segment_bytes=1073741824        # 单 Segment 大小(1 GB)

# ===== 性能调优 =====
num_io_threads=8                    # IO 线程数
num_network_threads=3               # 网络线程数
socket_send_buffer_bytes=102400
socket_receive_buffer_bytes=102400
socket_request_max_bytes=104857600  # 单消息最大 100 MB

# ===== 副本与分区 =====
default_replication_factor=1        # 默认副本数(生产建议 3)
num_partitions=1                    # 默认分区数

# ===== 监控上报 =====
monitor_role=kafka-broker
monitor_username=__sys__monitor
monitor_password=11111111
manager_addr=http://admq-manager:8080

# ===== JMX 指标 =====
jmx_port=7071
kafka_exporter_port=9308

4.2 JVM 调优

编辑 bin/kafka/broker/kafka-server-start.sh

bash
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=20 \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:+ExplicitGCInvokesConcurrent \
  -Djava.awt.headless=true"

4.3 操作系统优化

bash
# 增大文件描述符限制
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf

# 关闭 swap
swapoff -a

# 调整虚拟内存
echo "vm.swappiness=1" >> /etc/sysctl.conf
echo "vm.dirty_background_ratio=5" >> /etc/sysctl.conf
echo "vm.dirty_ratio=60" >> /etc/sysctl.conf
sysctl -p

5. 启动与停止

bash
# 启动全部组件(ZooKeeper + Kafka + Exporter)
./bin/start.sh

# 仅启动 Kafka Broker
./bin/start.sh kafka

# 停止所有组件
./bin/stop.sh

# 查看状态
./bin/status.sh

日志位置

组件日志路径
Kafka Brokerlogs/kafka/server.log
ZooKeeperlogs/zookeeper/zookeeper.log
kafka_exporterlogs/exporter/exporter.log
ADMQ Monitorlogs/monitor/monitor.log

6. Topic 管理

6.1 创建 Topic

bash
# 创建 Topic(3 分区,2 副本)
./kafka/bin/kafka-topics.sh \
  --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2

# 查看所有 Topic
./kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server localhost:9092

# 查看 Topic 详情
./kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server localhost:9092 \
  --topic my-topic

6.2 修改 Topic

bash
# 增加分区数(只能增加,不能减少)
./kafka/bin/kafka-topics.sh \
  --alter \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 6

# 修改保留时间(毫秒)
./kafka/bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name my-topic \
  --add-config retention.ms=86400000

6.3 删除 Topic

bash
./kafka/bin/kafka-topics.sh \
  --delete \
  --bootstrap-server localhost:9092 \
  --topic my-topic

7. 生产者与消费者

7.1 命令行生产

bash
./kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic
# 输入消息,每行一条,Ctrl+C 退出

7.2 命令行消费

bash
# 从最新消息开始
./kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

# 从头开始(重放)
./kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

# 消费者组
./kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --group my-consumer-group

7.3 消费者组管理

bash
# 查看所有消费者组
./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --list

# 查看消费者组详情(lag 信息)
./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-consumer-group

# 重置 offset(从头开始)
./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

8. 集群部署

8.1 三节点集群示例

节点 1 (kafka-broker.conf):

properties
node_id=1
broker_host=192.168.1.101
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.103

节点 2 (kafka-broker.conf):

properties
node_id=2
broker_host=192.168.1.102
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.103

节点 3 (kafka-broker.conf):

properties
node_id=3
broker_host=192.168.1.103
zookeeper_host=192.168.1.101,192.168.1.102,192.168.1.103

8.2 ZooKeeper 集群配置

config/zookeeper.properties

properties
dataDir=/var/lib/admq-kafka/zookeeper
clientPort=2181
maxClientCnxns=60
tickTime=2000
initLimit=10
syncLimit=5

# 集群成员(myid 需与 server.X 中的 X 一致)
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

每个节点创建 myid 文件:

bash
# 节点1
echo "1" > /var/lib/admq-kafka/zookeeper/myid
# 节点2
echo "2" > /var/lib/admq-kafka/zookeeper/myid
# 节点3
echo "3" > /var/lib/admq-kafka/zookeeper/myid

9. KRaft 模式(无 ZooKeeper)

KRaft 模式使用 Raft 协议替代 ZooKeeper,简化部署架构。

9.1 生成 Cluster UUID

bash
./kafka/bin/kafka-storage.sh random-uuid
# 输出示例:MkU3OEVBNTcwNTJENDM2Qg

9.2 配置 KRaft

config/kraft/server.properties

properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.1.101:9093,2@192.168.1.102:9093,3@192.168.1.103:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
log.dirs=/var/lib/admq-kafka/data

9.3 初始化并启动

bash
# 格式化存储(使用上面生成的 UUID)
./kafka/bin/kafka-storage.sh format \
  -t MkU3OEVBNTcwNTJENDM2Qg \
  -c config/kraft/server.properties

# 启动
./kafka/bin/kafka-server-start.sh config/kraft/server.properties

10. 监控与运维

10.1 Prometheus + Grafana

ADMQ Kafka 内置两个 Prometheus 指标端点:

端点地址说明
JMX Exporterhttp://host:7071/metricsBroker JVM + Kafka JMX 指标
kafka_exporterhttp://host:9308/metricsTopic/消费者组/Lag 指标

Prometheus 配置示例:

yaml
scrape_configs:
  - job_name: 'admq-kafka-jmx'
    static_configs:
      - targets: ['kafka-host:7071']
  - job_name: 'admq-kafka-exporter'
    static_configs:
      - targets: ['kafka-host:9308']

10.2 关键监控指标

指标含义告警阈值建议
kafka_consumer_lag消费者积压量> 10000
kafka_server_BrokerTopicMetrics_MessagesInPerSec消息写入速率根据容量规划
kafka_server_ReplicaManager_UnderReplicatedPartitions副本不足分区数> 0
kafka_controller_KafkaController_ActiveControllerCount活跃 Controller 数≠ 1 告警
jvm_memory_used_bytesJVM 内存使用> 80% 堆大小

10.3 ADMQ 监控上报

ADMQ Monitor 每 60 秒向管控台上报 Broker 统计信息,包括:

  • 消息写入/读取 TPS
  • Topic 数、分区数
  • 消费者组数量
  • 磁盘使用量

11. 安全配置

11.1 启用 SASL/PLAIN 认证

config/server.properties(追加):

properties
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS 配置
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret" \
  user_admin="admin-secret" \
  user_alice="alice-secret";

11.2 启用 TLS/SSL

properties
listeners=SSL://:9093
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required

11.3 ACL 授权

bash
# 允许用户 alice 向 topic 写入
./kafka/bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:alice \
  --operation Write \
  --topic my-topic

# 允许消费者组读取
./kafka/bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:alice \
  --operation Read \
  --topic my-topic \
  --group my-group

12. 常见问题

Q:启动后连接被拒绝? A:检查 advertised.listeners 配置是否为客户端可访问的 IP(非 0.0.0.0)。

Q:消费者 Lag 持续增大? A:增加消费者实例数量(不超过 Topic 分区数),或增加分区数和消费者并行度。

Q:消息丢失? A:生产者设置 acks=all,消费者设置 enable.auto.commit=false 手动提交 offset。

Q:Broker 间副本同步延迟? A:检查网络带宽,调整 replica.fetch.max.bytesreplica.lag.time.max.ms

Q:授权验证失败? A:确认 license/ 目录中授权文件存在,检查系统时间与授权服务器一致性(NTP)。

Q:ZooKeeper 连接超时? A:检查 ZooKeeper 节点状态,确认防火墙开放 2181、2888、3888 端口。

金蝶天燕(Apusic)企业级消息中间件套件