基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

特点:生态好,好学习、易理解,时效性强,支持很多不同语言的客户端,扩展性、可用性很好。

官方文档:https://www.rabbitmq.com

基本概念

AMQP 协议(https://www.rabbitmq.com/tutorials/amqp-concepts): 高级消息队列协议

生产者:发消息到交换机

消费者:从某个队列中取消息

交换机(Exchanage):把消息 转发 到对应的队列

队列(Queue):存储消息

路由(Routes):转发

1.png

安装 rabbitMQ 监控面板

1
rabbitmq-plugins.bat enable rabbitmq_management

访问:http://localhost:15672

账号密码默认为:guest

2.png

快速入门

1
2
3
4
5
6

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>

多消费者

场景:多个机器同时去接受并处理任务。一个生产者给一个队列发消息,多个消费者从这个队列取消息

3.png

队列持久化:

第二个参数(durable)设置为true,服务器重启后队列不会丢失:

1
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

消息持久化:

指定 MessageProperties.PERSISTENT_TEXT_PLAIN 参数,服务器重启后消息不会丢失:

1
2
3
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class MultiConsumer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
//建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
for (int i = 0; i < 2; i++) {
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);
// 定义如何处理消息
int finalI = i+1;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理工作
System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 停20秒
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开启消费者监听
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MultiProducer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

控制单个消费者的处理任务积压数:

这个参数告诉RabbitMQ不要同时给一个消费者超过一个(在这个例子中是1)未确认的消息。

这意味着消费者在处理并确认一个消息之前,队列不会发送新的消息给这个消费者。

1
channel.basicQos(1);

消息确认机制:

当消费者接收到消息后,要给一个反馈:

  • ack:消费成功
  • nack:消费失败
  • reject:拒绝

自动确认(Auto Acknowledgment):当消费者从队列中获取到消息后,RabbitMQ会立即认为消费者已经成功处理了消息,并从队列中移除。这种方式可能会因为消费者在处理消息时发生异常而导致消息丢失。

建议将autoack(第二个参数)改为 false,去手动确认

1
2
3
// 开启消费者监听
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});

指定确认某条消息:

1
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

第二个参数 multiple 批量确认: 是指是否要一次性确认所有的历史消息直到当前这条

指定拒绝某条消息:

1
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);

第三个参数表示是否重新入队,可用于重试

交换机

一个生产者给 多个 队列发消息

作用:提供转发功能,把消息转发到不同的队列上

绑定:交换机和队列关联起来,也可以叫路由,算是一个算法或转发策略

4.png

交换机类别:fanout、direct、topic、headers

fanout 交换机:

扇出、广播

特点:消息会转发到所有绑定该交换机的队列上

场景:很适用于发布订阅的场景

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.nextLine();

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] argv) throws Exception {
// 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();
// 声明交换机
channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建队列
String queueName = "小雨的工作队列";
channel1.queueDeclare(queueName, true, false, false, null);
channel1.queueBind(queueName, EXCHANGE_NAME, "");

String queueName2 = "小明的工作队列";
channel2.queueDeclare(queueName2, true, false, false, null);
channel2.queueBind(queueName2, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小雨] Received '" + message + "'");
};

DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小明] Received '" + message + "'");
};
channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
}
}

Direct 交换机:

绑定:可以让交换机和队列进行关联,可指定交换机将指定的消息发送给指定的队列(可以理解为网线)

routingKey:路由键,控制消息要转发给哪个队列(可以理解为 IP 地址)

特点:消息会根据路由键转发到指定的队列

场景:特定的消息只交给特定的系统(程序)来处理

5.png

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String userInput = scanner.nextLine();
String[] strings = userInput.split(" ");
if(strings.length<1){
continue;
}
String message = strings[0];
String routingKey = strings[1];

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + " with routing: " + routingKey + "'");
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DirectConsumer {
private static final String EXCHANGE_NAME = "direct-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 创建队列-1
String queueName1 = "direct1-test";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueBind(queueName1, EXCHANGE_NAME, "direct1");
// 创建队列-2
String queueName2 = "direct2-test";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, EXCHANGE_NAME, "direct2");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback direct1DeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [direct1] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback direct2DeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [direct2] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName1, true, direct1DeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName2, true, direct2DeliverCallback, consumerTag -> {
});
}
}

