简介
当消息成为Dead Message后,可以重新被发送到另一个交换机,这个交换机就是死信交换机DLX(死信交换机),然后这个Dead Message会发送到与之绑定的死信队列。
消息成为Dead Message的三种情况:
- 消息由于消息有效期(per-message TTL)过期,队列的有效期并不会导致其中的消息过期
- 消息由于队列超过其长度限制而被丢弃
- 消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
实现过程
情况一
消息由于消息有效期(per-message TTL)过期
rabbitmq-common模块的RabbitMQConfig类添加配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public static final String DLX_DIRECT_EXCHANGE = "dlxExchange";
public static final String DLX_QUEUE = "dlxQueue";
public static final String DLX_ROUTING_KEY = "dlxRoutingKey";
@Bean public DirectExchange dlxExchange() { return new DirectExchange(DLX_DIRECT_EXCHANGE); }
@Bean public Queue dlxQueue() { return new Queue(DLX_QUEUE); }
@Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); }
|
修改rabbitmq-common模块的RabbitMQConfig类的ttlQueue方法,主要实现TTL队列和死信交换机的绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Bean public Queue ttlQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 4000); map.put("x-dead-letter-exchange", DLX_DIRECT_EXCHANGE); map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(TTL_QUEUE, true, false, false, map); }
|
测试
首先删除ttlQueue不然该队列的新配置不会生效,向ttlQueue队列发送一个TTL消息后,如果过了消息的有效期,该消息就会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列
情况二
消息由于队列超过其长度限制而被丢弃,比如:队列可容纳的最大消息数为10,但是我们一下子向里面发送20条消息,后面的10条消息会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列
rabbitmq-common模块的RabbitMQConfig类的ttlQueue添加配置x-max-length
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@Bean public Queue ttlQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 10000); map.put("x-dead-letter-exchange", DLX_DIRECT_EXCHANGE); map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); map.put("x-max-length", 10); return new Queue(TTL_QUEUE, true, false, false, map); }
|
直接修改rabbitmq-producer模块的启动类,实现批量发送20条ttl消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.ledao.producer;
import com.ledao.producer.service.RabbitMQProducerService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication public class ProducerApplication {
public static void main(String[] args) { ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args); RabbitMQProducerService rabbitMQProducerService = (RabbitMQProducerService) ac.getBean("rabbitMQProducerService"); for (int i = 0; i < 20; i++) { rabbitMQProducerService.sendMessageTll("消息" + i + 1); } } }
|
测试
首先删除ttlQueue不然该队列的新配置不会生效,批量向ttlQueue队列发送20条TTL消息后,后面进入的10条消息就会变成死信Dead Message,进入死信交换机,最后进入与之绑定的死信队列,过了消息的有效期,ttlQueue队列的10条消息最后也会变成死信
情况三
消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
rabbitmq-consumer模块的RabbitMQConsumerService类添加一个接口方法:
1 2 3 4 5 6 7 8
|
void receiveMessageTtl(String message, Channel channel, long deliveryTag);
|
rabbitmq-consumer模块的RabbitMQConsumerServiceImpl类实现上面的接口方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Override @RabbitListener(queues = {RabbitMQConfig.TTL_QUEUE}) public void receiveMessageTtl(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { System.out.println("接收到的消息:" + message); System.out.println(1 / 0); } catch (Exception e) { e.printStackTrace(); try { channel.basicNack(deliveryTag, false, false); } catch (IOException ioException) { ioException.printStackTrace(); } } }
|
测试
先启动rabbitmq-producer模块发送20条ttl消息,会有10条消息直接变成死信,然后启动rabbitmq-consumer模块消费消息,10条消息都会消费失败变成死信而不会等到消息有效期过期才变成死信