大家好,我是指北君。
今天指北君带领大家接着学习RabbitMQ,了解RabbitMQ的五大通信模型之一的Work模型;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~
回顾
上一篇文章中,简单的介绍了一下RabbitMQ,以及安装和hello world。
有的小伙伴留言说看不懂其中的方法参数,这里先解释一下几个基本的方法参数。
1 |
|
1 |
|
Work模型
work模型称为工作队列或者竞争消费者模式,多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的削峰。
演示
写个简单的测试:
-
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class Producer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes()); } channel.close(); connection.close(); } }
-
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 消费者1 public class Consumer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消费者1接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// 消费者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消费者2接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); // 这里加了个延迟,表示处理业务时间 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
-
结果
可以看出来:100条消息,消费者之间是平分的,消费者1 几乎是瞬间完成,消费者2 则是慢慢吞吞的运行完毕,消费者1大量时间处于空闲状态,消费者2则一直忙碌。这显然是不适用于实际开发中。
我们需要遵从一个原则,就是 能者多劳 ,消费越快的人,消费的越多;
现在我们把消费者1和2的代码中 // channel.basicQos(0, 1, false);
这行代码取消注释,再次运行;
现在的结果就比较符合能者多劳,虽然你干的多,但是工资是一样的呀~
work模型的一个主要的方法是basicQos()
;这里也解释一下其参数:
1 |
|
小结
本文到这里就结束了,主要介绍了RabbitMQ通信模型中的work模型,适用于限流、削峰等应用场景。
后续指北君还会继续更新RabbitMQ的系列文章,感兴趣的小伙伴持续关注哦~