简介

ack——acknowledge(vt. 承认;答谢;报偿;告知已收到;确认的意思),在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息

当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。

RabbitMQ提供了三种确认方式:

  1. 自动确认acknowledge=”none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉RabbitMQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否,这是RabbitMQ默认的

  2. 手动确认acknowledge=”manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向RabbitMQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让RabbitMQ重新向消费者发送消息都是可以的,一般使用这种方式

  3. 根据异常情况确认acknowledge=”auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等),这种方式比较麻烦,一般不使用

实现过程

rabbitmq-consumer消费端application.yml添加以下配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #ack手动确认

rabbitmq-consumer消费端的RabbitMQConsumerService类添加一个接口方法:

1
2
3
4
5
6
7
8
/**
* 手动确认消息的接收
*
* @param message
* @param channel
* @param deliveryTag
*/
void receiveMessageAck(String message, Channel channel, long deliveryTag);

rabbitmq-consumer消费端的RabbitMQConsumerServiceImpl类实现上面的接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 手动确认消息的接收
*
* @param message 接收的消息
* @param channel 队列和消费端的连接管道
* @param deliveryTag 消息接收tag
*/
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE})
public void receiveMessageAck(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
System.out.println("接收到的消息:" + message);
try {
//如果没有异常就接收消息,第一个参数为消息接收tag,第二个参数为是否批量确认
channel.basicAck(deliveryTag, t);
} catch (IOException e) {
e.printStackTrace();
try {
//让消息重新入列,第一个参数为消息接收tag,第二个参数为是否批量确认,第三个参数为被拒绝的消息是否重新入列(如果设置为false将会删除被拒绝的消息)
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

把rabbitmq-consumer消费端的RabbitMQConsumerServiceImpl类监听RabbitMQConfig.DIRECT_QUEUE队列的消息的@RabbitListener注解删除,这是为了只能让receiveMessageAck()方法消费RabbitMQConfig.DIRECT_QUEUE队列的消息

Channel.basicAck(long deliveryTag, boolean multiple)方法

功能:确认消费

参数说明:

第一个参数deliveryTag:消息ID,从1开始

第二个参数multiple:是否批量,将一次性ack所有小于deliveryTag的消息

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)方法

功能:反馈消息消费失败

参数说明:

第一个参数deliveryTag:消息ID,从1开始

第二个参数multiple:是否批量,将一次性拒绝所有小于deliveryTag的消息

第三个参数requeue:被拒绝的消息是否重新入列

Channel.basicReject(long deliveryTag, boolean requeue)

功能:反馈消息消费失败

参数说明:

第一个参数deliveryTag:消息ID,从1开始

第二个参数requeue:被拒绝的消息是否重新入列