基于springboot实现的rabbitmq消息确认
目录
- 概述
- 步骤
- 1、引入rabbitmq包
- 2、yml配置文件
- 3、消息转换器MessageConverterConfig
- 4、消息发送确认
- 5、配置类(定义exchange和queue,并将queue绑定在exchange)
- 6、消息发送者
- 7、消息消费者
- 8、测试方法
- 9、补充
概述
RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
步骤
1、引入rabbitmq包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、yml配置文件
spring:
rabbitmq:
#连接地址
host: 192.168.56.10
#端口号
port: 5672
#账号
username: guest
#密码
password: guest
#虚拟主机
virtual-host: /
#开启发送端消息抵达broker的确认
publisher-confirm-type: correlated
#开启发送端消息抵达队列的确认
publisher-returns: true
template:
#mandatory 默认为FALSE,指定消息在没有被队列接收时是否强行退回还是直接丢弃,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会直接将消息扔掉
mandatory: true
listener:
simple:
# 设置消费端手动 ack
acknowledge-mode: manual
3、消息转换器MessageConverterConfig
package com.itheima.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: XuXin
* @date: 2023/9/26
*/
@Configuration
public class MessageConverterConfig {
/**
* 使用json序列化机制,进行消息转换
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4、消息发送确认
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。
消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
我们可以利用这两个Callback来确保消息的100%送达。
ConfirmCallback确认模式
消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。package com.itheima.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/**
- @author: XuXin
@date: 2023/9/22
*/
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {/**
- 不管消息是否成功到达交换机都会被调用
* - @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
- @param ack 消息是否成功收到 只要消息抵达broker就ack=true
@param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(“ 回调id:” + correlationData);
if (ack) {log.info("消息成功发送");
} else {
log.info("消息发送失败:" + cause);
}
}
}
- 不管消息是否成功到达交换机都会被调用
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
ack:消息投递到broker 的状态,true表示成功。
cause:表示投递失败的原因。
但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback。
ReturnCallback 退回模式
如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。package com.itheima.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/**
- @author: XuXin
@date: 2023/9/25
*/
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnsCallback {/**
- @param re 只有在交换机到达队列失败的时候才会被触发,当这个回调函数被调用的时候说明交换机的消息没有顺利的到达队列
- message 投递失败的消息详细信息
- replyCode 回复的状态码
- replyText回复的文本内容
- exchange 当时这个消息发给哪个交换机
routingKey 当时这个消息发给哪个路由键
*/
@Override
public void returnedMessage(ReturnedMessage re) {log.info(“Returned: “ + re.getMessage() + “\nreplyCode: “ + re.getReplyCode()
- “\nreplyText: “ + re.getReplyText() + “\nexchange: “
- re.getExchange() + “\nroutingKey: “ + re.getRoutingKey());
}
}
实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
在rabbitTemplate中设置 Confirm 和 Return 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id 为10000000000。
5、配置类(定义exchange和queue,并将queue绑定在exchange)
package com.itheima.config;
import com.itheima.producer.ConfirmCallback;
import com.itheima.producer.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @author xx
* @date 2023/9/22
* springboot自动读取yml文件自动配置,这里可删
* 定义完成后 rabbitmq服务器会自动创建交换机和队列以及绑定关系
* 在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态创建交换机、队列、交换机和队列的绑定关系,让我们无需进行重复的编码工作。
*/
@Slf4j
@Configuration
public class RabbitMQConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallback confirmCallback;
@Autowired
private ReturnCallback returnCallback;
/**
* 队列
*/
public static final String QUEUE1 = "atguigu";
public static final String QUEUE2 = "atguigu.news";
public static final String QUEUE3 = "atguigu.emps";
public static final String QUEUE4 = "gulixueyuan.news";
/**
* 定义交换机名称
*/
public static final String EXCHANGE_DIRECT_NAME = "exchange.direct";
public static final String EXCHANGE_FANOUT_NAME = "exchange.fanout";
public static final String EXCHANGE_TOPIC_NAME = "exchange.topic";
/**
* 设置路由key
* #匹配0个或多个单词,*匹配一个单词
*/
public static final String ROUTINGKEY1 = "atguigu";
public static final String ROUTINGKEY2 = "atguigu.news";
public static final String ROUTINGKEY3 = "atguigu.emps";
public static final String ROUTINGKEY4 = "gulixueyuan.news";
public static final String ROUTINGKEY5 = "atguigu.#";
public static final String ROUTINGKEY6 = "*.news";
/**
* 定义 直连交换机
*/
@Bean("directExchange")
public DirectExchange directExchange() {
//参数 交换机名称
return new DirectExchange(EXCHANGE_DIRECT_NAME, true, false);
}
/**
* 定义 扇形交换机
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
//参数 交换机名称
return new FanoutExchange(EXCHANGE_FANOUT_NAME, true, false);
}
/**
* 定义 主题交换机
*/
@Bean("topicExchange")
public TopicExchange topicExchange() {
//参数 交换机名称
return new TopicExchange(EXCHANGE_TOPIC_NAME, true, false);
}
/**
* 创建队列
* 参数一:队列名称
* 参数二durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* 参数三exclusive:默认也是false,是否独占队列
* 参数四autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
*/
@Bean("queue1")
public Queue queue1() {
return new Queue(QUEUE1, true, false, false);
}
@Bean("queue2")
public Queue queue2() {
return new Queue(QUEUE2, true, false, false);
}
@Bean("queue3")
public Queue queue3() {
return new Queue(QUEUE3, true, false, false);
}
@Bean("queue4")
public Queue queue4() {
return new Queue(QUEUE4, true, false, false);
}
/**
* 队列绑定交换机
*
* @param queue 队列注入到容器的id,也就是方法名 Queue1
* @param directExchange 交换机注入到容器的id,也就是方法名 directExchange
* @return
*/
@Bean
public Binding bindingQueue1DirectExchange(@Qualifier("queue1") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY1);
}
@Bean
public Binding bindingQueue2DirectExchange(@Qualifier("queue2") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY2);
}
@Bean
public Binding bindingQueue3DirectExchange(@Qualifier("queue3") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY3);
}
@Bean
public Binding bindingQueue4DirectExchange(@Qualifier("queue4") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY4);
}
@Bean
public Binding bindingQueue1FanoutExchange(@Qualifier("queue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2FanoutExchange(@Qualifier("queue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue3FanoutExchange(@Qualifier("queue3") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue4FanoutExchange(@Qualifier("queue4") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue1TopicExchange(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue2TopicExchange(@Qualifier("queue2") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue3TopicExchange(@Qualifier("queue3") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue4TopicExchange(@Qualifier("queue4") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY6);
}
/**
* 定制rabbitTemplate
* 1、服务收到消息就回调
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调ConfirmCallback
* 2、消息正确抵达队列进行回调
* 1、spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.template.mandatory= true
* 2、设置确认回调ReturnCallback
* <p>
* 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。
* listener.simple.acknowledge-mode=manual
* 1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
* 问题:
* 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失;
* 消费者手动确认。只要我们没有明确告诉MQ,货物被签收。没有Ack,消息就一 直是unacked状态。即使Consumer宕机,
* 消息也不会丢失,会重新变为Ready,下次有新的Consumer连接进来就发给他
* 2、如何签收:
* channel.basicAck(deliveryTag, false) ;签收;业务成功完成就应该签收
* channel.basicNack(deliveryTag, false, true);拒签;业务失败,拒签|
*/
@PostConstruct
public void initRabbitTemplate() {
//设置确认回调
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnCallback);
}
}
6、消息发送者
package com.itheima.producer;
import com.itheima.config.RabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author: XuXin
* @date: 2023/9/25
* 1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
*/
@Component
public class MsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String routingKey, Object content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DIRECT_NAME, routingKey, content, correlationId);
}
public void sendMsg(String routingKey, Object content, String uuid) {
CorrelationData correlationId = new CorrelationData(uuid);
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DIRECT_NAME, routingKey, content, correlationId);
}
}
7、消息消费者
package com.itheima.receiver;
import com.itheima.domain.Book;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: XuXin
* @date: 2023/9/25
*/
@Slf4j
@Component
@RabbitListener(queues = "atguigu.news")
public class MsgReceiver {
/**
* queues: 声明需要监听的所有队列
* org.springframework-amqp.core.Message
* 参数可以写一下类型
* 1、Message message: 原生消息详细信息。头+体
* 2、T<发送的消息的类型> OrderReturnReasonEntity content;
* 3、Channel channel: 当前传输数据的通道
* <p>
* Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
* 场景:
* 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一-个消息
*
* @param message
* @param content
* @throws IOException
*/
@RabbitHandler
public void processHandler(Message message, Book content, Channel channel) throws IOException {
try {
log.info("收到消息:{}", content);
//TODO 具体业务
//是deliveryTagchannel内按顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收消息,非批量模式
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息即将再次返回队列处理...");
//退货 requeue=false丢弃 requeue=true发回服务器,服务器重新入队。
//long deliveryTag, boolean multiple, boolean requeue
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
监听消息必须要有@EnableRabbit注解,如果只是创建交换机,队列以及绑定不需要此注解。
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
- basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple) - deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
2、basicNack
- basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) - deliveryTag:表示消息投递序号。
multiple:是否批量确认。 - requeue:值为 true 消息将重新入队列。
3、basicReject
- basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue) - deliveryTag:表示消息投递序号。
- requeue:值为 true 消息将重新入队列。
8、测试方法
package com.itheima.controller;
import com.itheima.config.RabbitConfig;
import com.itheima.domain.Book;
import com.itheima.producer.MsgProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: XuXin
* @date: 2023/9/25
*/
@Slf4j
@RestController
public class TestRabbitMq {
@Autowired
private MsgProducer msgProducer;
@GetMapping("/sendMq")
public String sendMq() {
System.out.println("执行");
Book book = new Book();
book.setName("cook");
book.setType("eat");
msgProducer.sendMsg(RabbitConfig.ROUTINGKEY2,book);
return "ok";
}
}
9、补充
- 别忘确认消息
这是一个非常没技术含量的坑,但却是非常容易犯错的地方。
开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。 消息无限投递
在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消费者 2 号收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
重复消费
如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis将消息持久化,通过再消息中的唯一性属性校验。
可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:
消息生产者 - > rabbitmq服务器(消息发送失败)
rabbitmq服务器自身故障导致消息丢失
消息消费者 - > rabbitmq服务(消费消息失败)
还没有评论,来说两句吧...