大家好,我是了不起。
RabbitMQ 是一个流行的开源消息队列软件,它提供了多种通信模型,例如发布/订阅模型、路由模型、work模型等。在前面的文章中我们已经介绍了前四种模型,本文将会学习 RabbitMQ 中的 Topic 模型;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~
往期传送门
Topic 模型
Topic 模型是 RabbitMQ 的高级模型之一,Topic 模型使用了通配符的概念,可以匹配更灵活的路由规则。topic模式相当于是对路由模式的一个升级,topic模式主要就是在匹配的规则上可以实现模糊匹配。
在 Topic 模型中,生产者将消息发送到交换机,交换机根据消息的 routing key 将消息转发到对应的队列中。与 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 ‘*’ 可以匹配一个单词,’#’ 可以匹配多个单词。例如,”order.*” 可以匹配 “order.create”,”order.delete” 等消息,而 “order.#” 可以匹配 “order.create.one”,”order.delete.two” 等消息。
适用场景
Topic 模型适用于需要灵活的消息路由规则的场景,例如:
- 新闻网站订阅分类消息;
- 电商网站订阅商品分类消息;
- 金融机构订阅股票市场消息等。
演示
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// 生产者 public class Producer { private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY1 = "topic.km"; private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); for (int i = 0; i < 100; i++) { // topic在路由模型的基础上,只有路由的key发生改变,其余的都不变 if (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes()); } else { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes()); } } channel.close(); connection.close(); } }
-
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 消费者1 public class Consumer1 { private static final String QUEUE_NAME = "queue_topic_1"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.*"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 消费者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_topic_2"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.#"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
测试
先启动2个消费者,再启动生产者
消费者1订阅的是 “order.*” 的消息,消费者2订阅的是 “order.#” 的消息,可以得到以下结果:
消费者1接收到的消息是:”Topic 模型发送的偶数条消息”
消费者2接收到的消息是:”Topic 模型发送的全部消息”
小结
本文介绍了 RabbitMQ 通信模型中的 Topic 模型的使用,通过交换机和 routing key 实现更灵活的消息路由。在实际使用过程中,需要注意以下几点:
- 路由键的格式应该是多个单词组成,用 ‘.’ 分隔;
- ’#’ 匹配多个单词,’*’ 匹配一个单词;
- 一个队列可以绑定多个 routing key;
- 如果交换机没有匹配到任何一个队列,则会抛弃该消息。
后续了不起还会继续更新RabbitMQ的系列文章,感兴趣的小伙伴持续关注哦~