Skip to content

ADMQ RabbitMQ 开发集成手册

产品版本: ADMQ RabbitMQ 2.0 日期: 2026-06


目录

  1. 连接参数速查
  2. Java 集成
  3. Spring Boot 集成
  4. Python 集成
  5. Node.js 集成
  6. Go 集成
  7. MQTT 客户端接入
  8. STOMP 客户端接入
  9. REST API 集成
  10. 消息模式最佳实践
  11. 生产环境建议

1. 连接参数速查

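| 参数 | 默认值 | 说明 |
| --- | --- | --- |
| Host | 服务器 IP | 建议使用 DNS 名称 |
| AMQP Port | 5672 | 标准 AMQP 端口 |
| AMQP TLS Port | 5671 | TLS 加密连接 |
| Management Port | 15672 | HTTP 管理 API |
| MQTT Port | 1883 | MQTT 协议 |
| STOMP Port | 61613 | STOMP 协议 |
| Virtual Host | / | 默认 vhost |
| Username | admq | 默认用户(生产请修改) |
| Password | apusic_123 | 默认密码(生产请修改) |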

2. Java 集成

2.1 Maven 依赖

xml
<!-- AMQP 客户端 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.21.0</version>
</dependency>

2.2 建立连接

java
import com.rabbitmq.client.*;

// Build a factory for the local broker using the default account.
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admq");
factory.setPassword("apusic_123");
factory.setVirtualHost("/");
factory.setHost("localhost");
factory.setPort(5672);

// Automatic recovery, retrying every 5000 ms.
factory.setNetworkRecoveryInterval(5000);
factory.setAutomaticRecoveryEnabled(true);
// Heartbeat interval in seconds (0 disables heartbeats).
factory.setRequestedHeartbeat(60);
// Connection timeout in milliseconds.
factory.setConnectionTimeout(5000);

// Both resources are closed automatically when the try block exits.
try (Connection conn = factory.newConnection();
     Channel channel = conn.createChannel()) {
    // Work with the channel here.
}

2.3 Work Queue(任务队列)

java
// 生产者
public class TaskProducer {
    private static final String QUEUE_NAME = "task_queue";

    /**
     * Declares the durable task queue and publishes one persistent text message
     * to it through the default exchange.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = buildFactory();
        try (Connection connection = connectionFactory.newConnection();
             Channel ch = connection.createChannel()) {

            // durable=true, exclusive=false, autoDelete=false, no extra arguments
            ch.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "Hello, ADMQ!";
            byte[] body = message.getBytes(StandardCharsets.UTF_8);
            ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, body);

            System.out.println("Sent: " + message);
        }
    }
}

// 消费者
public class TaskConsumer {
    private static final String QUEUE_NAME = "task_queue";

    /**
     * Starts a manual-acknowledgement consumer on the durable task queue.
     * The connection and channel are not closed in this method.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = buildFactory().newConnection();
        Channel ch = connection.createChannel();

        ch.queueDeclare(QUEUE_NAME, true, false, false, null);

        // Prefetch of 1: at most one unacknowledged message per consumer (fair dispatch).
        ch.basicQos(1);

        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            try {
                processTask(message);
                ch.basicAck(deliveryTag, false);
            } catch (Exception e) {
                // Any failure puts the message back on the queue.
                ch.basicNack(deliveryTag, false, true);
            }
        };
        CancelCallback onCancel = consumerTag -> {};

        // autoAck=false: messages are acknowledged manually in the callback above.
        ch.basicConsume(QUEUE_NAME, false, onDelivery, onCancel);
    }
}

2.4 发布/订阅(Fanout)

java
// Publisher: declare the durable fanout exchange and publish with an empty routing key.
channel.exchangeDeclare("logs", "fanout", true);
// Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
channel.basicPublish("logs", "", null, message.getBytes(StandardCharsets.UTF_8));

// Subscriber: bind a server-named temporary queue to the same exchange.
channel.exchangeDeclare("logs", "fanout", true);
String queueName = channel.queueDeclare().getQueue(); // temporary queue
channel.queueBind(queueName, "logs", "");
// autoAck=true: deliveries are acknowledged automatically.
channel.basicConsume(queueName, true, callback, tag -> {});

2.5 主题路由(Topic)

java
// Binding example: one temporary queue receives order.* and payment.# events.
channel.exchangeDeclare("events", "topic", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "events", "order.*");
channel.queueBind(queueName, "events", "payment.#");

// Publish to order.created; encode explicitly as UTF-8 rather than the platform default.
channel.basicPublish("events", "order.created", null, payload.getBytes(StandardCharsets.UTF_8));

2.6 RPC 模式

java
// RPC 客户端
public class RpcClient {
    /**
     * Sends {@code message} to {@code rpc_queue} and waits up to 10 seconds for the reply
     * carrying the same correlation id.
     *
     * <p>NOTE(review): {@code channel} is not declared in this snippet; it is assumed to be
     * a field of the enclosing class.
     *
     * @param message request body, sent as UTF-8
     * @return the reply body decoded as UTF-8, or {@code null} if no reply arrived in time
     * @throws Exception if a broker operation fails or the wait is interrupted
     */
    public String call(String message) throws Exception {
        String corrId = UUID.randomUUID().toString();
        String replyQueue = channel.queueDeclare().getQueue();

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .correlationId(corrId)
            .replyTo(replyQueue)
            .build();

        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        // Subscribe before publishing and keep the consumer tag so it can be cancelled.
        String consumerTag = channel.basicConsume(replyQueue, true, (tag, delivery) -> {
            // corrId is never null, so this comparison is safe when the reply has no correlation id.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
            }
        }, tag -> {});

        try {
            channel.basicPublish("", "rpc_queue", props, message.getBytes(StandardCharsets.UTF_8));
            return response.poll(10, TimeUnit.SECONDS);
        } finally {
            // Without this, every call would leave a consumer registered on the channel.
            channel.basicCancel(consumerTag);
        }
    }
}

2.7 Dead Letter Queue(死信队列)

java
// Arguments that attach the dead-letter configuration to the business queue.
Map<String, Object> mainQueueArgs = new HashMap<>();
mainQueueArgs.put("x-message-ttl", 30000);      // messages are dead-lettered after 30 seconds
mainQueueArgs.put("x-max-length", 1000);         // at most 1000 messages; overflow is dead-lettered
mainQueueArgs.put("x-dead-letter-exchange", "dlx");
mainQueueArgs.put("x-dead-letter-routing-key", "dead");

// Declare the dead-letter exchange and the queue bound to it.
channel.exchangeDeclare("dlx", "direct", true);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("dead_letter_queue", "dlx", "dead");

// Declare the business queue with the dead-letter arguments.
channel.queueDeclare("main_queue", true, false, false, mainQueueArgs);

3. Spring Boot 集成

3.1 Maven 依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 配置文件

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admq
    password: apusic_123
    connection-timeout: 5000
    requested-heartbeat: 60
    listener:
      simple:
        acknowledge-mode: manual       # 手动确认
        prefetch: 1                    # 每次处理 1 条
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000
    template:
      retry:
        enabled: true
        max-attempts: 3

3.3 配置类

java
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_ORDER = "queue.order";
    public static final String EXCHANGE_ORDER = "exchange.order";
    public static final String ROUTING_KEY = "order.created";

    /** Durable order queue with dead-letter exchange "exchange.dlx" and x-message-ttl 60000. */
    @Bean
    public Queue orderQueue() {
        QueueBuilder builder = QueueBuilder.durable(QUEUE_ORDER);
        builder = builder.withArgument("x-dead-letter-exchange", "exchange.dlx");
        builder = builder.withArgument("x-message-ttl", 60000);
        return builder.build();
    }

    /** Topic exchange for order events, constructed with flags (true, false). */
    @Bean
    public TopicExchange orderExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_ORDER, durable, autoDelete);
    }

    /** Binds the order queue to the order exchange under {@link #ROUTING_KEY}. */
    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder
            .bind(orderQueue)
            .to(orderExchange)
            .with(ROUTING_KEY);
    }

    /** JSON message converter used for message payloads. */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}

3.4 生产者

java
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the order to the order exchange with the order-created routing key.
     *
     * @param order payload converted by the template's message converter
     */
    public void createOrder(Order order) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_ORDER,
            RabbitMQConfig.ROUTING_KEY,
            order
        );
    }

    /**
     * Publishes the order with a delivery delay (requires the
     * rabbitmq_delayed_message_exchange plugin).
     *
     * @param order   payload to send
     * @param delayMs delay in milliseconds; must fit in an {@code int}
     * @throws ArithmeticException if {@code delayMs} does not fit in an {@code int}
     */
    public void scheduleOrder(Order order, long delayMs) {
        // A plain (int) cast would silently wrap large values into a wrong or negative delay.
        final int delay = Math.toIntExact(delayMs);
        rabbitTemplate.convertAndSend(
            "exchange.delayed", "order.scheduled", order,
            m -> {
                m.getMessageProperties().setDelay(delay);
                return m;
            }
        );
    }
}

3.5 消费者

java
@Component
public class OrderConsumer {

    /**
     * Handles one order delivery with manual acknowledgement: ack on success,
     * nack without requeue on a business failure, nack with requeue on any other failure.
     *
     * @param order   converted message payload
     * @param channel channel the delivery arrived on
     * @param tag     delivery tag used for ack/nack
     * @throws Exception if the ack or nack itself fails
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_ORDER)
    public void processOrder(Order order, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            doProcess(order);
            channel.basicAck(tag, false);
        } catch (BusinessException e) {
            // Business failure: do not retry; the message is rejected without requeue.
            reject(channel, tag, false);
        } catch (Exception e) {
            // Transient failure: put the message back on the queue.
            reject(channel, tag, true);
        }
    }

    /** Negatively acknowledges a single delivery, optionally requeueing it. */
    private static void reject(Channel channel, long tag, boolean requeue) throws Exception {
        channel.basicNack(tag, false, requeue);
    }
}

4. Python 集成

4.1 安装依赖

bash
pip install pika

4.2 基本生产者/消费者

python
import pika
import json

def get_connection():
    """Open a blocking pika connection to the local broker with the default account."""
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            virtual_host='/',
            credentials=pika.PlainCredentials('admq', 'apusic_123'),
            heartbeat=60,
            blocked_connection_timeout=300,
        )
    )

# 生产者
def publish_message(queue_name: str, message: dict):
    """Publish ``message`` as persistent JSON to the durable queue ``queue_name``.

    A new connection is opened for the call and is always closed, even when
    declaring the queue or publishing raises.
    """
    connection = get_connection()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=pika.DeliveryMode.Persistent,
                content_type='application/json'
            )
        )
    finally:
        # Release the connection on both the success and the error path.
        connection.close()

# 消费者
def consume_messages(queue_name: str):
    """Consume JSON messages from the durable queue ``queue_name`` with manual acks.

    Each message is decoded and passed to ``process``; it is acknowledged on
    success and negatively acknowledged with requeue on any exception.
    Blocks in ``start_consuming``.
    """
    conn = get_connection()
    consumer_channel = conn.channel()
    consumer_channel.queue_declare(queue=queue_name, durable=True)
    # Prefetch of 1: at most one unacknowledged message at a time.
    consumer_channel.basic_qos(prefetch_count=1)

    def on_delivery(ch, method, properties, body):
        try:
            payload = json.loads(body)
            process(payload)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    consumer_channel.basic_consume(queue=queue_name, on_message_callback=on_delivery)
    consumer_channel.start_consuming()

4.3 异步消费者(aio-pika)

python
import asyncio
import aio_pika

async def main():
    """Consume ``task_queue`` asynchronously, passing each message body to ``handle``."""
    conn = await aio_pika.connect_robust("amqp://admq:apusic_123@localhost/")
    async with conn:
        ch = await conn.channel()
        await ch.set_qos(prefetch_count=10)
        task_queue = await ch.declare_queue("task_queue", durable=True)

        async with task_queue.iterator() as deliveries:
            async for incoming in deliveries:
                # Each message is handled inside its own process() context.
                async with incoming.process():
                    await handle(incoming.body)

asyncio.run(main())

5. Node.js 集成

5.1 安装依赖

bash
npm install amqplib

5.2 基本示例

javascript
const amqp = require('amqplib');

const AMQP_URL = 'amqp://admq:apusic_123@localhost';

// 生产者
/**
 * Publishes `message` as persistent JSON to the durable queue `queue`.
 * Opens a connection for the call and always closes it, even when a step fails.
 *
 * @param {string} queue   target queue name
 * @param {*}      message value serialised with JSON.stringify
 */
async function publish(queue, message) {
    const conn = await amqp.connect(AMQP_URL);
    try {
        const ch = await conn.createChannel();
        await ch.assertQueue(queue, { durable: true });
        ch.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
            persistent: true,
            contentType: 'application/json'
        });
        await ch.close();
    } finally {
        // Release the connection on both the success and the error path.
        await conn.close();
    }
}

// 消费者(带重连)
/**
 * Consumes `queue` with manual acks and reconnects whenever the connection closes.
 *
 * The first connection attempt is awaited, so an initial failure rejects the returned
 * promise. Later reconnect attempts are retried every 5 seconds until one succeeds.
 *
 * @param {string}   queue   queue to consume from
 * @param {Function} handler async function called with the parsed JSON body
 */
async function consume(queue, handler) {
    const RECONNECT_DELAY_MS = 5000;

    // Retry in the background; a failed attempt schedules the next one instead of
    // surfacing as an unhandled promise rejection.
    function scheduleReconnect() {
        setTimeout(() => {
            connect().catch((err) => {
                console.error('AMQP reconnect failed:', err);
                scheduleReconnect();
            });
        }, RECONNECT_DELAY_MS);
    }

    async function connect() {
        const conn = await amqp.connect(AMQP_URL);
        // An 'error' listener must exist, otherwise the emitted event would throw.
        conn.on('error', (err) => {
            console.error('AMQP connection error:', err);
        });
        try {
            const ch = await conn.createChannel();
            await ch.assertQueue(queue, { durable: true });
            await ch.prefetch(1);
            await ch.consume(queue, async (msg) => {
                if (!msg) return;
                try {
                    await handler(JSON.parse(msg.content.toString()));
                    ch.ack(msg);
                } catch (err) {
                    ch.nack(msg, false, true);
                }
            });
        } catch (err) {
            // Setup failed on an open connection: drop it and let the caller decide to retry.
            conn.close().catch(() => {});
            throw err;
        }
        // Reconnect on 'close' rather than 'error': a graceful server-initiated close
        // emits only 'close', and 'close' also follows every 'error'.
        conn.on('close', scheduleReconnect);
    }

    await connect();
}

6. Go 集成

6.1 安装依赖

bash
go get github.com/rabbitmq/amqp091-go

6.2 连接与发布

go
package main

import (
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
    "context"
    "time"
)

// connectRabbitMQ dials the local broker with the default account and opens one channel.
// If the channel cannot be opened, the connection is closed before the error is returned.
func connectRabbitMQ() (*amqp.Connection, *amqp.Channel, error) {
    connection, dialErr := amqp.Dial("amqp://admq:apusic_123@localhost:5672/")
    if dialErr != nil {
        return nil, nil, dialErr
    }

    channel, chErr := connection.Channel()
    if chErr == nil {
        return connection, channel, nil
    }

    connection.Close()
    return nil, nil, chErr
}

// publish declares the durable queue and sends message to it as a persistent
// application/json payload through the default exchange, with a 5-second timeout.
func publish(ch *amqp.Channel, queue, message string) error {
    if _, declErr := ch.QueueDeclare(queue, true, false, false, false, nil); declErr != nil {
        return declErr
    }

    outgoing := amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "application/json",
        Body:         []byte(message),
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    return ch.PublishWithContext(ctx, "", queue, false, false, outgoing)
}

// consume reads deliveries from queue with manual acknowledgement and passes each body
// to handler. A handler error requeues the message; success acknowledges it.
//
// It returns an error if the prefetch limit cannot be set, if subscribing fails, or if
// an Ack/Nack fails. It returns nil when the delivery channel is closed.
func consume(ch *amqp.Channel, queue string, handler func([]byte) error) error {
    // Prefetch of 1; a failure here is returned instead of being silently dropped.
    if err := ch.Qos(1, 0, false); err != nil {
        return err
    }
    msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
    if err != nil {
        return err
    }
    for d := range msgs {
        if err := handler(d.Body); err != nil {
            log.Printf("handler failed, requeueing message: %v", err)
            if nackErr := d.Nack(false, true); nackErr != nil {
                return nackErr
            }
            continue
        }
        if ackErr := d.Ack(false); ackErr != nil {
            return ackErr
        }
    }
    return nil
}

7. MQTT 客户端接入

架构说明: RabbitMQ 通过 rabbitmq_mqtt 插件支持 MQTT,本质是协议转换适配器,不是原生 MQTT Broker。MQTT 消息在内部转换为 AMQP 消息,因此 MQTT 与 AMQP 消费者天然互通,但保留消息(Retained Message)不支持,QoS 2 降级处理。适合"企业后端用 AMQP,偶尔有 IoT 设备用 MQTT"的混合场景;大规模 IoT 专用场景请使用 ADMQ MQTT。

7.1 MQTT 与 AMQP 的映射关系

| MQTT | AMQP |
| --- | --- |
| Topic | Routing Key(/ 替换为 .) |
| Publish | Exchange amq.topic |
| Subscribe | Queue bound to amq.topic |
| QoS 0 | 非持久消息 |
| QoS 1 | 持久消息 + ACK |

7.2 Python MQTT 客户端

python
import paho.mqtt.client as mqtt

def on_connect(c, userdata, flags, rc, props):
    """Print the connect result and subscribe to every topic under sensors/ at QoS 1."""
    print(f"Connected: {rc}")
    c.subscribe("sensors/#", qos=1)

def on_message(c, userdata, msg):
    """Print the topic and decoded payload of each received message."""
    print(f"{msg.topic}: {msg.payload.decode()}")

# Create the client, install the callbacks, then connect and block in the network loop.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.on_connect = on_connect
client.username_pw_set("admq", "apusic_123")
client.connect("localhost", 1883, 60)
client.loop_forever()

7.3 MQTT 消息从 AMQP 消费

MQTT 发布到 sensors/temperature/room1 后,AMQP 消费者可通过 amq.topic Exchange 接收:

java
// AMQP 消费者接收 MQTT 消息
channel.queueBind(queueName, "amq.topic", "sensors.temperature.room1");
// 注意:MQTT 的 / 分隔符在 AMQP 中对应 .

8. STOMP 客户端接入

8.1 Python(stomp.py)

python
import stomp

class MyListener(stomp.ConnectionListener):
    """Print every received message and acknowledge it on the owning connection."""

    def __init__(self, conn=None):
        # conn is optional so that MyListener() without arguments keeps working;
        # without it, messages are printed but not acknowledged.
        self.conn = conn

    def on_message(self, frame):
        print(f"Received: {frame.body}")
        if self.conn is not None:
            # The subscription below uses ack='client', so each message must be
            # acknowledged explicitly or it stays unacknowledged.
            self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])

conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('', MyListener(conn))
conn.connect('admq', 'apusic_123', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='client')

8.2 JavaScript(WebSocket STOMP)

javascript
import { Client } from '@stomp/stompjs';

// WebSocket STOMP client: subscribes to /queue/test with client acknowledgement
// and publishes one message to /topic/news after connecting.
const client = new Client({
    brokerURL: 'ws://localhost:15674/ws',
    connectHeaders: { login: 'admq', passcode: 'apusic_123' },
    onConnect: () => {
        client.subscribe('/queue/test', (msg) => {
            console.log('Received:', msg.body);
            msg.ack();
        }, { ack: 'client' }); // msg.ack() above only applies in client ack mode
        client.publish({ destination: '/topic/news', body: 'Hello STOMP' });
    }
});
client.activate();

9. REST API 集成

9.1 队列管理

bash
# 创建队列
curl -X PUT http://localhost:15672/api/queues/%2F/my-queue \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{"durable": true, "arguments": {"x-queue-type": "quorum"}}'

# 查看队列信息
curl http://localhost:15672/api/queues/%2F/my-queue -u admq:apusic_123

# 获取队列中的消息(不消费,只查看)
curl -X POST http://localhost:15672/api/queues/%2F/my-queue/get \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{"count": 1, "ackmode": "ack_requeue_true", "encoding": "auto"}'

# 清空队列
curl -X DELETE http://localhost:15672/api/queues/%2F/my-queue/contents \
  -u admq:apusic_123

9.2 发布消息(通过 API)

bash
curl -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{
    "routing_key": "my-queue",
    "payload": "{\"event\":\"order.created\",\"id\":12345}",
    "payload_encoding": "string",
    "properties": {
      "content_type": "application/json",
      "delivery_mode": 2
    }
  }'

9.3 策略管理

bash
# 设置队列 HA 策略(集群)
curl -X PUT http://localhost:15672/api/policies/%2F/ha-all \
  -u admq:apusic_123 \
  -H "content-type: application/json" \
  -d '{
    "pattern": "^ha\\.",
    "definition": {"ha-mode": "all"},
    "apply-to": "queues",
    "priority": 0
  }'

9.4 监控接口

bash
# 节点状态
curl http://localhost:15672/api/nodes -u admq:apusic_123

# 所有连接
curl http://localhost:15672/api/connections -u admq:apusic_123

# 所有 Channel
curl http://localhost:15672/api/channels -u admq:apusic_123

# Consumer 列表
curl http://localhost:15672/api/consumers -u admq:apusic_123

10. 消息模式最佳实践

10.1 选择队列类型

| 场景 | 推荐队列类型 | 原因 |
| --- | --- | --- |
| 关键业务(订单、支付) | Quorum | 强一致性,不丢消息 |
| 高吞吐日志/事件 | Stream | 可重放,低延迟 |
| 简单任务分发 | Classic | 成熟稳定,延迟低 |
| 会话/临时数据 | Classic(非持久) | 性能最优 |

10.2 避免消息积压

java
// Consumer side: set a prefetch limit so one consumer cannot take so many messages
// that other consumers starve.
channel.basicQos(10); // at most 10 unacknowledged messages

// Producer side: enable publisher confirms on the channel, publish, then wait.
// Waiting only makes sense after a publish; with nothing outstanding there is nothing to confirm.
channel.confirmSelect();
channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes(StandardCharsets.UTF_8));
channel.waitForConfirmsOrDie(5000); // wait up to 5 seconds for the broker's confirm

10.3 消息幂等性

消费者应实现幂等处理,因为消息可能被重复投递(网络异常时 ACK 失败导致重发):

java
/**
 * Processes an order at most once per message id.
 *
 * <p>NOTE(review): {@code processedIds} is not declared in this snippet; it is assumed to be
 * a {@code Set<String>} that is safe for concurrent use. The producer must set the AMQP
 * message-id property, otherwise the header is absent.
 *
 * @param order converted message payload
 * @param msgId AMQP message id of the delivery
 */
@RabbitListener(queues = "order_queue")
public void processOrder(Order order, @Header(AmqpHeaders.MESSAGE_ID) String msgId) {
    // add() checks and claims the id in one step, so two concurrent deliveries of the
    // same message cannot both pass the check.
    if (!processedIds.add(msgId)) {
        return; // idempotent: already processed or in progress
    }
    try {
        doProcess(order);
    } catch (RuntimeException e) {
        // Release the claim so that a redelivery of this message is processed again.
        processedIds.remove(msgId);
        throw e;
    }
}

10.4 连接池管理

java
// 不要每次操作都新建 Connection!
// Connection 对象复用,Channel 按需创建
/**
 * Holds one shared {@code Connection} and opens channels on it on demand,
 * instead of opening a new connection for every operation.
 */
public class RabbitMQPool {
    private final Connection connection;

    /**
     * @param connection the connection to reuse for every channel; must not be null
     */
    public RabbitMQPool(Connection connection) {
        // The final field must be assigned here; without a constructor the class does not compile.
        if (connection == null) {
            throw new NullPointerException("connection");
        }
        this.connection = connection;
    }

    /**
     * Opens a new channel on the shared connection.
     *
     * @return the newly created channel
     * @throws IOException if the channel cannot be created
     */
    public Channel getChannel() throws IOException {
        return connection.createChannel();
    }
}

10.5 消息大小建议

| 消息大小 | 建议 |
| --- | --- |
| < 1 MB | 直接在消息体传递 |
| 1 MB ~ 10 MB | 考虑压缩或拆分 |
| > 10 MB | 存储到对象存储,消息中传递 URL(Claim Check 模式) |

11. 生产环境建议

11.1 连接配置

java
// Timeouts and liveness.
factory.setConnectionTimeout(10000);             // connection timeout: 10 seconds
factory.setRequestedHeartbeat(60);               // heartbeat: 60 seconds
// Recovery after a connection loss.
factory.setNetworkRecoveryInterval(5000);        // retry interval: 5 seconds
factory.setTopologyRecoveryEnabled(true);        // restore exchanges, queues and bindings
factory.setAutomaticRecoveryEnabled(true);       // reconnect automatically

11.2 安全加固

bash
# 1. 删除 guest 账号(默认只能 localhost 访问,但最好删除)
bin/admq rabbitmq admin delete_user guest

# 2. 修改默认密码
bin/admq rabbitmq admin change_password admq "YourStrongPassword!"

# 3. 创建应用专用账号(最小权限)
bin/admq rabbitmq admin add_user app_user app_password
bin/admq rabbitmq admin set_permissions -p /prod app_user "app\\..*" "app\\..*" "app\\..*"

11.3 监控告警阈值建议

| 指标 | 告警阈值 | 说明 |
| --- | --- | --- |
| 队列深度 | > 10,000 | 消费者可能跟不上 |
| 未确认消息数 | > 消费者数 × prefetch × 2 | 消费者异常 |
| 内存使用 | > 80% 水位 | 即将触发流量控制 |
| 磁盘剩余 | < 2× 内存 | 消息可能无法持久化 |
| 连接数 | > 8,000 | 接近上限 10,000 |

11.4 集群推荐配置

ini
# 使用仲裁队列(替代经典镜像队列)
default_queue_type = quorum

# 网络分区处理
cluster_partition_handling = pause_minority

# 禁用不必要的协议(如果只用 AMQP)
# 在插件配置中禁用 STOMP/MQTT(按需保留)

11.5 资源调优

ini
# 连接数上限(根据实际需求调整)
connection_max = 10000
channel_max = 2048

# 内存阈值(可用内存的 60%)
vm_memory_high_watermark.relative = 0.6

# TCP 缓冲区
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true

金蝶天燕(Apusic)企业级消息中间件套件