简介

生产者生产的消息,所有订阅过的消费者都能够接收到消息

Exchange采用Fanout类型,即广播方式

Fanout类型的交换机会把消息发送到所有绑定到该交换机的队列

适合场景:将同一条消息通过手机短信,APP,邮件等方式推送给用户

实现过程

批量发送订阅消息

为了方便测试,在rabbitmq-producer模块下添加一个批量发送消息功能,可以自定义批量发送的消息数量

首先在sendInformationPage.html中添加一个form标签:

1
2
3
4
5
<form action="/sendFanoutMessageBatch">
<h2>批量发送订阅消息</h2>
数量:<input type="text" name="messageCount">
<input type="submit" value="批量发送">
</form>

在rabbitmq-common模块的RabbitMQConfig类添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
`/**
* fanout交换机名称
*/
public static final String FANOUT_EXCHANGE = "fanoutExchange";

/**
* fanout订阅队列1名称
*/
public static final String SUB_QUEUE1 = "subQueue1";

/**
* fanout订阅队列2名称
*/
public static final String SUB_QUEUE2 = "subQueue2";

/**
* 定义一个fanout交换机,所有订阅了这个交换机的消息队列都会接收到消息
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

/**
* 定义一个fanout订阅队列1
*
* @return
*/
@Bean
public Queue subQueue1() {
return new Queue(SUB_QUEUE1);
}

/**
* 定义一个fanout订阅队列2
*
* @return
*/
@Bean
public Queue subQueue2() {
return new Queue(SUB_QUEUE2);
}

/**
* 定义一个fanout订阅队列1和fanout交换机的绑定
*
* @return
*/
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
}

/**
* 定义一个fanout订阅队列2和fanout交换机的绑定
*
* @return
*/
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
}

在RabbitMQProducerService类添加一个接口方法:

1
2
3
4
5
6
/**
* 发送fanout消息
*
* @param message
*/
void sendFanoutMessage(String message);

在RabbitMQProducerServiceImpl类中实现上面的接口方法:

1
2
3
4
@Override
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, "", message);
}

在IndexController类添加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 批量发送订阅消息
*
* @param messageCount
* @return
*/
@ResponseBody
@RequestMapping("/sendFanoutMessageBatch")
public String sendFanoutMessageBatch(int messageCount) {
for (int i = 1; i < messageCount + 1; i++) {
rabbitMQProducerService.sendFanoutMessage("订阅消息" + i);
}
return "批量发送消息到fanout交换机成功<a href='/toSendInformationPage'><button>继续发送</button></a>";
}

消费消息

在rabbitmq-consumer模块的RabbitMQConsumerService中添加以下接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 监听队列接收订阅消息
*
* @param message
*/
void receiveSubMessage1(String message);

/**
* 监听队列接收订阅消息
*
* @param message
*/
void receiveSubMessage2(String message);

在rabbitmq-consumer模块的RabbitMQConsumerServiceImpl中实现上面的接口方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
public void receiveSubMessage1(String message) {
System.out.println("订阅者1---接收到的消息:" + message);
}

@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
public void receiveSubMessage2(String message) {
System.out.println("订阅者2---接收到的消息:" + message);
}

测试

启动rabbitmq-consumer模块再启动rabbitmq-producer模块,,然后在浏览器地址栏输入:http://localhost/toSendInformationPage 进入发送消息页面,根据自己的需要填入批量发送的消息数量,然后点击发送订阅信息,最后去rabbitmq-consumer模块启动控制台查看结果,可以看到两个订阅者都收到了同一条消息