前言:RabbitMQ
是一个流行的开源消息队列软件,实现了高级消息队列
协议(AMQP)的标准。主要用于在分布式系统中传递消息,可以在应用程序之间可靠地进行通信。
RabbitMQ 的核心概念包括:
生产者(Producer):生产者是向 RabbitMQ 发送消息的应用程序。
队列(Queue):队列是用于存储消息的缓冲区。生产者将消息发送到队列,消费者从队列中接收消息。
消费者(Consumer):消费者是从 RabbitMQ 接收消息的应用程序。
交换机(Exchange):交换机是用于将消息路由到一个或多个队列的组件。生产者将消息发送到交换机,交换机根据预定义的规则(称为路由键)将消息路由到一个或多个队列。
绑定(Binding):绑定是交换机和队列之间的关联,它定义了交换机如何将消息路由到队列。
安装
使用docker来安装RabbitMQ,使用的版本是3.8
软件包上传服务器
这块把软件包拖到home
目录,后切换到安装包存放的目录
加载镜像文件
启动RabbitMQ
1
| docker run -e RABBITMQ_DEFAULT_USER=gust -e RABBITMQ_DEFAULT_PASS=gust -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management
|
RabbitMQ控制台
在浏览器使用http://服务器ip:15672/ 打开RabbitMQ
概览(Overview):通常指的是对消息队列系统的整体概述或总体情况的描述。
连接(Connections):连接是客户端应用程序与消息队列服务器之间的通信通道。它表示客户端与消息队列服务器之间建立的网络连接,用于发送和接收消息。
通道(Channels):通道是在连接内部创建的虚拟通信通道。通常,一个连接可以包含多个通道,每个通道都可以执行独立的操作,比如发送消息、接收消息、声明队列等。通道提供了多路复用的功能,使得客户端可以在单个连接上进行并发操作。
交换机(Exchanges):交换机是消息的分发器,用于将消息路由到一个或多个队列。当生产者发送消息时,消息首先发送到交换机,然后根据预定义的规则(比如路由键)将消息路由到一个或多个队列中。
队列(Queues):队列是消息的缓冲区,用于存储待处理的消息。消费者从队列中获取消息,并进行处理。队列具有先进先出(FIFO)的特性,保证消息按照顺序进行处理。
管理(Admin):管理指的是消息队列的管理功能,包括监控、配置、维护等操作。管理功能通常由管理员使用管理工具进行操作,用于监控队列的健康状态、性能指标等,并进行必要的管理和调整。
Java项目集成RabbitMQ
maven配置
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
yml配置
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.58.101 port: 5672 password: gust username: gust virtual-host: /
|
消息
基础认知:mq发送的消息只能通过交换机来通知给队列,交换机只负责转发消息,并不存储消息,如果没有队列和交换机绑定,将丢失消息
Fanout 广播模式
Fanout 发出的消息只要绑定了对应交换机的队列都可以拿到
生产者
1 2 3 4 5 6 7 8 9
|
@Test void fanoutMessage() { String message = "这是广播消息!"; rabbitTemplate.convertAndSend("inform.fanout", null, message); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.listen.rabbitmq.config;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqFanoutListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "inform1.queue", durable = "true"), exchange = @Exchange(name = "inform.fanout", type = ExchangeTypes.FANOUT) )) public void fanoutOneMessage(String msg) { log.info("广播模式 消费者1:{}", msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "inform2.queue", durable = "true"), exchange = @Exchange(name = "inform.fanout", type = ExchangeTypes.FANOUT) )) public void fanoutTwoMessage(String msg) { log.info("广播模式 消费者2:{}", msg); } }
|
效果展示
Direct 路由模式
通过RoutingKey发送给订阅了消息的队列,这块只发送给路由为red的消费者
生产者
1 2 3 4 5 6 7 8
|
@Test void directMessage() { String message = "这是路由消息!"; rabbitTemplate.convertAndSend("subscribe.direct", "red", message); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqDirectListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT)), key = "red" )) public void directOneMessage(String msg) { log.info("路由模式 消费者1:{}", msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer2.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT)), key = "blue" )) public void directTwoMessage(String msg) { log.info("路由模式 消费者2:{}", msg); } }
|
效果展示
Topic 主题模式
与Direct 类似,区别在于这个可以使用通配符
#匹配一个或者多个词,*匹配一个词
生产者
1 2 3 4 5 6 7 8
|
@Test void topicMessage() { String message = "这是主题消息!"; rabbitTemplate.convertAndSend("dynamic.topic", "info.important.notice", message); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqTopicListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.topic.queue", durable = "true"), exchange = @Exchange(name = "dynamic.topic", type = ExchangeTypes.TOPIC), key = "#.notice" )) public void topicOneMessage(String msg) { log.info("主题模式 消费者1:{}", msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer2.topic.queue", durable = "true"), exchange = @Exchange(name = "dynamic.topic", type = ExchangeTypes.TOPIC), key = "*.notice" )) public void topicTwoMessage(String msg) { log.info("主题模式 消费者2:{}", msg); }
}
|
效果展示
进阶
延迟消息
通过死信队列实现
在mq中,当一个消息的TTL过期后,并不会马上投递出去,只有在队首的时候才会被处理,不推荐使用这种方式来实现延迟消息
通过延时插件实现
安装插件
实现延时消息需要安装rabbitmq-delayed-message-exchange
插件
在https://github.com/rabbitmq/rabbitmq-delayed-message-exchange下载对应版本的插件
使用docker查看mq的挂载地址,将插件上传进去
1
| docker volume inspect mq-plugins
|
docker加载延时插件
1
| docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Test void testPublisherDelayMessage() { String message = "这是延时消息!"; rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(5000); return message; } }); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component @Slf4j public class MQListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延迟消息:{}", msg); } }
|
效果展示
指定消费者预取值
正常情况下,消息是平均分配给消费者的,但每个消费者的处理速度不一样,有的消费者都处理完了在闲置,有的还在处理,这块指定为1,这样消费者会当这条数据处理完成后,再接着处理下一条,处理快的消费者,多处理一些,避免消息积压
yml配置
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: host: 192.168.58.101 port: 5672 password: gust username: gust virtual-host: / listener: simple: prefetch: 1
|
生产者
消费者平均分配消息
1 2 3 4 5 6 7 8 9 10
|
@Test void fanoutMessageOne() { for (int i = 1; i < 50; i++) { String message = "这是路由消息!"; rabbitTemplate.convertAndSend("subscribe.direct", "red", message); } }
|
消费者
一个间隔100毫秒,一个间隔500毫秒,第一个消费者处理完了在闲置,第二个消费者处理的慢,但消息是平均的,如果一直进来消息,消费者2的消息就会积压起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqDirectListener { @SneakyThrows @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "red" )) public void directOneMessage(String msg) { Thread.sleep(100); log.info("路由模式 消费者1:{}", msg); }
@SneakyThrows @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "red" )) public void directOneMessage1(String msg) { Thread.sleep(500); log.info("路由模式 消费者2:{}", msg); } }
|
不配置消费者预取值
效果展示
配置消费者预取值
效果展示
消息转换器
默认的消息默认是通过JDK来序列化,不方便查看,配置消息转换器后就方便了些
maven
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean;
@Configurable @ConditionalOnClass(value = {MessageConverter.class}) public class MqConfig { @Bean public MessageConverter messageConverter() { Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); return jackson2JsonMessageConverter; } }
|
生产者
1 2 3 4 5 6 7 8 9 10
|
@Test void directMessage() { HashMap map = new HashMap(); map.put("name", "胡图图"); map.put("id", "1"); rabbitTemplate.convertAndSend("subscribe.direct", "red", map); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.HashMap;
@Component @Slf4j public class MqDirectListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "red" )) public void directOneMessage(HashMap msg) { log.info("路由模式 消费者1:{}", msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer2.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "blue" )) public void directTwoMessage(HashMap msg) { log.info("路由模式 消费者2:{}", msg); } }
|
效果展示
处理前
处理后
消费者进阶
消费失败重试机制
在消费者处理消失时,可能会因为一些原因处理失败,在配置以下内容后,消费者处理失败后会重试三次,如果三次还是失败就会丢弃消息
yml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: rabbitmq: host: 192.168.58.101 port: 5672 password: gust username: gust virtual-host: / listener: simple: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 stateless: true
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqDirectListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer1.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "red" )) public void directOneMessage(String msg) { log.info("路由模式 消费者1:{}", msg); throw new RuntimeException("处理订单消息时发生错误"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "consumer2.direct.queue", durable = "true"), exchange = @Exchange(name = "subscribe.direct", type = ExchangeTypes.DIRECT), key = "blue" )) public void directTwoMessage(String msg) { log.info("路由模式 消费者2:{}", msg); throw new RuntimeException("处理订单消息时发生错误"); } }
|
效果展示
消费异常处理机制
接上所述,当消费者消费三次后还是失败,丢弃肯定是不对的,这块把消息发送到指定存放异常消息的队列,后续由开发者来进行处理,当然开发者也并非一天到晚盯着mq控制台,比较常见的情况是有人给打电话说软件出了问题,然后开发者再排查问题,处理消息
配置
添加该配置后,如果消费者处理失败三次后,把消息投递到error.queue
队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
|
效果展示
控制台
MQ控制台
消息幂等性处理
世事无常,没有什么是绝对的,可能消息会被重复消费,像删除操作,重复执行无所谓,但如果是退款操作这些的话,如果重复退款的话,那估计可以直接回家了,解决方法常见的是给每条消息一个唯一标识,数据库创建message表,每次处理消息前,查找消息中是否存在对应的id,如果存在直接返回
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean;
@Configurable @ConditionalOnClass(value = {MessageConverter.class}) public class MqConfig { @Bean public MessageConverter messageConverter() { Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; } }
|
生产者
发送消息时不需要添加其他内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Test void directMessage() { Long orderId = Long.valueOf(123456789); rabbitTemplate.convertAndSend("trade.delay.direct", "delay.order.query", orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(1000); return message; } }); }
|
消费者
消息的唯一标识就在message.getMessageProperties().getMessageId();
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Slf4j @Component public class OrderDelayMessageListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "trade.delay.order.queue", durable = "true"), exchange = @Exchange(name = "trade.delay.direct", delayed = "true"), key = "delay.order.query" )) public void listenOrderDelayMessage(Long orderId, Message message) { String messageId = message.getMessageProperties().getMessageId(); log.info("消息id:{}", messageId); log.info("订单ID:{}", orderId); } }
|
效果展示