Topics 交换机

特点:消息会根据一个 模糊的路由键 转发到指定的队列

场景:特定的消息交给特定的一类系统(程序)来处理

绑定关系:可以模糊匹配多个绑定

  • :匹配一个单词,例如:.banana ,那么 a.banana、b.banana 都能匹配
  • #:匹配 0 个或多个单词,例如:a.#,那么 a.a、a.b、a.a.a 都能匹配

6.png

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String userInput = scanner.nextLine();
String[] strings = userInput.split(" ");
if(strings.length<1){
continue;
}
String message = strings[0];
String routingKey = strings[1];

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + " with routing: " + routingKey + "'");
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class TopicConsumer {

private static final String EXCHANGE_NAME = "topic_exchange";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 创建队列-1
String queueName1 = "frontend-test";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueBind(queueName1, EXCHANGE_NAME, "#.前端.#");

// 创建队列-2
String queueName2 = "backend-test";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");

// 创建队列-3
String queueName3 = "product-test";
channel.queueDeclare(queueName3, true, false, false, null);
channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 处理从队列接收的消息
DeliverCallback direct1DeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [direct1] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback direct2DeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [direct2] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback direct3DeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [direct3] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};

// 消费队列的消息
channel.basicConsume(queueName1, true, direct1DeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName2, true, direct2DeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName3, true, direct3DeliverCallback, consumerTag -> {
});
}
}

核心特性

消息过期机制

消息确认机制

死信队列

面试考点

  1. 消息队列的概念、模型、应用场景

  2. 交换机的类别、路由绑定的关系

  3. 消息可靠性

    1. 消息确认机制
    2. 消息持久化
    3. 消息过期机制
    4. 死信队列
  4. 延迟队列(类似死信队列)

  5. 顺序消费、消费幂等性

  6. 可扩展性

    1. 集群
    2. 故障的恢复机制
    3. 镜像
  7. 运维监控警告

RabbitMQ 项目实战

怎么在项目中使用 RabbitMQ ?

选择客户端

  1. 使用官方的客户端

优点:兼容性好,换语言成本低,灵活

缺点:太灵活,要自己维护、管理,太麻烦

  1. 使用封装好的客户端,比如 Spring Boot RabbitMQ Starter

优点:简单易用,可直接配置使用

缺点:封装的太好,有门槛,需要学习。不够灵活,被框架限制

根据场景选择,没有绝对的优劣之分:类似于 jdbc 和 MyBatis萨拉

引入依赖

注意:Maven引用时 AMQP 的版本一定要和 Spring Boot 的版本一致

1
2
3
4
5
6

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.2</version>
</dependency>

在 yml 中引入配置

1
2
3
4
5
6
spring:
rabbitmq:
host: localhost
port: 5672
password: guest
username: guest

创建交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 创建交换机和队列--用于测试(启动程序前执行一次,用于创建队列、交换机)
*/
public class MqInitMain {

public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String EXCHANGE_NAME = "code_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

// 创建队列
String queueName = "code_queue";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "my_routingKey");
} catch (Exception e){

}
}
}

生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class MyMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;

public void sendMessage(String exchange, String routingKey, String message){
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class MyMessageConsumer {

// 指定程序监听的消息队列和确认机制
@SneakyThrows
@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
log.info("receiveMessage message = {}", message);
channel.basicAck(deliveryTag, false);
}
}

单元测试

1
2
3
4
5
6
7
8
9
10
11
@SpringBootTest
class MyMessageProducerTest {

@Resource
private MyMessageProducer myMessageProducer;

@Test
void sendMessage() {
myMessageProducer.sendMessage("code_exchange", "my_routingKey", "你好!");
}
}