前言:RabbitMQ 是一个流行的开源消息队列软件,实现了高级消息队列协议(AMQP)的标准。主要用于在分布式系统中传递消息,可以在应用程序之间可靠地进行通信。

RabbitMQ 的核心概念包括:

  1. 生产者(Producer):生产者是向 RabbitMQ 发送消息的应用程序。

  2. 队列(Queue):队列是用于存储消息的缓冲区。生产者将消息发送到队列,消费者从队列中接收消息。

  3. 消费者(Consumer):消费者是从 RabbitMQ 接收消息的应用程序。

  4. 交换机(Exchange):交换机是用于将消息路由到一个或多个队列的组件。生产者将消息发送到交换机,交换机根据预定义的规则(称为路由键)将消息路由到一个或多个队列。

  5. 绑定(Binding):绑定是交换机和队列之间的关联,它定义了交换机如何将消息路由到队列。

安装

使用docker来安装RabbitMQ,使用的版本是3.8

软件包上传服务器

这块把软件包拖到home目录,后切换到安装包存放的目录

1
cd /home

加载镜像文件

1
docker load -i mq.tar

启动RabbitMQ

1
docker run -e RABBITMQ_DEFAULT_USER=gust -e RABBITMQ_DEFAULT_PASS=gust -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management

RabbitMQ控制台

在浏览器使用http://服务器ip:15672/ 打开RabbitMQ

image-20240601200248603

概览(Overview):通常指的是对消息队列系统的整体概述或总体情况的描述。

连接(Connections):连接是客户端应用程序与消息队列服务器之间的通信通道。它表示客户端与消息队列服务器之间建立的网络连接,用于发送和接收消息。

通道(Channels):通道是在连接内部创建的虚拟通信通道。通常,一个连接可以包含多个通道,每个通道都可以执行独立的操作,比如发送消息、接收消息、声明队列等。通道提供了多路复用的功能,使得客户端可以在单个连接上进行并发操作。

交换机(Exchanges):交换机是消息的分发器,用于将消息路由到一个或多个队列。当生产者发送消息时,消息首先发送到交换机,然后根据预定义的规则(比如路由键)将消息路由到一个或多个队列中。

队列(Queues):队列是消息的缓冲区,用于存储待处理的消息。消费者从队列中获取消息,并进行处理。队列具有先进先出(FIFO)的特性,保证消息按照顺序进行处理。

管理(Admin):管理指的是消息队列的管理功能,包括监控、配置、维护等操作。管理功能通常由管理员使用管理工具进行操作,用于监控队列的健康状态、性能指标等,并进行必要的管理和调整。

Java项目集成RabbitMQ

maven配置

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.58.101
port: 5672
password: gust
username: gust
virtual-host: /

消息

基础认知:mq发送的消息只能通过交换机来通知给队列,交换机只负责转发消息,并不存储消息,如果没有队列和交换机绑定,将丢失消息

Fanout 广播模式

Fanout 发出的消息只要绑定了对应交换机的队列都可以拿到

生产者
1
2
3
4
5
6
7
8
9
/**
* Fanout 广播模式
* 广播模式发送的消息 绑定的队列都可以拿到消息
*/
@Test
void fanoutMessage() {
String message = "这是广播消息!";
rabbitTemplate.convertAndSend("inform.fanout", null, message);
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.listen.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqFanoutListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "inform1.queue", durable = "true"),
exchange = @Exchange(name = "inform.fanout", type = ExchangeTypes.FANOUT)
))
public void fanoutOneMessage(String msg) {
log.info("广播模式 消费者1:{}", msg);
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "inform2.queue", durable = "true"),
exchange = @Exchange(name = "inform.fanout", type = ExchangeTypes.FANOUT)
))
public void fanoutTwoMessage(String msg) {
log.info("广播模式 消费者2:{}", msg);
}
}

效果展示

Direct 路由模式

通过RoutingKey发送给订阅了消息的队列,这块只发送给路由为red的消费者

生产者
1
2
3
4
5
6
7
8
/**
* Direct 路由
*/
@Test
void directMessage() {
String message = "这是路由消息!";
rabbitTemplate.convertAndSend("subscribe.direct", "red", message);
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqDirectListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT)),
key = "red"
))
public void directOneMessage(String msg) {
log.info("路由模式 消费者1:{}", msg);
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer2.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT)),
key = "blue"
))
public void directTwoMessage(String msg) {
log.info("路由模式 消费者2:{}", msg);
}
}
效果展示

Topic 主题模式

与Direct 类似,区别在于这个可以使用通配符

#匹配一个或者多个词,*匹配一个词

生产者
1
2
3
4
5
6
7
8
/**
* Topic 动态路由
*/
@Test
void topicMessage() {
String message = "这是主题消息!";
rabbitTemplate.convertAndSend("dynamic.topic", "info.important.notice", message);
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqTopicListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.topic.queue", durable = "true"),
exchange = @Exchange(name = "dynamic.topic", type = ExchangeTypes.TOPIC),
key = "#.notice"
))
public void topicOneMessage(String msg) {
log.info("主题模式 消费者1:{}", msg);
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer2.topic.queue", durable = "true"),
exchange = @Exchange(name = "dynamic.topic", type = ExchangeTypes.TOPIC),
key = "*.notice"
))
public void topicTwoMessage(String msg) {
log.info("主题模式 消费者2:{}", msg);
}

}
效果展示

image-20240602095848576

进阶

延迟消息

通过死信队列实现

