基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
特点:生态好,好学习、易理解,时效性强,支持很多不同语言的客户端,扩展性、可用性很好。
官方文档:https://www.rabbitmq.com
基本概念
AMQP 协议(https://www.rabbitmq.com/tutorials/amqp-concepts): 高级消息队列协议
生产者:发消息到交换机
消费者:从某个队列中取消息
交换机(Exchanage):把消息 转发 到对应的队列
队列(Queue):存储消息
路由(Routes):转发

安装 rabbitMQ 监控面板
1
| rabbitmq-plugins.bat enable rabbitmq_management
|
访问:http://localhost:15672
账号密码默认为:guest

快速入门
1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version> </dependency>
|
多消费者
场景:多个机器同时去接受并处理任务。一个生产者给一个队列发消息,多个消费者从这个队列取消息

队列持久化:
第二个参数(durable)设置为true,服务器重启后队列不会丢失:
1
| channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
消息持久化:
指定 MessageProperties.PERSISTENT_TEXT_PLAIN 参数,服务器重启后消息不会丢失:
1 2 3
| channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class MultiConsumer { private static final String TASK_QUEUE_NAME = "multi_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); for (int i = 0; i < 2; i++) { final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); int finalI = i+1; DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); try { System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class MultiProducer { private static final String TASK_QUEUE_NAME = "multi_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.nextLine(); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
控制单个消费者的处理任务积压数:
这个参数告诉RabbitMQ不要同时给一个消费者超过一个(在这个例子中是1)未确认的消息。
这意味着消费者在处理并确认一个消息之前,队列不会发送新的消息给这个消费者。
消息确认机制:
当消费者接收到消息后,要给一个反馈:
- ack:消费成功
- nack:消费失败
- reject:拒绝
自动确认(Auto Acknowledgment):当消费者从队列中获取到消息后,RabbitMQ会立即认为消费者已经成功处理了消息,并从队列中移除。这种方式可能会因为消费者在处理消息时发生异常而导致消息丢失。
建议将autoack(第二个参数)改为 false,去手动确认
1 2 3
| channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
|
指定确认某条消息:
1
| channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
第二个参数 multiple 批量确认: 是指是否要一次性确认所有的历史消息直到当前这条
指定拒绝某条消息:
1
| channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
|
第三个参数表示是否重新入队,可用于重试
交换机
一个生产者给 多个 队列发消息
作用:提供转发功能,把消息转发到不同的队列上
绑定:交换机和队列关联起来,也可以叫路由,算是一个算法或转发策略

交换机类别:fanout、direct、topic、headers
fanout 交换机:
扇出、广播
特点:消息会转发到所有绑定该交换机的队列上
场景:很适用于发布订阅的场景
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout-exchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout-exchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel(); channel1.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = "小雨的工作队列"; channel1.queueDeclare(queueName, true, false, false, null); channel1.queueBind(queueName, EXCHANGE_NAME, "");
String queueName2 = "小明的工作队列"; channel2.queueDeclare(queueName2, true, false, false, null); channel2.queueBind(queueName2, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [小雨] Received '" + message + "'"); };
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [小明] Received '" + message + "'"); }; channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { }); channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } }
|
Direct 交换机:
绑定:可以让交换机和队列进行关联,可指定交换机将指定的消息发送给指定的队列(可以理解为网线)
routingKey:路由键,控制消息要转发给哪个队列(可以理解为 IP 地址)
特点:消息会根据路由键转发到指定的队列
场景:特定的消息只交给特定的系统(程序)来处理

生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class DirectProducer { private static final String EXCHANGE_NAME = "direct-exchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String userInput = scanner.nextLine(); String[] strings = userInput.split(" "); if(strings.length<1){ continue; } String message = strings[0]; String routingKey = strings[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + " with routing: " + routingKey + "'"); } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class DirectConsumer { private static final String EXCHANGE_NAME = "direct-exchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName1 = "direct1-test"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueBind(queueName1, EXCHANGE_NAME, "direct1"); String queueName2 = "direct2-test"; channel.queueDeclare(queueName2, true, false, false, null); channel.queueBind(queueName2, EXCHANGE_NAME, "direct2"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback direct1DeliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [direct1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; DeliverCallback direct2DeliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [direct2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, direct1DeliverCallback, consumerTag -> { }); channel.basicConsume(queueName2, true, direct2DeliverCallback, consumerTag -> { }); } }
|
Topics 交换机
特点:消息会根据一个 模糊的路由键 转发到指定的队列
场景:特定的消息交给特定的一类系统(程序)来处理
绑定关系:可以模糊匹配多个绑定
- :匹配一个单词,例如:.banana ,那么 a.banana、b.banana 都能匹配
- #:匹配 0 个或多个单词,例如:a.#,那么 a.a、a.b、a.a.a 都能匹配

生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class TopicProducer { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String userInput = scanner.nextLine(); String[] strings = userInput.split(" "); if(strings.length<1){ continue; } String message = strings[0]; String routingKey = strings[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + " with routing: " + routingKey + "'"); } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class TopicConsumer {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName1 = "frontend-test"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueBind(queueName1, EXCHANGE_NAME, "#.前端.#");
String queueName2 = "backend-test"; channel.queueDeclare(queueName2, true, false, false, null); channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");
String queueName3 = "product-test"; channel.queueDeclare(queueName3, true, false, false, null); channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback direct1DeliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [direct1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; DeliverCallback direct2DeliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [direct2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; DeliverCallback direct3DeliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [direct3] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); };
channel.basicConsume(queueName1, true, direct1DeliverCallback, consumerTag -> { }); channel.basicConsume(queueName2, true, direct2DeliverCallback, consumerTag -> { }); channel.basicConsume(queueName3, true, direct3DeliverCallback, consumerTag -> { }); } }
|
核心特性
消息过期机制
消息确认机制
死信队列
面试考点
消息队列的概念、模型、应用场景
交换机的类别、路由绑定的关系
消息可靠性
- 消息确认机制
- 消息持久化
- 消息过期机制
- 死信队列
延迟队列(类似死信队列)
顺序消费、消费幂等性
可扩展性
- 集群
- 故障的恢复机制
- 镜像
运维监控警告
RabbitMQ 项目实战
怎么在项目中使用 RabbitMQ ?
选择客户端
- 使用官方的客户端
优点:兼容性好,换语言成本低,灵活
缺点:太灵活,要自己维护、管理,太麻烦
- 使用封装好的客户端,比如 Spring Boot RabbitMQ Starter
优点:简单易用,可直接配置使用
缺点:封装的太好,有门槛,需要学习。不够灵活,被框架限制
根据场景选择,没有绝对的优劣之分:类似于 jdbc 和 MyBatis萨拉
引入依赖
注意:Maven引用时 AMQP 的版本一定要和 Spring Boot 的版本一致
1 2 3 4 5 6
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.7.2</version> </dependency>
|
在 yml 中引入配置
1 2 3 4 5 6
| spring: rabbitmq: host: localhost port: 5672 password: guest username: guest
|
创建交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public class MqInitMain {
public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String EXCHANGE_NAME = "code_exchange"; channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = "code_queue"; channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "my_routingKey"); } catch (Exception e){
} } }
|
生产者
1 2 3 4 5 6 7 8 9 10
| @Component public class MyMessageProducer { @Resource private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message){ rabbitTemplate.convertAndSend(exchange, routingKey, message); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12
| @Component @Slf4j public class MyMessageConsumer {
@SneakyThrows @RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { log.info("receiveMessage message = {}", message); channel.basicAck(deliveryTag, false); } }
|
单元测试
1 2 3 4 5 6 7 8 9 10 11
| @SpringBootTest class MyMessageProducerTest {
@Resource private MyMessageProducer myMessageProducer;
@Test void sendMessage() { myMessageProducer.sendMessage("code_exchange", "my_routingKey", "你好!"); } }
|