简介

消息过期时间

在指定的过期时间内,如果消息没有被消费,这个消息就会过期从队列中移除

实现方式

  1. 给消息设置过期时间
  2. 给队列设置过期时间,交换器分发到这个队列的消息都会有相同的过期时间,推荐使用这种方式

注意

如果队列设置了消息过期时间并且消息也设置了过期时间,那么数值小的过期时间生效,例如:

  1. 一个消息的过期时间为10秒,队列的消息过期时间为20秒,那么该消息的过期时间为10秒
  2. 一个消息的过期时间为20秒,队列的消息过期时间为10秒,那么该消息的过期时间为10秒。同一个队列的消息过期时间最好一样,千万不能让队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去

实现过程

给消息设置

创建交换机、队列,交换机绑定队列

在RabbitMQConfig类中添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* ttl交换机名称
*/
public static final String TTL_DIRECT_EXCHANGE = "ttlDirectExchange";

/**
* ttl队列名称
*/
public static final String TTL_QUEUE = "ttlQueue";

/**
* ttl路由key
*/
public static final String TTL_ROUTING_KEY = "ttlRoutingKey";

/**
* 定义一个ttl交换机
*
* @return
*/
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange(TTL_DIRECT_EXCHANGE);
}

/**
* 定义一个ttl队列绑定ttl交换机
*
* @return
*/
@Bean
public Queue ttlQueue() {
return new Queue(TTL_QUEUE);
}

/**
* ttl队列
*
* @return
*/
@Bean
public Binding ttlBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with(TTL_ROUTING_KEY);
}

实现发送ttl消息

在rabbitmq-producer模块的RabbitMQProducerService类中添加一个接口方法:

1
2
3
4
5
6
/**
* 发送ttl消息
*
* @param message
*/
void sendMessageTll(String message);

在rabbitmq-producer模块的RabbitMQProducerServiceImpl类中实现上面的接口方法:

1
2
3
4
5
6
7
8
@Override
public void sendMessageTll(String message) {
MessageProperties messageProperties = new MessageProperties();
//设置过期时间为10秒
messageProperties.setExpiration("10000");
Message msg = new Message(message.getBytes(), messageProperties);
rabbitTemplate.send(RabbitMQConfig.TTL_DIRECT_EXCHANGE, RabbitMQConfig.TTL_ROUTING_KEY, msg);
}

在sendInformationPage.html中添加一个form标签:

1
2
3
4
5
<form action="/sendTllMessage" method="post">
<h2>发送tll消息</h2>
消息:<input type="text" name="message" required="required">
<input type="submit" value="发送">
</form>

在rabbitmq-producer模块的IndexController类添加sendTllMessage方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 发送tll消息
*
* @param message
* @return
*/
@ResponseBody
@RequestMapping("/sendTllMessage")
public String sendTllMessage(String message) {
rabbitMQProducerService.sendMessageTll(message);
return "发送消息成功<a href='/toSendInformationPage'><button>继续发送</button></a>";
}

测试

先启动消费者模块再启动生产者模块,浏览器地址栏输入 http://localhost/toSendInformationPage ,发送ttl消息,然后去RabbitMQ管理系统中查看队列的消息,刚刚发送的消息过了10秒(从发送成功开始计算时间)后会自动从队列中删除

给队列设置

直接修改RabbitMQConfig类的ttlQueue()方法

修改前:

1
2
3
4
5
6
7
8
9
/**
* 定义一个ttl队列绑定ttl交换机
*
* @return
*/
@Bean
public Queue ttlQueue() {
return new Queue(TTL_QUEUE);
}

修改后:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 定义一个ttl队列绑定ttl交换机
*
* @return
*/
@Bean
public Queue ttlQueue() {
Map<String, Object> map = new HashMap<>(16);
map.put("x-message-ttl", 8000);
//参数一为队列名称,参数二为是否持久化,参数三是否独享、排外的,参数四为是否自动删除,参数五为队列的其他属性参数
return new Queue(TTL_QUEUE, true, false, false, map);
}
队列参数说明
  1. name: 队列的名称

  2. actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个

  3. durable: 是否持久化

  4. exclusive: 是否独享、排外的

  5. autoDelete: 是否自动删除

  6. arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments

(1)x-message-ttl:消息的过期时间,单位:毫秒

(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒

(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息

(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息

(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head

(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中

(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值

(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级

(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息

(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息

测试

先删除之前创建好的队列ttlQueue,启动消费者模块再启动生产者模块,浏览器地址栏输入 http://localhost/toSendInformationPage ,发送ttl消息,然后去RabbitMQ管理系统中查看队列的消息,刚刚发送的消息过了8秒(从发送成功开始计算时间)后会自动从队列中删除(消息的过期时间为10秒,消息所在队列的过期时间为8秒,所以消息的实际过期时间为8秒)