4-RabbitMQ工作模式-Work queues工作队列模式

4-RabbitMQ工作模式-Work queues工作队列模式Work queues工作队列模式1. 模式说明Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队

欢迎大家来到IT世界,在知识的湖畔探索吧!

Work queues工作队列模式

1. 模式说明

4-RabbitMQ工作模式-Work queues工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

2. 案例

Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多个消费者进行多个消费者同时消费消息的测试。

1)生产者

编写往 rabbitmq 发送 10 条消息。

4-RabbitMQ工作模式-Work queues工作队列模式

package com.lijw.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Aron.li * @date 2022/3/2 8:26 */ public class Producer_WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5.创建队列 Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("work_queues", true, false, false, null); //6. 发送消息 channel.basicPublish /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ for (int i = 0; i < 10; i++) { String body = "hello rabbitmq~~~~ " + i; channel.basicPublish("", "work_queues", null, body.getBytes()); } //7. 释放资源 channel.close(); connection.close(); } }

欢迎大家来到IT世界,在知识的湖畔探索吧!


2)消费者1

4-RabbitMQ工作模式-Work queues工作队列模式

欢迎大家来到IT世界,在知识的湖畔探索吧!package com.lijw.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Aron.li * @date 2022/3/2 16:16 */ public class Consumer_WorkQueues1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("work_queues", true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume("work_queues",true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 } }


3)消费者2

4-RabbitMQ工作模式-Work queues工作队列模式

package com.lijw.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Aron.li * @date 2022/3/2 16:16 */ public class Consumer_WorkQueues2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("work_queues", true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume("work_queues",true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 } }

3. 测试

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

  • 消费者1收到的消息:
4-RabbitMQ工作模式-Work queues工作队列模式

  • 消费者2收到的消息
4-RabbitMQ工作模式-Work queues工作队列模式

4. 小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/69519.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信