简介

跟订阅模式类似,只不过在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列。

使用场景:只将一些错误log存到文件中,把所有的log都打印到控制台里

实现过程

发送指定类型消息

为了方便测试,在rabbitmq-producer模块下添加一个发送指定类型消息功能,消息类型有info、error、warning

首先在sendInformationPage.html中添加一个form标签:

1
2
3
4
5
6
7
8
9
10
11
12
<form action="/sendRoutingMessage">
<h2>发送指定类型的消息</h2>
消息:
<input type="text" name="message" required="required"><br>
类型:
<select name="routingKey">
<option value="info">info</option>
<option value="error">error</option>
<option value="warning">warning</option>
</select><br>
<input type="submit" value="发送">
</form>

在rabbitmq-common模块的RabbitMQConfig类添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* direct路由模式交换机名称
*/
public static final String DIRECT_ROUTING_EXCHANGE = "directRoutingExchange";

/**
* direct路由模式队列1
*/
public static final String DIRECT_ROUTING_QUEUE1 = "directRoutingQueue1";

/**
* direct路由模式队列2
*/
public static final String DIRECT_ROUTING_QUEUE2 = "directRoutingQueue2";

/**
* 定义一个direct路由模式交换机
*
* @return
*/
@Bean
public DirectExchange directRoutingExchange() {
return new DirectExchange(DIRECT_ROUTING_EXCHANGE);
}

/**
* 定义一个路由模式队列1
*
* @return
*/
@Bean
public Queue routingQueue1() {
return new Queue(DIRECT_ROUTING_QUEUE1);
}

/**
* 定义一个路由模式队列2
*
* @return
*/
@Bean
public Queue routingQueue2() {
return new Queue(DIRECT_ROUTING_QUEUE2);
}

/**
* 路由模式队列1绑定路由模式交换机,routingKey为error
*
* @return
*/
@Bean
public Binding bindingRouting1() {
return BindingBuilder.bind(routingQueue1()).to(directRoutingExchange()).with("error");
}

/**
* 路由模式队列2绑定路由模式交换机,routingKey为error
*
* @return
*/
@Bean
public Binding bindingRouting2() {
return BindingBuilder.bind(routingQueue2()).to(directRoutingExchange()).with("error");
}

/**
* 路由模式队列2绑定路由模式交换机,routingKey为info
*
* @return
*/
@Bean
public Binding bindingRouting3() {
return BindingBuilder.bind(routingQueue2()).to(directRoutingExchange()).with("info");
}

/**
* 路由模式队列2绑定路由模式交换机,routingKey为warning
*
* @return
*/
@Bean
public Binding bindingRouting4() {
return BindingBuilder.bind(routingQueue2()).to(directRoutingExchange()).with("warning");
}

在RabbitMQProducerService类添加一个接口方法:

1
2
3
4
5
6
7
/**
* 发送路由模式消息
*
* @param message
* @param routingKey
*/
void sendRoutingMessage(String message, String routingKey);

在RabbitMQProducerServiceImpl类中实现上面的接口方法:

1
2
3
4
@Override
public void sendRoutingMessage(String message, String routingKey) {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_ROUTING_EXCHANGE,routingKey,message);
}

在IndexController类添加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 发送路由模式消息
*
* @param message
* @param routingKey
* @return
*/
@ResponseBody
@RequestMapping("/sendRoutingMessage")
public String sendRoutingMessage(String message, String routingKey) {
rabbitMQProducerService.sendRoutingMessage(message+"("+routingKey+")", routingKey);
return "发送路由模式消息成功<a href='/toSendInformationPage'><button>继续发送</button></a>";
}

消费消息

在rabbitmq-consumer模块的RabbitMQConsumerService中添加以下接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 监听队列接收路由模式消息1
*
* @param message
*/
void receiveRoutingMessage1(String message);

/**
* 监听队列接收路由模式消息2
*
* @param message
*/
void receiveRoutingMessage2(String message);

在rabbitmq-consumer模块的RabbitMQConsumerServiceImpl中实现上面的接口方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_ROUTING_QUEUE1})
public void receiveRoutingMessage1(String message) {
System.out.println("消费者1---接收到的消息:" + message);
}

@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_ROUTING_QUEUE2})
public void receiveRoutingMessage2(String message) {
System.out.println("消费者2---接收到的消息:" + message);
}

测试

启动rabbitmq-consumer模块再启动rabbitmq-producer模块,,然后在浏览器地址栏输入:http://localhost/toSendInformationPage 进入发送消息页面,分别发送info、error、warning类型的路由模式消息,然后去rabbitmq-consumer模块启动控制台查看结果,可以看到:①消费者1只消费了error类型的消息②消费者2消费了info、error、warning类型的消息