简介

当消息成为Dead Message后,可以重新被发送到另一个交换机,这个交换机就是死信交换机DLX(死信交换机),然后这个Dead Message会发送到与之绑定的死信队列。

消息成为Dead Message的三种情况:

  1. 消息由于消息有效期(per-message TTL)过期,队列的有效期并不会导致其中的消息过期
  2. 消息由于队列超过其长度限制而被丢弃
  3. 消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)

实现过程

情况一

消息由于消息有效期(per-message TTL)过期

rabbitmq-common模块的RabbitMQConfig类添加配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* dlx交换机名称
*/
public static final String DLX_DIRECT_EXCHANGE = "dlxExchange";

/**
* dlx队列名称
*/
public static final String DLX_QUEUE = "dlxQueue";

/**
* dlx路由key
*/
public static final String DLX_ROUTING_KEY = "dlxRoutingKey";

/**
* 定义一个dlx交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_DIRECT_EXCHANGE);
}

/**
* 定义一个dlx队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return new Queue(DLX_QUEUE);
}

/**
* dlx队列绑定dlx交换机
*
* @return
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}

修改rabbitmq-common模块的RabbitMQConfig类的ttlQueue方法,主要实现TTL队列和死信交换机的绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 定义一个ttl队列
*
* @return
*/
@Bean
public Queue ttlQueue() {
Map<String, Object> map = new HashMap<>(16);
//设置队列的消息过期时间为4秒
map.put("x-message-ttl", 4000);
//绑定死信交换机,过期的消息会进入死信交换机然后进入死信队列
map.put("x-dead-letter-exchange", DLX_DIRECT_EXCHANGE);
//绑定死信交换机的路由
map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
//参数一为队列名称,参数二为是否持久化,参数三是否独享、排外的,参数四为是否自动删除,参数五为队列的其他属性参数
return new Queue(TTL_QUEUE, true, false, false, map);
}

测试

首先删除ttlQueue不然该队列的新配置不会生效,向ttlQueue队列发送一个TTL消息后,如果过了消息的有效期,该消息就会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列

情况二

消息由于队列超过其长度限制而被丢弃,比如:队列可容纳的最大消息数为10,但是我们一下子向里面发送20条消息,后面的10条消息会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列

rabbitmq-common模块的RabbitMQConfig类的ttlQueue添加配置x-max-length

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 定义一个ttl队列
*
* @return
*/
@Bean
public Queue ttlQueue() {
Map<String, Object> map = new HashMap<>(16);
//设置队列的消息过期时间为4秒
map.put("x-message-ttl", 10000);
//绑定死信交换机,过期的消息会进入死信交换机然后进入死信队列
map.put("x-dead-letter-exchange", DLX_DIRECT_EXCHANGE);
//绑定死信交换机的路由
map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
//设置队列的最大长度
map.put("x-max-length", 10);
//参数一为队列名称,参数二为是否持久化,参数三是否独享、排外的,参数四为是否自动删除,参数五为队列的其他属性参数
return new Queue(TTL_QUEUE, true, false, false, map);
}

直接修改rabbitmq-producer模块的启动类,实现批量发送20条ttl消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.ledao.producer;

import com.ledao.producer.service.RabbitMQProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

/**
* 提供者启动类
*
* @author LeDao
* @company
* @create 2021-09-04 12:26
*/
@SpringBootApplication
public class ProducerApplication {

public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
RabbitMQProducerService rabbitMQProducerService = (RabbitMQProducerService) ac.getBean("rabbitMQProducerService");
for (int i = 0; i < 20; i++) {
rabbitMQProducerService.sendMessageTll("消息" + i + 1);
}
}
}

测试

首先删除ttlQueue不然该队列的新配置不会生效,批量向ttlQueue队列发送20条TTL消息后,后面进入的10条消息就会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列,过了消息的有效期,ttlQueue队列的10条消息最后也会变成死信

情况三

消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)

rabbitmq-consumer模块的RabbitMQConsumerService类添加一个接口方法:

1
2
3
4
5
6
7
8
/**
* 监听队列接收ttl消息
*
* @param message
* @param channel
* @param deliveryTag
*/
void receiveMessageTtl(String message, Channel channel, long deliveryTag);

rabbitmq-consumer模块的RabbitMQConsumerServiceImpl类实现上面的接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
@RabbitListener(queues = {RabbitMQConfig.TTL_QUEUE})
public void receiveMessageTtl(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("接收到的消息:" + message);
//模拟异常,进入catch方法
System.out.println(1 / 0);
} catch (Exception e) {
e.printStackTrace();
try {
//拒绝的消息不重新入列,会进入死信队列
channel.basicNack(deliveryTag, false, false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

测试

先启动rabbitmq-producer模块发送20条ttl消息,会有10条消息直接变成死信,然后启动rabbitmq-consumer模块消费消息,10条消息都会消费失败变成死信而不会等到消息有效期过期才变成死信