在mq中,当一个消息的TTL过期后,并不会马上投递出去,只有在队首的时候才会被处理,不推荐使用这种方式来实现延迟消息

通过延时插件实现
安装插件

实现延时消息需要安装rabbitmq-delayed-message-exchange插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange下载对应版本的插件

使用docker查看mq的挂载地址,将插件上传进去

1
docker volume inspect mq-plugins

docker加载延时插件

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 延时消息
*/
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "这是延时消息!";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class MQListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
}
效果展示

image-20240602110020320

指定消费者预取值

正常情况下,消息是平均分配给消费者的,但每个消费者的处理速度不一样,有的消费者都处理完了在闲置,有的还在处理,这块指定为1,这样消费者会当这条数据处理完成后,再接着处理下一条,处理快的消费者,多处理一些,避免消息积压

yml配置
1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 192.168.58.101
port: 5672
password: gust
username: gust
virtual-host: /
listener:
simple:
prefetch: 1 #指定消费者的预取值(prefetch),即每次从 RabbitMQ 服务器获取的消息数量。这里设置为 1,表示每次获取一条消息。
生产者

消费者平均分配消息

1
2
3
4
5
6
7
8
9
10
/**
* Direct 路由
*/
@Test
void fanoutMessageOne() {
for (int i = 1; i < 50; i++) {
String message = "这是路由消息!";
rabbitTemplate.convertAndSend("subscribe.direct", "red", message);
}
}
消费者

一个间隔100毫秒,一个间隔500毫秒,第一个消费者处理完了在闲置,第二个消费者处理的慢,但消息是平均的,如果一直进来消息,消费者2的消息就会积压起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqDirectListener {
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "red"
))
public void directOneMessage(String msg) {
Thread.sleep(100);
log.info("路由模式 消费者1:{}", msg);
}

@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "red"
))
public void directOneMessage1(String msg) {
Thread.sleep(500);
log.info("路由模式 消费者2:{}", msg);
}
}
不配置消费者预取值
效果展示

image-20240602105036243

配置消费者预取值
效果展示

消息转换器

默认的消息默认是通过JDK来序列化,不方便查看,配置消息转换器后就方便了些

maven
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;

@Configurable
@ConditionalOnClass(value = {MessageConverter.class})
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return jackson2JsonMessageConverter;
}
}
生产者
1
2
3
4
5
6
7
8
9
10
/**
* Direct 路由
*/
@Test
void directMessage() {
HashMap map = new HashMap();
map.put("name", "胡图图");
map.put("id", "1");
rabbitTemplate.convertAndSend("subscribe.direct", "red", map);
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
* @Auther: chang
* @Date: 2024/6/1 - 06 - 01 - 20:33
* @Description: com.listen.rabbitmq.config
* @version: 1.0
*/
@Component
@Slf4j
public class MqDirectListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "red"
))
public void directOneMessage(HashMap msg) {
log.info("路由模式 消费者1:{}", msg);
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer2.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "blue"
))
public void directTwoMessage(HashMap msg) {
log.info("路由模式 消费者2:{}", msg);
}
}
效果展示
处理前

image-20240602145314558

处理后

消费者进阶

消费失败重试机制

在消费者处理消失时,可能会因为一些原因处理失败,在配置以下内容后,消费者处理失败后会重试三次,如果三次还是失败就会丢弃消息

yml配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
host: 192.168.58.101
port: 5672
password: gust
username: gust
virtual-host: /
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class MqDirectListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer1.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "red"
))
public void directOneMessage(String msg) {
log.info("路由模式 消费者1:{}", msg);
throw new RuntimeException("处理订单消息时发生错误");
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "consumer2.direct.queue", durable = "true"),
exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT),
key = "blue"
))
public void directTwoMessage(String msg) {
log.info("路由模式 消费者2:{}", msg);
throw new RuntimeException("处理订单消息时发生错误");
}
}
效果展示

消费异常处理机制

接上所述,当消费者消费三次后还是失败,丢弃肯定是不对的,这块把消息发送到指定存放异常消息的队列,后续由开发者来进行处理,当然开发者也并非一天到晚盯着mq控制台,比较常见的情况是有人给打电话说软件出了问题,然后开发者再排查问题,处理消息

配置

添加该配置后,如果消费者处理失败三次后,把消息投递到error.queue队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}


@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
效果展示

控制台

MQ控制台

消息幂等性处理

世事无常,没有什么是绝对的,可能消息会被重复消费,像删除操作,重复执行无所谓,但如果是退款操作这些的话,如果重复退款的话,那估计可以直接回家了,解决方法常见的是给每条消息一个唯一标识,数据库创建message表,每次处理消息前,查找消息中是否存在对应的id,如果存在直接返回

配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;


@Configurable
@ConditionalOnClass(value = {MessageConverter.class})
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 幂等性处理
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}
生产者

发送消息时不需要添加其他内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Direct 路由
*/
@Test
void directMessage() {
Long orderId = Long.valueOf(123456789);
rabbitTemplate.convertAndSend("trade.delay.direct", "delay.order.query", orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(1000);
return message;
}
});
}
消费者

消息的唯一标识就在message.getMessageProperties().getMessageId();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class OrderDelayMessageListener {


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.delay.order.queue", durable = "true"),
exchange = @Exchange(name = "trade.delay.direct", delayed = "true"),
key = "delay.order.query"
))
public void listenOrderDelayMessage(Long orderId, Message message) {
String messageId = message.getMessageProperties().getMessageId();
log.info("消息id:{}", messageId);
log.info("订单ID:{}", orderId);
}
}
效果展示