外观
ADMQ RabbitMQ 开发集成手册
产品版本: ADMQ RabbitMQ 2.0 日期: 2026-06
目录
1. 连接参数速查
| 参数 | 默认值 | 说明 |
|---|---|---|
| Host | 服务器 IP | 建议使用 DNS 名称 |
| AMQP Port | 5672 | 标准 AMQP 端口 |
| AMQP TLS Port | 5671 | TLS 加密连接 |
| Management Port | 15672 | HTTP 管理 API |
| MQTT Port | 1883 | MQTT 协议 |
| STOMP Port | 61613 | STOMP 协议 |
| Virtual Host | / | 默认 vhost |
| Username | admq | 默认用户(生产请修改) |
| Password | apusic_123 | 默认密码(生产请修改) |
2. Java 集成
2.1 Maven 依赖
xml
<!-- AMQP 客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.21.0</version>
</dependency>
2.2 建立连接
java
import com.rabbitmq.client.*;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admq");
factory.setPassword("apusic_123");
// 连接超时(毫秒)
factory.setConnectionTimeout(5000);
// 心跳间隔(秒,0 = 禁用)
factory.setRequestedHeartbeat(60);
// 自动恢复
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 使用 channel 进行操作
}
2.3 Work Queue(任务队列)
java
// 生产者
/**
 * Work-queue producer: publishes one persistent text message to the durable
 * {@code task_queue} queue and then releases the channel and connection.
 */
public class TaskProducer {
    private static final String QUEUE_NAME = "task_queue";

    /**
     * Declares the queue, publishes a single message and prints a confirmation line.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = buildFactory();
        try (Connection connection = connectionFactory.newConnection();
             Channel ch = connection.createChannel()) {
            // Arguments: durable=true, exclusive=false, autoDelete=false, no extra arguments.
            ch.queueDeclare(QUEUE_NAME, true, false, false, null);

            String text = "Hello, ADMQ!";
            byte[] body = text.getBytes(StandardCharsets.UTF_8);
            // The empty exchange name is the default exchange; the routing key is the queue name.
            ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
            System.out.println("Sent: " + text);
        }
    }
}
// 消费者
/**
 * Work-queue consumer: reads {@code task_queue} with manual acknowledgements and a
 * prefetch of one, so a worker only receives a new task after settling the previous one.
 */
public class TaskConsumer {
    private static final String QUEUE_NAME = "task_queue";

    /**
     * Opens a connection, registers the consumer and returns; deliveries are then
     * handled on the client's consumer threads.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = buildFactory().newConnection();
        Channel ch = connection.createChannel();
        ch.queueDeclare(QUEUE_NAME, true, false, false, null);
        // At most one unacknowledged message per consumer (fair dispatch).
        ch.basicQos(1);

        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            try {
                processTask(text);
                ch.basicAck(deliveryTag, false);
            } catch (Exception e) {
                // Put the message back on the queue. A message that always fails
                // will be redelivered again and again.
                ch.basicNack(deliveryTag, false, true);
            }
        };
        CancelCallback onCancel = consumerTag -> {};

        // autoAck=false: every delivery is settled explicitly in the callback above.
        ch.basicConsume(QUEUE_NAME, false, onDelivery, onCancel);
    }
}
2.4 发布/订阅(Fanout)
java
// 发布者
channel.exchangeDeclare("logs", "fanout", true);
channel.basicPublish("logs", "", null, message.getBytes());
// 订阅者
channel.exchangeDeclare("logs", "fanout", true);
String queueName = channel.queueDeclare().getQueue(); // 临时队列
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, callback, tag -> {});
2.5 主题路由(Topic)
java
// 绑定示例:接收所有 order.* 事件
channel.exchangeDeclare("events", "topic", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "events", "order.*");
channel.queueBind(queueName, "events", "payment.#");
// 发布到 order.created
channel.basicPublish("events", "order.created", null, payload.getBytes());
2.6 RPC 模式
java
// RPC 客户端
/**
 * Minimal request/reply client: publishes a request to {@code rpc_queue} and waits
 * for the reply that carries the same correlation id.
 */
public class RpcClient {
    private final Channel channel;

