简介

在使用RabbitMQ的时候,消息生产者发送消息不希望出现消息丢失或者投递失败的现象,RabbitMQ在消息投递可靠性方面给我们提供了两种模式:①confirm确认模式 ②return退回模式

confirm确认模式

介绍

作用于生产者发送到交换机过程,发送成功与否都会调用

消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障

实现

rabbitmq-producer模块的application.yml文件添加以下配置:

1
spring.rabbitmq.publisher-confirm-type=correlated

RabbitMQProducerServiceImpl类实现RabbitTemplate.ConfirmCallback接口,再实现confirm方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @param correlationData 消息唯一标识
* @param b 交换机是否成功收到消息 true成功 false失败
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被执行了..." + correlationData);
if (b) {
//发送成功失败原因为null
System.out.println("消息成功发送到交换机" + s);
} else {
System.out.println("消息发送失败" + s);
}
}

RabbitMQProducerServiceImpl类注入RabbitTemplate模版

1
2
@Resource
private RabbitTemplate rabbitTemplate;

RabbitMQProducerServiceImpl类初始化设置,放在所有模板注入的后面

1
2
3
4
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

RabbitMQProducerService类添加一个接口方法:

1
2
3
4
5
6
/**
* 发送消息并返回发送情况
*
* @param message
*/
void sendMessageConfirm(String message);

RabbitMQProducerServiceImpl类实现上面的接口方法:

1
2
3
4
5
@Override
public void sendMessageConfirm(String message) {
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTINGKEY, message,correlationData);
}

实现一个调用Confirm方法的发送消息功能

在sendInformationPage.html下添加一个form标签:

1
2
3
4
5
<form action="/sendMessageConfirm" method="post">
<h2>发送消息调用Confirm方法</h2>
消息:<input type="text" name="message" required="required">
<input type="submit" value="发送">
</form>

IndexController类下添加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 发送消息调用Confirm方法
*
* @param message
* @return
*/
@ResponseBody
@RequestMapping("/sendMessageConfirm")
public String sendMessageConfirm(String message) {
rabbitMQProducerService.sendMessageConfirm(message);
return "发送消息成功<a href='/toSendInformationPage'><button>继续发送</button></a>";
}

测试

启动rabbitmq-producer模块,然后在浏览器地址栏输入:http://localhost/toSendInformationPage 进入发送消息页面,发送一条消息

然后去rabbitmq-producer模块启动控制台查看结果

return退回模式

介绍

作用于交换机发送到队列过程,发送失败才调用

Return Listener用于处理一些不可路由的消息

我们的消息生产者,通过制定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或指定的路由key路由不到,这个时候我们需要监听这种不可达的消息

实现

rabbitmq-producer模块的application.yml文件添加以下配置:

1
spring.rabbitmq.publisher-returns-type=correlated

RabbitMQProducerServiceImpl类实现RabbitTemplate.ReturnCallback接口,再实现returnedMessage方法

1
2
3
4
5
6
7
8
9
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("return方法执行...");
System.out.println("发送的消息内容:" + new String(returnedMessage.getMessage().getBody()));
System.out.println("replyCode:" + returnedMessage.getReplyCode());
System.out.println("replyText:" + returnedMessage.getReplyText());
System.out.println("exchange:" + returnedMessage.getExchange());
System.out.println("routingKey:" + returnedMessage.getRoutingKey());
}

RabbitMQProducerServiceImpl类初始化设置中添加以下代码:

1
rabbitTemplate.setReturnsCallback(this);

RabbitMQProducerServiceImpl类修改sendMessageConfirm方法,修改rabbitTemplate.convertAndSend()第二个参数,只要不和对应消息队列的routingKey一样就行(为了让交换器发送到队列失败)

测试

和上面confirm确认模式那样测试即可