简介
在使用RabbitMQ的时候,消息生产者发送消息不希望出现消息丢失或者投递失败的现象,RabbitMQ在消息投递可靠性方面给我们提供了两种模式:①confirm确认模式 ②return退回模式
confirm确认模式
介绍
作用于生产者发送到交换机过程,发送成功与否都会调用
消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障
实现
rabbitmq-producer模块的application.yml文件添加以下配置:
1
| spring.rabbitmq.publisher-confirm-type=correlated
|
RabbitMQProducerServiceImpl类实现RabbitTemplate.ConfirmCallback接口,再实现confirm方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("confirm方法被执行了..." + correlationData); if (b) { System.out.println("消息成功发送到交换机" + s); } else { System.out.println("消息发送失败" + s); } }
|
RabbitMQProducerServiceImpl类注入RabbitTemplate模版
1 2
| @Resource private RabbitTemplate rabbitTemplate;
|
RabbitMQProducerServiceImpl类初始化设置,放在所有模板注入的后面
1 2 3 4
| @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); }
|
RabbitMQProducerService类添加一个接口方法:
1 2 3 4 5 6
|
void sendMessageConfirm(String message);
|
RabbitMQProducerServiceImpl类实现上面的接口方法:
1 2 3 4 5
| @Override public void sendMessageConfirm(String message) { CorrelationData correlationData = new CorrelationData(); rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTINGKEY, message,correlationData); }
|
实现一个调用Confirm方法的发送消息功能
在sendInformationPage.html下添加一个form标签:
1 2 3 4 5
| <form action="/sendMessageConfirm" method="post"> <h2>发送消息调用Confirm方法</h2> 消息:<input type="text" name="message" required="required"> <input type="submit" value="发送"> </form>
|
IndexController类下添加一个方法:
1 2 3 4 5 6 7 8 9 10 11 12
|
@ResponseBody @RequestMapping("/sendMessageConfirm") public String sendMessageConfirm(String message) { rabbitMQProducerService.sendMessageConfirm(message); return "发送消息成功<a href='/toSendInformationPage'><button>继续发送</button></a>"; }
|
测试
启动rabbitmq-producer模块,然后在浏览器地址栏输入:http://localhost/toSendInformationPage 进入发送消息页面,发送一条消息
然后去rabbitmq-producer模块启动控制台查看结果
return退回模式
介绍
作用于交换机发送到队列过程,发送失败才调用
Return Listener用于处理一些不可路由的消息
我们的消息生产者,通过制定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或指定的路由key路由不到,这个时候我们需要监听这种不可达的消息
实现
rabbitmq-producer模块的application.yml文件添加以下配置:
1
| spring.rabbitmq.publisher-returns-type=correlated
|
RabbitMQProducerServiceImpl类实现RabbitTemplate.ReturnCallback接口,再实现returnedMessage方法
1 2 3 4 5 6 7 8 9
| @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("return方法执行..."); System.out.println("发送的消息内容:" + new String(returnedMessage.getMessage().getBody())); System.out.println("replyCode:" + returnedMessage.getReplyCode()); System.out.println("replyText:" + returnedMessage.getReplyText()); System.out.println("exchange:" + returnedMessage.getExchange()); System.out.println("routingKey:" + returnedMessage.getRoutingKey()); }
|
RabbitMQProducerServiceImpl类初始化设置中添加以下代码:
1
| rabbitTemplate.setReturnsCallback(this);
|
RabbitMQProducerServiceImpl类修改sendMessageConfirm方法,修改rabbitTemplate.convertAndSend()第二个参数,只要不和对应消息队列的routingKey一样就行(为了让交换器发送到队列失败)
测试
和上面confirm确认模式那样测试即可