Java-RabbitMQ:总结
RabbitMQ
初始MQ
同步调用
异步调用
MQ技术选型
MQ:消息队列

RabbitMQ
安装部署
RabbitMQ的整体架构及核心概念:
- virtual-host:虚拟主机,起到数据隔离的作用
- publisher:消息发送者
- consumer:消息的消费者
- queue:队列,存储消息
- exchange:交换机,负责路由消息
快速入门
- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
数据隔离
不同virtual host之间的数据是隔离的。
Java客户端
快速入门
- AMQP
- Spring AMQP
SpringAMQP收发消息
- 引入spring-boot-starter-amqp依赖
- 配置rabbitmq服务端信息
- 利用RabbitTemplate发送消息
- 利用**@RabbitListener注解声明要监听的队列,监听消息**
发送消息:
1 | |
接收消息:
1 | |
Work queues
Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息。
- 多个消费者绑定到一个队列,加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
Fanout交换机
交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。
常见交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。
Direct 交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
Direct交换机与Fanout交换机的差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同RoutingKey,则与Fanout功能类似
Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。
Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个或多个单词
- *:代指一个单词
声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
SpringAMQP提供了基于@RabbitListener注解来声明队列和交换机的方式:
1 | |
消息转换器
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
存在下列问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
使用JSON方式来做序列化和反序列化:
引入依赖:
1 | |
注入消息转换器:
1 | |
发送者的可靠性
发送者重连
有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置可以开启连接失败后的重连机制(阻塞式):
1 | |
发送者确认
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
1 | |

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化
RabbitMQ实现数据持久化包括3个方面:
- 交换机持久化:
Durable - 队列持久化:
Durable - 消息持久化:
Non-persistent
Lazy Queue
从RabbitMQ的3.6.0版本开始增加Lazy Queue,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘,不再存储到内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
- 在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
RabbitMQ如何保证消息的可靠性
- 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
- RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为队列的默认模式。LazyQueue会将所有消息都持久化。
- 开启持久化和发送者确认时, RabbitMQ只有在消息持久化完成后才会给发送者返回ACK回执
消费者的可靠性
消费者确认机制
消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
如果是业务异常,会自动返回nack
如果是消息处理或校验异常,自动返回reject
1 | |
失败重试机制
SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。
1 | |
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
1
2
3
4@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
业务幂等性
幂等:f(x) = f(f(x))
- 定义:同一个业务,执行一次或多次对业务状态影响一致
- 业务:
- 幂等:根据id查询、根据id删除
- 非幂等:下单业务扣减库存、退款业务恢复余额
- 解决:
- 唯一消息id
- 业务判断
唯一消息id
给每个消息都设置一个唯一id,利用id区分是否是重复消息:保证消息只被消费一次
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
业务判断
结合业务逻辑,基于业务本身做判断。

存在问题:交易服务标记订单已支付,由于网络中断,消费者ACK没有发回消息队列,此时消息重新入队;同时用户申请退款,交给交易服务标记订单为退款;这次网络恢复,消息队列重连,又投递标记订单支付的消息,这时标记订单为已支付,把退款中状态覆盖掉。(非幂等)

解决:标记订单为已支付前,先判断订单是否为未支付。
延迟消息
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务。
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
延迟消息插件
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
超时订单问题
