简介
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列,延时时间到了消息才会进入队列中等待着被消费
使用场景
那么什么时候需要用延时队列呢?考虑一下以下场景:
订单在十分钟之内未支付则自动取消
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
账单在一周内未支付,则自动结算
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
实现方式
实现方式有两种:
- 使用TTL+死信队列
- 利用RabbitMQ插件,推荐使用此方式
TTL+死信队列
生产者生产一条消息发送到TTL交换机,然后分发到TTL队列,消息在队列中待到过期后才会发送到死信交换机,然后分发到死信队列供消费者消费
这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦,所以不推荐使用
利用RabbitMQ插件
生产者生产一条消息发送到延时交换机,延时时间结束消息才会进入队列供消费者消费
下载插件
RabbitMQ插件地址:Community Plugins — RabbitMQ ,进入GitHub下载页面插件
下载最新版本,直接点击链接即可下载
安装插件
先将插件上传到虚拟机的 /home/data 目录下,然后将插件复制到RabbitMQ容器的 /plugins 目录下
1
| docker cp /home/data/rabbitmq_delayed_message_exchange-3.9.0.ez RabbitMQ容器id或名称:/plugins
|
进入RabbitMQ容器
1
| docker exec -it RabbitMQ容器id或名称 /bin/bash
|
安装命令
1
| rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
重启Rabbit容器
1
| docker restart RabbitMQ容器id或名称
|
实现过程
rabbitmq-common模块的RabbitMQConfig添加以下配置创建delayed交换机、队列,以及队列和交换机绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
public static final String DELAYED_EXCHANGE = "delayedExchange";
public static final String DELAYED_QUEUE = "delayedQueue";
public static final String DELAYED_ROUTING_KEY = "delayedRoutingKey";
@Bean public CustomExchange delayedExchange() { Map<String, Object> map = new HashMap<>(16); map.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map); }
@Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); }
@Bean public Binding delayedBinding() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs(); }
|
rabbitmq-producer模块的RabbitMQProducerService类添加一个接口方法:
1 2 3 4 5 6 7
|
void sendMessageDelayed(String message, Integer delayTime);
|
rabbitmq-producer模块的RabbitMQProducerServiceImpl类实现上面的接口方法:
1 2 3 4 5 6 7
| @Override public void sendMessageDelayed(String message, Integer delayTime) { rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE,RabbitMQConfig.DELAYED_ROUTING_KEY,message,a->{ a.getMessageProperties().setDelay(delayTime); return a; }); }
|
修改rabbitmq-producer模块的启动类,实现启动就发送两条延时消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.ledao.producer;
import com.ledao.producer.service.RabbitMQProducerService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext;
@SpringBootApplication public class ProducerApplication {
public static void main(String[] args) { ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args); RabbitMQProducerService rabbitMQProducerService = (RabbitMQProducerService) ac.getBean("rabbitMQProducerService"); rabbitMQProducerService.sendMessageDelayed("延时消息1",10000); rabbitMQProducerService.sendMessageDelayed("延时消息2",20000); } }
|
rabbitmq-consumer模块的RabbitMQConsumerService类添加一个接口方法:
1 2 3 4 5 6
|
void receiveMessageDelayed(String message);
|
rabbitmq-consumer模块的RabbitMQConsumerServiceImpl类实现上面的接口方法:
1 2 3 4 5
| @Override @RabbitListener(queues = {RabbitMQConfig.DELAYED_QUEUE}) public void receiveMessageDelayed(String message) { System.out.println("接收到的延时消息:"+message+" , 当前时间:"+new Date()); }
|
修改rabbitmq-consumer模块application.yml配置文件,主要是将监听策略改为自动确认,方便测试
1 2 3 4 5
| spring: rabbitmq: listener: simple: acknowledge-mode: none
|
到此实现了监听队列接收delayed消息的功能
测试
启动rabbitmq-consumer模块再启动rabbitmq-producer模块,然后去rabbitmq-consumer模块的启动控制台窗口结果,两次消费消息的间隔时间为10秒