    /**
     * @param channel open channel used for both the request and the reply consumer
     */
    public RpcClient(Channel channel) {
        this.channel = channel;
    }

    /**
     * Sends {@code message} and blocks until the matching reply arrives.
     *
     * @param message request payload, sent as UTF-8
     * @return the reply body decoded as UTF-8, or {@code null} if no matching reply
     *         arrives within 10 seconds
     * @throws Exception if declaring the reply queue, publishing or consuming fails,
     *                   or if the waiting thread is interrupted
     */
    public String call(String message) throws Exception {
        String corrId = UUID.randomUUID().toString();
        // Server-named reply queue, private to this call.
        String replyQueue = channel.queueDeclare().getQueue();
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // Subscribe before publishing so the reply consumer is already in place.
        String consumerTag = channel.basicConsume(replyQueue, true, (tag, delivery) -> {
            // corrId is never null, so calling equals on it tolerates replies
            // that carry no correlation id.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
            }
        }, tag -> {});

        try {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueue)
                    .build();
            channel.basicPublish("", "rpc_queue", props, message.getBytes(StandardCharsets.UTF_8));
            return response.poll(10, TimeUnit.SECONDS);
        } finally {
            // Without this every call would leave one more consumer on the channel.
            channel.basicCancel(consumerTag);
        }
    }
}
2.7 Dead Letter Queue(死信队列)
java
// 声明死信交换机
channel.exchangeDeclare("dlx", "direct", true);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("dead_letter_queue", "dlx", "dead");
// 业务队列绑定死信配置
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
args.put("x-dead-letter-routing-key", "dead");
args.put("x-message-ttl", 30000); // 消息 30 秒超时后进入死信
args.put("x-max-length", 1000); // 队列最多 1000 条,超出后进入死信
channel.queueDeclare("main_queue", true, false, false, args);
3. Spring Boot 集成
3.1 Maven 依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 配置文件
yaml
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admq
password: apusic_123
connection-timeout: 5000
requested-heartbeat: 60
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次处理 1 条
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
template:
retry:
enabled: true
max-attempts: 3
3.3 配置类
java
/**
 * Declares the order queue, its topic exchange, the binding between them and a
 * JSON message converter.
 */
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_ORDER = "queue.order";
    public static final String EXCHANGE_ORDER = "exchange.order";
    public static final String ROUTING_KEY = "order.created";

    /**
     * Durable order queue with a 60000 ms message TTL and a dead-letter exchange.
     * NOTE(review): "exchange.dlx" is not declared in this class - confirm it is
     * declared elsewhere.
     */
    @Bean
    public Queue orderQueue() {
        QueueBuilder builder = QueueBuilder.durable(QUEUE_ORDER)
                .withArgument("x-dead-letter-exchange", "exchange.dlx")
                .withArgument("x-message-ttl", 60000);
        return builder.build();
    }

    /** Topic exchange for order events, created with durable=true and autoDelete=false. */
    @Bean
    public TopicExchange orderExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_ORDER, durable, autoDelete);
    }

    /** Binds the order queue to the order exchange under {@link #ROUTING_KEY}. */
    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder
                .bind(orderQueue)
                .to(orderExchange)
                .with(ROUTING_KEY);
    }

    /** Converts message payloads to and from JSON with Jackson. */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
