简介

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列,延时时间到了消息才会进入队列中等待着被消费

使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

订单在十分钟之内未支付则自动取消

新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒

账单在一周内未支付,则自动结算

用户注册成功后,如果三天内没有登陆则进行短信提醒

用户发起退款,如果三天内没有得到处理则通知相关运营人员

预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

实现方式

实现方式有两种:

  1. 使用TTL+死信队列
  2. 利用RabbitMQ插件,推荐使用此方式

TTL+死信队列

生产者生产一条消息发送到TTL交换机,然后分发到TTL队列,消息在队列中待到过期后才会发送到死信交换机,然后分发到死信队列供消费者消费

这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦,所以不推荐使用

利用RabbitMQ插件

生产者生产一条消息发送到延时交换机,延时时间结束消息才会进入队列供消费者消费

下载插件

RabbitMQ插件地址:Community Plugins — RabbitMQ ,进入GitHub下载页面插件

下载最新版本,直接点击链接即可下载

安装插件

先将插件上传到虚拟机的 /home/data 目录下,然后将插件复制到RabbitMQ容器的 /plugins 目录下

1
docker cp /home/data/rabbitmq_delayed_message_exchange-3.9.0.ez RabbitMQ容器id或名称:/plugins

进入RabbitMQ容器

1
docker exec -it RabbitMQ容器id或名称 /bin/bash

安装命令

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启Rabbit容器

1
docker restart RabbitMQ容器id或名称

实现过程

rabbitmq-common模块的RabbitMQConfig添加以下配置创建delayed交换机、队列,以及队列和交换机绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* delayed交换机
*/
public static final String DELAYED_EXCHANGE = "delayedExchange";

/**
* delayed队列
*/
public static final String DELAYED_QUEUE = "delayedQueue";

/**
* delayed路由key
*/
public static final String DELAYED_ROUTING_KEY = "delayedRoutingKey";

/**
* 定义delayed交换机
*
* @return
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> map = new HashMap<>(16);
map.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map);
}

/**
* 定义delayed队列
*
* @return
*/
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}

/**
* delayed队列绑定delayed交换机
*
* @return
*/
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
}

rabbitmq-producer模块的RabbitMQProducerService类添加一个接口方法:

1
2
3
4
5
6
7
/**
* 发送延时消息
*
* @param message 消息内容
* @param delayTime 延时时间,单位为毫秒
*/
void sendMessageDelayed(String message, Integer delayTime);

rabbitmq-producer模块的RabbitMQProducerServiceImpl类实现上面的接口方法:

1
2
3
4
5
6
7
@Override
public void sendMessageDelayed(String message, Integer delayTime) {
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE,RabbitMQConfig.DELAYED_ROUTING_KEY,message,a->{
a.getMessageProperties().setDelay(delayTime);
return a;
});
}

修改rabbitmq-producer模块的启动类,实现启动就发送两条延时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ledao.producer;

import com.ledao.producer.service.RabbitMQProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

/**
* 提供者启动类
*
* @author LeDao
* @company
* @create 2021-09-04 12:26
*/
@SpringBootApplication
public class ProducerApplication {

public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
RabbitMQProducerService rabbitMQProducerService = (RabbitMQProducerService) ac.getBean("rabbitMQProducerService");
rabbitMQProducerService.sendMessageDelayed("延时消息1",10000);
rabbitMQProducerService.sendMessageDelayed("延时消息2",20000);
}
}

rabbitmq-consumer模块的RabbitMQConsumerService类添加一个接口方法:

1
2
3
4
5
6
/**
* 监听队列接收delayed消息
*
* @param message
*/
void receiveMessageDelayed(String message);

rabbitmq-consumer模块的RabbitMQConsumerServiceImpl类实现上面的接口方法:

1
2
3
4
5
@Override
@RabbitListener(queues = {RabbitMQConfig.DELAYED_QUEUE})
public void receiveMessageDelayed(String message) {
System.out.println("接收到的延时消息:"+message+" , 当前时间:"+new Date());
}

修改rabbitmq-consumer模块application.yml配置文件,主要是将监听策略改为自动确认,方便测试

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none #自动确认

到此实现了监听队列接收delayed消息的功能

测试

启动rabbitmq-consumer模块再启动rabbitmq-producer模块,然后去rabbitmq-consumer模块的启动控制台窗口结果,两次消费消息的间隔时间为10秒