简介

主题模式和路由模式很像,但是路由模式是精确匹配,主题模式是模糊匹配,更加灵活,更加强大

主题模式使用的是topic类型的交换机,使用通配符方式实现模糊匹配,匹配符有:*和#

通配符 匹配方式
* 匹配一个单词
# 匹配零个或者多个单词

对上图的3中路由key进行解释:

*.orange.* :orange前后各有一个单词,例如:lazy.orange.rabbit、quick.orange.dog等情况

*.*.rabbit :rabbit前面后两个单词,例如:lazy.green.rabbit、quick.orange.rabbit等情况

lazy.# :lazy后面有0个或无数个单词,例如:lazy,lazy.rabbit、lazy.red.rabbit、lazy.red.old.rabbit等情况

实现过程

发送topic消息

为了方便测试,在rabbitmq-producer模块下添加一个发送指定类型消息功能,首先在sendInformationPage.html中添加一个form标签:

1
2
3
4
5
<form action="/sendTopicMessage" method="post">
<h2>发送topic消息</h2>
消息:<input type="text" name="message" required="required">
<input type="submit" value="发送">
</form>

在rabbitmq-common模块的RabbitMQConfig类添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* topic交换机名称
*/
public static final String TOPIC_EXCHANGE = "topicExchange";

/**
* topic队列1名称
*/
public static final String TOPIC_QUEUE1 = "topicQueue1";

/**
* topic队列2名称
*/
public static final String TOPIC_QUEUE2 = "topicQueue2";

/**
* 定义一个topic交换机
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

/**
* 定义一个topic队列1
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1);
}

/**
* 定义一个topic队列2
*
* @return
*/
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2);
}

/**
* topic队列1绑定topic交换机,routingKey为 *.orange.*
*
* @return
*/
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");
}

/**
* topic队列1绑定topic交换机,routingKey为 *.*.rabbit
*
* @return
*/
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
}

/**
* topic队列1绑定topic交换机,routingKey为 lazy.#
*
* @return
*/
@Bean
public Binding topicBinding3() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");
}

在RabbitMQProducerService类添加一个接口方法:

1
2
3
4
5
6
7
/**
* 发送topic消息
*
* @param message
* @param routingKey
*/
void sendTopicMessage(String message, String routingKey);

在RabbitMQProducerServiceImpl类中实现上面的接口方法:

1
2
3
4
@Override
public void sendTopicMessage(String message,String routingKey) {
amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message);
}

在IndexController类添加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 发送topic消息
*
* @param message
* @return
*/
@ResponseBody
@RequestMapping("/sendTopicMessage")
public String sendTopicMessage(String message) {
rabbitMQProducerService.sendTopicMessage(message, message);
return "发送topic消息成功<a href='/toSendInformationPage'><button>继续发送</button></a>";
}

消费消息

在rabbitmq-consumer模块的RabbitMQConsumerService中添加以下接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 监听队列接收topic消息1
*
* @param message
*/
void receiveTopicMessage1(String message);

/**
* 监听队列接收topic消息2
*
* @param message
*/
void receiveTopicMessage2(String message);

在rabbitmq-consumer模块的RabbitMQConsumerServiceImpl中实现上面的接口方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE1})
public void receiveTopicMessage1(String message) {
System.out.println("消费者1---接收到的消息:" + message);
}

@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})
public void receiveTopicMessage2(String message) {
System.out.println("消费者2---接收到的消息:" + message);
}

测试

启动rabbitmq-consumer模块再启动rabbitmq-producer模块,消费者1消费的是topicQueue1队列的消息,消费者2消费的是topicQueue2队列的消息,交换机和队列绑定情况如下:

然后在浏览器地址栏输入:http://localhost/toSendInformationPage 进入发送消息页面,分别发送topic消息:lazy.orange.rabbit、lazy.red.old.rabbit、lazy.rabbit、orange.rabbit、quick.orange.dog,然后去rabbitmq-consumer模块启动控制台查看结果