3.4 生产者
java
/**
 * Publishes order events through {@link RabbitTemplate}.
 */
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the order to the order exchange with the "order created" routing key.
     *
     * @param order payload, converted by the template's message converter
     */
    public void createOrder(Order order) {
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.EXCHANGE_ORDER,
                RabbitMQConfig.ROUTING_KEY,
                order
        );
    }

    /**
     * Sends the order with a delivery delay (requires the
     * rabbitmq_delayed_message_exchange plugin).
     *
     * @param order   payload to send
     * @param delayMs delay in milliseconds; must fit in an {@code int}
     * @throws ArithmeticException if {@code delayMs} does not fit in an {@code int}
     */
    public void scheduleOrder(Order order, long delayMs) {
        // A plain (int) cast would wrap silently for large values and send a wrong
        // delay; toIntExact rejects them instead.
        int delay = Math.toIntExact(delayMs);
        rabbitTemplate.convertAndSend(
                "exchange.delayed", "order.scheduled", order,
                m -> {
                    m.getMessageProperties().setDelay(delay);
                    return m;
                }
        );
    }
}
3.5 消费者
java
/**
 * Listener for the order queue that settles every delivery explicitly.
 */
@Component
public class OrderConsumer {

    /**
     * Processes one order and acknowledges it; on failure the delivery is
     * negatively acknowledged, with or without requeue depending on the exception.
     *
     * @param order   converted message payload
     * @param channel channel the delivery arrived on, used to ack or nack
     * @param tag     delivery tag of this message
     * @throws Exception if the ack or nack call itself fails
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_ORDER)
    public void processOrder(Order order, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        final boolean multiple = false;
        try {
            doProcess(order);
            channel.basicAck(tag, multiple);
        } catch (BusinessException e) {
            // Business failure: retrying will not help, so reject without requeue
            // and let the queue's dead-letter settings take over.
            channel.basicNack(tag, multiple, false);
        } catch (Exception e) {
            // Anything else is treated as transient: put the message back on the queue.
            channel.basicNack(tag, multiple, true);
        }
    }
}
4. Python 集成
4.1 安装依赖
bash
pip install pika
4.2 基本生产者/消费者
python
import pika
import json
def get_connection():
    """Open and return a blocking pika connection to the local broker.

    Uses the default account on vhost '/', a 60 second heartbeat and a
    300 second blocked-connection timeout.
    """
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            virtual_host='/',
            credentials=pika.PlainCredentials('admq', 'apusic_123'),
            heartbeat=60,
            blocked_connection_timeout=300,
        )
    )
# 生产者
def publish_message(queue_name: str, message: dict):
    """Publish ``message`` as persistent JSON to the durable queue ``queue_name``.

    Opens a fresh connection for the call and always closes it, including when
    declaring the queue or publishing raises.
    """
    connection = get_connection()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=pika.DeliveryMode.Persistent,
                content_type='application/json'
            )
        )
    finally:
        # Skip close() on a connection that already dropped, so the original
        # error is not masked by a second one.
        if connection.is_open:
            connection.close()
# 消费者
def consume_messages(queue_name: str):
connection = get_connection()
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
try:
message = json.loads(body)
process(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()4.3 异步消费者(aio-pika)
python
import asyncio
import aio_pika
async def main():
    """Consume ``task_queue`` asynchronously, passing each message body to ``handle``."""
    conn = await aio_pika.connect_robust("amqp://admq:apusic_123@localhost/")
    async with conn:
        ch = await conn.channel()
        # Allow up to 10 unacknowledged messages in flight.
        await ch.set_qos(prefetch_count=10)
        task_queue = await ch.declare_queue("task_queue", durable=True)
        async with task_queue.iterator() as deliveries:
            async for incoming in deliveries:
                # process() settles the message when the block exits.
                async with incoming.process():
                    await handle(incoming.body)
asyncio.run(main())
5. Node.js 集成
5.1 安装依赖
bash
npm install amqplib
5.2 基本示例
javascript
const amqp = require('amqplib');
const AMQP_URL = 'amqp://admq:apusic_123@localhost';
// 生产者
/**
 * Publishes `message` as persistent JSON to the durable queue `queue`.
 * Opens a connection for the call and always closes it, including on failure.
 *
 * @param {string} queue   target queue name
 * @param {*}      message JSON-serialisable payload
 */
async function publish(queue, message) {
  const conn = await amqp.connect(AMQP_URL);
  let failure = null;
  try {
    const ch = await conn.createChannel();
    await ch.assertQueue(queue, { durable: true });
    ch.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
      persistent: true,
      contentType: 'application/json'
    });
    await ch.close();
  } catch (err) {
    failure = err;
    throw err;
  } finally {
    try {
      await conn.close();
    } catch (closeErr) {
      // Report a close failure only if it is the first error; otherwise the
      // original failure is the one the caller should see.
      if (!failure) throw closeErr;
    }
  }
}
// 消费者(带重连)
async function consume(queue, handler) {
let conn, ch;
const connect = async () => {
conn = await amqp.connect(AMQP_URL);
conn.on('error', (err) => {
console.error('AMQP connection error:', err);
setTimeout(connect, 5000);
});
ch = await conn.createChannel();
await ch.assertQueue(queue, { durable: true });
ch.prefetch(1);
ch.consume(queue, async (msg) => {
if (!msg) return;
try {
await handler(JSON.parse(msg.content.toString()));
ch.ack(msg);
} catch (err) {
ch.nack(msg, false, true);
}
});
};
await connect();
}6. Go 集成
6.1 安装依赖
bash
go get github.com/rabbitmq/amqp091-go
6.2 连接与发布
go
package main
import (
amqp "github.com/rabbitmq/amqp091-go"
"log"
"context"
"time"
)
// connectRabbitMQ dials the local broker and opens one channel on the new
// connection. If the channel cannot be opened, the connection is closed
// before the error is returned.
func connectRabbitMQ() (*amqp.Connection, *amqp.Channel, error) {
	connection, dialErr := amqp.Dial("amqp://admq:apusic_123@localhost:5672/")
	if dialErr != nil {
		return nil, nil, dialErr
	}

	channel, chErr := connection.Channel()
	if chErr == nil {
		return connection, channel, nil
	}

	connection.Close()
	return nil, nil, chErr
}
// publish declares queue as durable and sends message to it through the
// default exchange as a persistent JSON message. The publish call is bounded
// by a 5 second context timeout.
func publish(ch *amqp.Channel, queue, message string) error {
	if _, err := ch.QueueDeclare(queue, true, false, false, false, nil); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	msg := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         []byte(message),
	}
	return ch.PublishWithContext(ctx, "", queue, false, false, msg)
}
func consume(ch *amqp.Channel, queue string, handler func([]byte) error) error {
ch.Qos(1, 0, false)
msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
if err != nil {
return err
}
for d := range msgs {
if err := handler(d.Body); err != nil {
d.Nack(false, true)
} else {
d.Ack(false)
}
}
return nil
}7. MQTT 客户端接入
架构说明: RabbitMQ 通过 rabbitmq_mqtt 插件支持 MQTT,本质是协议转换适配器,不是原生 MQTT Broker。MQTT 消息在内部转换为 AMQP 消息,因此 MQTT 与 AMQP 消费者天然互通,但不支持保留消息(Retained Message),QoS 2 会被降级处理。适合"企业后端用 AMQP,偶尔有 IoT 设备用 MQTT"的混合场景;大规模 IoT 专用场景请使用 ADMQ MQTT。
7.1 MQTT 与 AMQP 的映射关系
| MQTT | AMQP |
|---|---|
| Topic | Routing Key(/ 替换为 .) |
| Publish | Exchange amq.topic |
| Subscribe | Queue bound to amq.topic |
| QoS 0 | 非持久消息 |
| QoS 1 | 持久消息 + ACK |
7.2 Python MQTT 客户端
python
import paho.mqtt.client as mqtt
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set("admq", "apusic_123")
def on_connect(c, userdata, flags, rc, props):
print(f"Connected: {rc}")
c.subscribe("sensors/#", qos=1)
def on_message(c, userdata, msg):
print(f"{msg.topic}: {msg.payload.decode()}")
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
7.3 MQTT 消息从 AMQP 消费
MQTT 发布到 sensors/temperature/room1 后,AMQP 消费者可通过 amq.topic Exchange 接收:
java
// AMQP 消费者接收 MQTT 消息
channel.queueBind(queueName, "amq.topic", "sensors.temperature.room1");
// 注意:MQTT 的 / 分隔符在 AMQP 中对应 .
8. STOMP 客户端接入
8.1 Python(stomp.py)
python
import stomp
class MyListener(stomp.ConnectionListener):
def on_message(self, frame):
print(f"Received: {frame.body}")
conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('', MyListener())
conn.connect('admq', 'apusic_123', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='client')
8.2 JavaScript(WebSocket STOMP)
javascript
import { Client } from '@stomp/stompjs';
const client = new Client({
brokerURL: 'ws://localhost:15674/ws',
connectHeaders: { login: 'admq', passcode: 'apusic_123' },
onConnect: () => {
client.subscribe('/queue/test', (msg) => {
console.log('Received:', msg.body);
msg.ack();
});
client.publish({ destination: '/topic/news', body: 'Hello STOMP' });
}
});
client.activate();
9. REST API 集成
9.1 队列管理
bash
# 创建队列
curl -X PUT http://localhost:15672/api/queues/%2F/my-queue \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{"durable": true, "arguments": {"x-queue-type": "quorum"}}'
# 查看队列信息
curl http://localhost:15672/api/queues/%2F/my-queue -u admq:apusic_123
# 获取队列中的消息(不消费,只查看)
curl -X POST http://localhost:15672/api/queues/%2F/my-queue/get \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{"count": 1, "ackmode": "ack_requeue_true", "encoding": "auto"}'
# 清空队列
curl -X DELETE http://localhost:15672/api/queues/%2F/my-queue/contents \
-u admq:apusic_123
9.2 发布消息(通过 API)
bash
curl -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{
"routing_key": "my-queue",
"payload": "{\"event\":\"order.created\",\"id\":12345}",
"payload_encoding": "string",
"properties": {
"content_type": "application/json",
"delivery_mode": 2
}
}'
9.3 策略管理
bash
# 设置队列 HA 策略(集群)
curl -X PUT http://localhost:15672/api/policies/%2F/ha-all \
-u admq:apusic_123 \
-H "content-type: application/json" \
-d '{
"pattern": "^ha\\.",
"definition": {"ha-mode": "all"},
"apply-to": "queues",
"priority": 0
}'
9.4 监控接口
bash
# 节点状态
curl http://localhost:15672/api/nodes -u admq:apusic_123
# 所有连接
curl http://localhost:15672/api/connections -u admq:apusic_123
# 所有 Channel
curl http://localhost:15672/api/channels -u admq:apusic_123
# Consumer 列表
curl http://localhost:15672/api/consumers -u admq:apusic_123
10. 消息模式最佳实践
10.1 选择队列类型
| 场景 | 推荐队列类型 | 原因 |
|---|---|---|
| 关键业务(订单、支付) | Quorum | 强一致性,不丢消息 |
| 高吞吐日志/事件 | Stream | 可重放,低延迟 |
| 简单任务分发 | Classic | 成熟稳定,延迟低 |
| 会话/临时数据 | Classic(非持久) | 性能最优 |
10.2 避免消息积压
java
// 消费者端设置 prefetch,防止单消费者拿走太多消息导致其他消费者饥饿
channel.basicQos(10); // 每次最多 10 条未 ACK
// 生产者端配置 Publisher Confirms,确认消息真正到达 Broker
channel.confirmSelect();
channel.waitForConfirmsOrDie(5000); // 等待 5 秒确认
10.3 消息幂等性
消费者应实现幂等处理,因为消息可能被重复投递(网络异常时 ACK 失败导致重发):
java
/**
 * Idempotent order listener: a message whose id has already been claimed is skipped.
 *
 * @param order converted message payload
 * @param msgId AMQP message id of the delivery; the publisher must set it
 */
