RabbitMQ-队列

RabbitMQ-队列下面创建一个工作队列,向多个消费者分发耗时的任务图是这样子的代码学习生产者packagecom.tgb.kwy.workqueues;impor

欢迎大家来到IT世界,在知识的湖畔探索吧!

写了一个小demo, 用于发送和接收消息,下面创建一个工作队列,向多个消费者分发耗时的任务

图是这样子的

RabbitMQ-队列

代码学习

生产者

package com.tgb.kwy.workqueues;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.MessageProperties;

/**

* Description

*生产

* @author kongwy @163.com

* @version 1.0

* @date 2018-07-08-20 -58

*/

public class NewTask {

private static final String TASK_QUEUE_NAME=”task_queue”;

public static void main(String[] args) throws Exception{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost(“192.168.159.132”);/*设置rabbitmq所在主机ip或主机名*/

/*指定用户名和密码*/

factory.setUsername(“admin”);

factory.setPassword(“admin”);

Connection connection=factory.newConnection();

Channel channel=connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

String message = getMessage(args);

channel.basicPublish(“”, TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(“UTF-8”));

System.out.println(” [x] Sent ‘” + message + “‘”);

channel.close();

connection.close();

}

private static String getMessage(String[] strings){

if(strings.length<1){

return “Hello World!”;

}

return joinStrings(strings,” “);

}

private static String joinStrings(String[] strings,String delimiter){

int length=strings.length;

if(length==0){

return “”;

}

StringBuilder words=new StringBuilder(strings[0]);

for (int i = 0; i < length; i++) {

words.append(delimiter).append(strings[i]);

}

return words.toString();

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

消费者代码

package com.tgb.kwy.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* Description

*

* @author kongwy @163.com

* @version 1.0

* @date 2018-07-08-21 -03

*/

public class Worker {

private static final String TASK_QUEUE_NAME=”task_queue”;

public static void main(String[] args) throws Exception{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost(“192.168.159.132”);/*设置rabbitmq所在主机ip或主机名*/

/*指定用户名和密码*/

factory.setUsername(“admin”);

factory.setPassword(“admin”);

final Connection connection=factory.newConnection();

final Channel channel=connection.createChannel();

//声明队列,主要为了防止消息接受者先运行此程序,队列还不存在时创建队列

channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

System.out.println(” [*] Waiting for messages. To exit press CTRL+C”);

//channel.basicQos(1); // accept only one unack-ed message at a time (see below)

int prefetchCount=1;

channel.basicQos(prefetchCount);

final Consumer consumer=new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumeTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{

String message=new String(body,”UTF-8″);

System.out.println(” [x] Received ‘” + message + “‘”);

try{

doWork(message);

}finally {

System.out.println(“[x] Done”);

channel.basicAck(envelope.getDeliveryTag(),false);

}

}

};

boolean autoAck = true; // acknowledgment is covered below

channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

}

private static void doWork(String task){

for(char ch: task.toCharArray()){

if(ch == ‘.’){

try {

Thread.sleep(1000);

}catch (InterruptedException _ignored){

Thread.currentThread().interrupt();

}

}

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

运行结果

RabbitMQ-队列

RabbitMQ-队列

任务分发

轮询分发

rabbitmq将逐个发送消息到序列中的写一个消费者,平均每个消费者获得的数量是相同的

Fair dispatch

如果这里有两个消费者的话,可能一个消费者非常的忙碌,但是另一个消费者,几乎一直闲着.但是rabbitmq,还是会均匀地发送消息. 因为当消息进入队列后, rabbitmq就会分配消息,不会看消费者未确认消息的数量.所以为了解决这个问题,就使用了basicQos(prefetchCount=1)方法,来限制rabbitmq只发不超过1条的消息给同一个消费者.当消费者处理完毕后,得到反馈,再发送下一次

注:代码来自官网.官网对于概念性的内容,讲解的还是很清楚的

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/69500.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信