@RabbitListener(queues = "order_queue")
public void processOrder(Order order, @Header(AmqpHeaders.MESSAGE_ID) String msgId) {
    // Claim the id in a single step so two concurrent deliveries of the same
    // message cannot both pass the check.
    // NOTE(review): assumes processedIds is a thread-safe Set - confirm.
    if (!processedIds.add(msgId)) {
        return; // already handled: skip the duplicate
    }
    try {
        doProcess(order);
    } catch (RuntimeException e) {
        // Processing failed, so release the claim and let a redelivery retry.
        processedIds.remove(msgId);
        throw e;
    }
}
10.4 连接池管理
java
// 不要每次操作都新建 Connection!
// Connection 对象复用,Channel 按需创建
/**
 * Holds one long-lived {@link Connection} and hands out channels on demand.
 */
public class RabbitMQPool {
    private final Connection connection;

    /**
     * @param connection the shared connection that channels are created on
     */
    public RabbitMQPool(Connection connection) {
        this.connection = connection;
    }

    /**
     * Creates a new channel on the shared connection.
     *
     * @return a newly created channel
     * @throws IOException if the channel cannot be created
     */
    public Channel getChannel() throws IOException {
        return connection.createChannel();
    }
}
10.5 消息大小建议
| 消息大小 | 建议 |
|---|---|
| < 1 MB | 直接在消息体传递 |
| 1 MB ~ 10 MB | 考虑压缩或拆分 |
| > 10 MB | 存储到对象存储,消息中传递 URL(Claim Check 模式) |
11. 生产环境建议
11.1 连接配置
java
factory.setAutomaticRecoveryEnabled(true); // 自动重连
factory.setNetworkRecoveryInterval(5000); // 重连间隔 5 秒
factory.setTopologyRecoveryEnabled(true); // 自动恢复 Exchange/Queue/Binding
factory.setRequestedHeartbeat(60); // 心跳 60 秒
factory.setConnectionTimeout(10000); // 连接超时 10 秒
11.2 安全加固
bash
# 1. 删除 guest 账号(默认只能 localhost 访问,但最好删除)
bin/admq rabbitmq admin delete_user guest
# 2. 修改默认密码
bin/admq rabbitmq admin change_password admq "YourStrongPassword!"
# 3. 创建应用专用账号(最小权限)
bin/admq rabbitmq admin add_user app_user app_password
bin/admq rabbitmq admin set_permissions -p /prod app_user "app\\..*" "app\\..*" "app\\..*"
11.3 监控告警阈值建议
| 指标 | 告警阈值 | 说明 |
|---|---|---|
| 队列深度 | > 10,000 | 消费者可能跟不上 |
| 未确认消息数 | > 消费者数 × prefetch × 2 | 消费者异常 |
| 内存使用 | > 80% 水位 | 即将触发流量控制 |
| 磁盘剩余 | < 2× 内存 | 消息可能无法持久化 |
| 连接数 | > 8,000 | 接近上限 10,000 |
11.4 集群推荐配置
ini
# 使用仲裁队列(替代经典镜像队列)
default_queue_type = quorum
# 网络分区处理
cluster_partition_handling = pause_minority
# 禁用不必要的协议(如果只用 AMQP)
# 在插件配置中禁用 STOMP/MQTT(按需保留)
11.5 资源调优
ini
# 连接数上限(根据实际需求调整)
connection_max = 10000
channel_max = 2048
# 内存阈值(可用内存的 60%)
vm_memory_high_watermark.relative = 0.6
# TCP 缓冲区
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true