`

转-项目中用到rabbitMQ的地方2

 
阅读更多

1.消费者端

Java代码 复制代码 收藏代码
  1. package com.guoxin.rabbitmq;
  2. import java.io.IOException;
  3. import com.guoxin.parser.download.CreateLog;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.QueueingConsumer;
  8. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  9. publicclass Consumer1 {
  10. privatefinal String EXCHANGE_NAME = "direct_logs";
  11. privatefinal String QUEUE_NAME = "StaticQueue";
  12. boolean flag = true;
  13. // 创建链接工厂
  14. privatestatic ConnectionFactory factory;
  15. // 创建链接
  16. Connection connection = null;
  17. Channel channel = null;
  18. String task = "";
  19. privatestaticint a=0;
  20. public String consumer() {
  21. try {
  22. factory=Consumer1.getConnectionFactory();
  23. factory.setRequestedChannelMax(20);
  24. factory.setRequestedFrameMax(0);
  25. // factory=new ConnectionFactory();
  26. factory.setHost("192.168.1.22");
  27. connection = factory.newConnection();
  28. // 创建信息通道
  29. channel = connection.createChannel();
  30. try {
  31. // 声明Exchange非持久化
  32. channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
  33. // 参数说明:第�?��参数为队列的名字。第二个参数为:设置为true,则rabbitmq在重启之后会重新建立
  34. // 第三个参数:设置为true,标识只有创建了该队列的消费者才可以去连接该队列
  35. // 第四个参数:设置为false为最后一个消费�?断开后不会自动删除�?
  36. // 第五个参数为:队列的其他参数,没有就可以设置为空
  37. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  38. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "StaticState");
  39. DeclareOk a = channel.queueDeclarePassive(QUEUE_NAME);
  40. //获取队列中的消息个数
  41. int c = a.getMessageCount();
  42. // System.out.println(c);
  43. // System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  44. // 声明一个消费者
  45. QueueingConsumer consumer = new QueueingConsumer(channel);
  46. // 新消息到达的回调
  47. channel.basicConsume(QUEUE_NAME, false, consumer);
  48. channel.basicQos(1);
  49. while (flag) {
  50. // QueueingConsumer.Delivery封装一个消息,和bean的结构相同
  51. QueueingConsumer.Delivery delivery = consumer
  52. .nextDelivery();
  53. // 获取消息
  54. String message = new String(delivery.getBody());
  55. task = message;
  56. channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
  57. false);
  58. // System.out.println(task);
  59. break;
  60. }
  61. } catch (Exception e) {
  62. e.printStackTrace(); }
  63. } catch (IOException e1) {
  64. e.printStackTrace(); } finally {
  65. try {
  66. channel.close();
  67. connection.close();
  68. } catch (IOException e) {
  69. e.printStackTrace(); }
  70. }
  71. return task;
  72. }
  73. publicsynchronizedstatic ConnectionFactory getConnectionFactory(){
  74. if(factory==null){
  75. a++;
  76. System.out.println("获取次数:"+a);
  77. factory=new ConnectionFactory();
  78. }
  79. return factory;
  80. }
  81. publicstaticvoid main(String[] argv) throws Exception {
  82. Consumer1 consumer1 = new Consumer1();
  83. consumer1.consumer();
  84. }
  85. }
package com.guoxin.rabbitmq;

import java.io.IOException;


import com.guoxin.parser.download.CreateLog;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;

public class Consumer1 {

	private final String EXCHANGE_NAME = "direct_logs";
	private final String QUEUE_NAME = "StaticQueue";
	boolean flag = true;
	// 创建链接工厂
	private static ConnectionFactory factory;
	// 创建链接
	Connection connection = null;
	Channel channel = null;
	String task = "";
	private static int  a=0;
	public String consumer() {
		try {
			factory=Consumer1.getConnectionFactory();
			factory.setRequestedChannelMax(20);
			factory.setRequestedFrameMax(0);

//			factory=new ConnectionFactory();
			factory.setHost("192.168.1.22");
			connection = factory.newConnection();
			// 创建信息通道
			channel = connection.createChannel();
			try {
			// 声明Exchange非持久化
			channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
			// 参数说明:第�?��参数为队列的名字。第二个参数为:设置为true,则rabbitmq在重启之后会重新建立
			// 第三个参数:设置为true,标识只有创建了该队列的消费者才可以去连接该队列
			// 第四个参数:设置为false为最后一个消费�?断开后不会自动删除�?
			// 第五个参数为:队列的其他参数,没有就可以设置为空
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "StaticState");
			DeclareOk a = channel.queueDeclarePassive(QUEUE_NAME);
			//获取队列中的消息个数
			int c = a.getMessageCount();
//			System.out.println(c);
//			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			// 声明一个消费者
			QueueingConsumer consumer = new QueueingConsumer(channel);
			// 新消息到达的回调
			channel.basicConsume(QUEUE_NAME, false, consumer);
			channel.basicQos(1);
			while (flag) {
			           //  QueueingConsumer.Delivery封装一个消息,和bean的结构相同
			           QueueingConsumer.Delivery delivery = consumer
							.nextDelivery();
				// 获取消息
				String message = new String(delivery.getBody());
				task = message;
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
						false);
//				System.out.println(task);
				break;
				}
			} catch (Exception e) {
				e.printStackTrace();			}
		} catch (IOException e1) {
			e.printStackTrace();		} finally {
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();			}
		}
		return task;
	}
    public synchronized  static ConnectionFactory getConnectionFactory(){
    	
    	if(factory==null){
    		a++;
    		System.out.println("获取次数:"+a);
    		factory=new ConnectionFactory();
    		    	}
		return factory;
    }
	public static void main(String[] argv) throws Exception {
		Consumer1 consumer1 = new Consumer1();
		consumer1.consumer();
	}
}

2.生产者端

Java代码 复制代码 收藏代码
  1. package com.guoxin.util;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import org.quartz.JobExecutionContext;
  5. import org.quartz.JobExecutionException;
  6. import org.springframework.scheduling.quartz.QuartzJobBean;
  7. import com.guoxin.bean.Block;
  8. import com.guoxin.common.util.Config;
  9. import com.guoxin.common.webservice.BmsMailSend;
  10. import com.guoxin.log.CreateLog;
  11. import com.guoxin.service.Iservice.GetBlockUrlService;
  12. import com.rabbitmq.client.Channel;
  13. import com.rabbitmq.client.Connection;
  14. import com.rabbitmq.client.ConnectionFactory;
  15. import com.rabbitmq.client.MessageProperties;
  16. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  17. import com.sun.codemodel.JNullType;
  18. @SuppressWarnings("deprecation")
  19. publicclass DMSMethod extends QuartzJobBean{
  20. privatestaticfinal String EXCHANGE_NAME = "direct_logs";
  21. privatefinal String QUEUE_NAME = "StaticQueue";
  22. private GetBlockUrlService blockservice;
  23. public DMSMethod(){
  24. blockservice = (GetBlockUrlService)MyApplicationContextUtil.getContext().getBean("blockService");
  25. }
  26. protectedvoid executeInternal(JobExecutionContext arg0)
  27. throws JobExecutionException {
  28. ConnectionFactory factory;
  29. Connection connection = null;
  30. Channel channel = null;
  31. try {
  32. factory= new ConnectionFactory();
  33. factory.setHost("192.168.1.22");
  34. // factory.setRequestedChannelMax(20);
  35. // factory.setRequestedFrameMax(0);
  36. connection= factory.newConnection();
  37. channel=connection.createChannel();
  38. channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
  39. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  40. DeclareOk a = channel.queueDeclarePassive(QUEUE_NAME);
  41. int c = a.getMessageCount();
  42. //获得静态版块的链接
  43. List<Block> staticList=blockservice.queryBlockUrlStatic();
  44. int abc=0;
  45. for(Block block:staticList){
  46. String task=block.getBlock_url();
  47. if(c<15){
  48. channel.basicPublish(EXCHANGE_NAME,"StaticState", MessageProperties.PERSISTENT_TEXT_PLAIN, task.getBytes());
  49. System.out.println("队列中剩余:"+c);
  50. }
  51. }
  52. } catch (Exception e) {
  53. BmsMailSend.sendMail(Short.valueOf(Config.getAttribute("systemNo")),"亲,又发生异常了!" ,(new Throwable().getStackTrace()[0]).toString()+ e.toString());
  54. CreateLog.getCreateLogInstance("errorLog").CreateLogs(e,
  55. "com.guoxin.util.DMSMethod");
  56. }finally{
  57. try {
  58. channel.close();
  59. connection.close();
  60. } catch (IOException e) {
  61. System.out.println("异常发生");
  62. BmsMailSend.sendMail(Short.valueOf(Config.getAttribute("systemNo")),"亲,又发生异常了!" ,(new Throwable().getStackTrace()[0]).toString()+ e.toString());
  63. CreateLog.getCreateLogInstance("errorLog").CreateLogs(e,
  64. "com.guoxin.util.DMSMethod");
  65. }
  66. }
  67. }
  68. public GetBlockUrlService getBlockservice() {
  69. return blockservice;
  70. }
  71. }
package com.guoxin.util;

import java.io.IOException;
import java.util.List;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import com.guoxin.bean.Block;
import com.guoxin.common.util.Config;
import com.guoxin.common.webservice.BmsMailSend;
import com.guoxin.log.CreateLog;
import com.guoxin.service.Iservice.GetBlockUrlService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.sun.codemodel.JNullType;

@SuppressWarnings("deprecation")
public class DMSMethod extends QuartzJobBean{
	private static final String EXCHANGE_NAME = "direct_logs";
	private final String QUEUE_NAME = "StaticQueue";
	private GetBlockUrlService blockservice;
	
	public DMSMethod(){
	   blockservice = (GetBlockUrlService)MyApplicationContextUtil.getContext().getBean("blockService");
	}


	protected void executeInternal(JobExecutionContext arg0)
			throws JobExecutionException {
		ConnectionFactory factory;
		Connection connection = null;
		Channel channel = null;
		try {
			factory= new ConnectionFactory();
        	factory.setHost("192.168.1.22");
//        	factory.setRequestedChannelMax(20);
//			factory.setRequestedFrameMax(0);
        	connection= factory.newConnection();
        	channel=connection.createChannel();
			channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			DeclareOk a = channel.queueDeclarePassive(QUEUE_NAME);
			int c = a.getMessageCount();

			//获得静态版块的链接
			List<Block> staticList=blockservice.queryBlockUrlStatic();
			int abc=0;
			for(Block block:staticList){
		    	  String task=block.getBlock_url();
		    	  if(c<15){
		    		channel.basicPublish(EXCHANGE_NAME,"StaticState", MessageProperties.PERSISTENT_TEXT_PLAIN, task.getBytes());
					System.out.println("队列中剩余:"+c);
					}
		    	}  
    	} catch (Exception e) {
    		BmsMailSend.sendMail(Short.valueOf(Config.getAttribute("systemNo")),"亲,又发生异常了!" ,(new Throwable().getStackTrace()[0]).toString()+ e.toString());
    		CreateLog.getCreateLogInstance("errorLog").CreateLogs(e,
			"com.guoxin.util.DMSMethod");
		}finally{
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				System.out.println("异常发生");
				BmsMailSend.sendMail(Short.valueOf(Config.getAttribute("systemNo")),"亲,又发生异常了!" ,(new Throwable().getStackTrace()[0]).toString()+ e.toString());
	    		CreateLog.getCreateLogInstance("errorLog").CreateLogs(e,
				"com.guoxin.util.DMSMethod");
			}
		   
		}
		
	}

	public GetBlockUrlService getBlockservice() {
		return blockservice;
	}



}

以上是在项目中的实际应用,DMSMethod类是采用了spring的定时器来加载的,定时向队列中插入数据,当队列中的数据个数小于15就往队列中放入新的数据。同时查询是调用的service层的接口。在消费者端,我采用的是将一个链接取出来然后就跳出。一般是消费者端是一个死循环,时时刻刻接收生产者往队列中放入得数据。

 

2.以下是几个不同交换机类型的小例子

1).第一种:发布式的,此种类型交换机忽略路由键值,会将消息发送到它所能知道的队列中

Java代码 复制代码 收藏代码
  1. package com.mq.PublishSubscribe;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5. /**
  6. * 把消息分发给所有的消费者fanout交换器很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。这正是我们的日志系统所需要的。
  7. * @author Administrator
  8. *
  9. */
  10. publicclass EmitLog {
  11. privatestaticfinal String EXCHANGE_NAME = "logs";
  12. publicstaticvoid main(String[] argv) throws Exception {
  13. //创建链接工厂
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost("192.168.1.22");
  16. //创建链接
  17. Connection connection = factory.newConnection();
  18. //创建信息通道
  19. Channel channel = connection.createChannel();
  20. //声明Exchange非持久化
  21. //channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);这是一个持久化的交换机
  22. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  23. //消息
  24. String message = getMessage(argv);
  25. String message1="删除";
  26. String message2="shanchu1";
  27. //发送消息,这个发送消息是忽略路由键值的。它会将消息发送给所有与这个交换机绑定的队列上
  28. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  29. channel.basicPublish(EXCHANGE_NAME, "", null, message1.getBytes());
  30. channel.basicPublish(EXCHANGE_NAME, "", null, message2.getBytes());
  31. System.out.println(" [x] Sent '" + message + "'");
  32. channel.close();
  33. connection.close();
  34. }
  35. privatestatic String getMessage(String[] strings){
  36. if (strings.length < 1)
  37. return"info: Hello World!";
  38. return joinStrings(strings, " ");
  39. }
  40. privatestatic String joinStrings(String[] strings, String delimiter) {
  41. int length = strings.length;
  42. if (length == 0) return"";
  43. StringBuilder words = new StringBuilder(strings[0]);
  44. for (int i = 1; i < length; i++) {
  45. words.append(delimiter).append(strings[i]);
  46. }
  47. return words.toString();
  48. }
  49. }
package com.mq.PublishSubscribe;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
 * 把消息分发给所有的消费者fanout交换器很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。这正是我们的日志系统所需要的。


 * @author Administrator
 *
 */
public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    //创建链接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.22");
    //创建链接
    Connection connection = factory.newConnection();
    //创建信息通道
    Channel channel = connection.createChannel();
    //声明Exchange非持久化
    //channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);这是一个持久化的交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    //消息
    String message = getMessage(argv);
    String message1="删除";
    String message2="shanchu1";
    //发送消息,这个发送消息是忽略路由键值的。它会将消息发送给所有与这个交换机绑定的队列上
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    channel.basicPublish(EXCHANGE_NAME, "", null, message1.getBytes());
    channel.basicPublish(EXCHANGE_NAME, "", null, message2.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
    	    return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

 

Java代码 复制代码 收藏代码
  1. package com.mq.PublishSubscribe;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Envelope;
  7. import com.rabbitmq.client.QueueingConsumer;
  8. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  9. import com.rabbitmq.client.AMQP.Queue.DeleteOk;
  10. publicclass ReceiveLogs {
  11. privatestaticfinal String EXCHANGE_NAME = "logs";
  12. publicstaticvoid main(String[] argv) throws Exception {
  13. //创建链接工厂
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost("192.168.1.22");
  16. //创建链接
  17. Connection connection = factory.newConnection();
  18. //创建信息通道
  19. Channel channel = connection.createChannel();
  20. //声明Exchange非持久化
  21. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  22. //采用系统的自动声明的消息队列名
  23. String queueName = channel.queueDeclare().getQueue();
  24. //将队列绑定到交换机上,没有设定路由键值的,也就是说与这个交换机绑定的所有的队列都会接收到交换机中的消息
  25. channel.queueBind(queueName, EXCHANGE_NAME, "");
  26. // 参数说明:第一个参数为队列的名字。第二个参数为:设置为true,则rabbitmq在重启之后会重新建立
  27. //第三个参数:设置为true,标识只有创建了该队列的消费者才可以去连接该队列
  28. //第四个参数:设置为false为最后一个消费者断开后不会自动删除。
  29. //第五个参数为:队列的其他参数,没有就可以设置为空
  30. channel.queueDeclare("queue123", true, false, false, null);
  31. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  32. //声明一个消费者
  33. QueueingConsumer consumer = new QueueingConsumer(channel);
  34. //新消息到达的回调
  35. channel.basicConsume(queueName, true, consumer);
  36. while (true) {
  37. //QueueingConsumer.Delivery封装一个消息,和bean的结构相同
  38. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  39. //获取消息
  40. String message = new String(delivery.getBody());
  41. //获取消息信封
  42. Envelope message1 =delivery.getEnvelope();
  43. //获取消息属性
  44. AMQP.BasicProperties message2 =delivery.getProperties();
  45. System.out.println(" [x] Received '" + message + "'");
  46. System.out.println(" [x] Received Envelope'" + message1 + "'");
  47. System.out.println(" [x] Received AMQP.BasicProperties'" + message2 + "'");
  48. }
  49. }
  50. }
package com.mq.PublishSubscribe;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.AMQP.Queue.DeleteOk;

public class ReceiveLogs {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
	//创建链接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.22");
    //创建链接
    Connection connection = factory.newConnection();
    //创建信息通道
    Channel channel = connection.createChannel();
    //声明Exchange非持久化
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    //采用系统的自动声明的消息队列名
    String queueName = channel.queueDeclare().getQueue();
    //将队列绑定到交换机上,没有设定路由键值的,也就是说与这个交换机绑定的所有的队列都会接收到交换机中的消息
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    // 参数说明:第一个参数为队列的名字。第二个参数为:设置为true,则rabbitmq在重启之后会重新建立
    //第三个参数:设置为true,标识只有创建了该队列的消费者才可以去连接该队列
    //第四个参数:设置为false为最后一个消费者断开后不会自动删除。
    //第五个参数为:队列的其他参数,没有就可以设置为空
    channel.queueDeclare("queue123", true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    //声明一个消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //新消息到达的回调
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      //QueueingConsumer.Delivery封装一个消息,和bean的结构相同
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      //获取消息
      String message = new String(delivery.getBody());
      //获取消息信封
      Envelope message1 =delivery.getEnvelope();
      //获取消息属性
      AMQP.BasicProperties message2 =delivery.getProperties();
      
      System.out.println(" [x] Received '" + message + "'");  
      System.out.println(" [x] Received Envelope'" + message1 + "'");
      System.out.println(" [x] Received AMQP.BasicProperties'" + message2 + "'");
    }
  }
}

 

2).特定类型的direct

Java代码 复制代码 收藏代码
  1. package com.mq.Routing;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5. publicclass EmitLogDirect {
  6. privatestaticfinal String EXCHANGE_NAME = "direct_logs";
  7. publicstaticvoid main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.1.22");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  13. String severity = getSeverity(argv);
  14. String message = getMessage(argv);
  15. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
  16. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  17. channel.close();
  18. connection.close();
  19. }
  20. privatestatic String getSeverity(String[] strings){
  21. if (strings.length < 1)
  22. return"info";
  23. return strings[0];
  24. }
  25. privatestatic String getMessage(String[] strings){
  26. if (strings.length < 2)
  27. return"Hello World!";
  28. return joinStrings(strings, " ", 1);
  29. }
  30. privatestatic String joinStrings(String[] strings, String delimiter, int startIndex) {
  31. int length = strings.length;
  32. if (length == 0 ) return"";
  33. if (length < startIndex ) return"";
  34. StringBuilder words = new StringBuilder(strings[startIndex]);
  35. for (int i = startIndex + 1; i < length; i++) {
  36. words.append(delimiter).append(strings[i]);
  37. }
  38. return words.toString();
  39. }
  40. }
package com.mq.Routing;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.22");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String severity = getSeverity(argv);
    String message = getMessage(argv);
    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    channel.close();
    connection.close();
  }
  private static String getSeverity(String[] strings){
    if (strings.length < 1)
    	    return "info";
    return strings[0];
  }
  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
    	    return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

 

Java代码 复制代码 收藏代码
  1. package com.mq.Routing;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. publicclass ReceiveLogsDirect {
  7. privatestaticfinal String EXCHANGE_NAME = "direct_logs";
  8. publicstaticvoid main(String[] argv) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.1.22");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  14. String queueName = channel.queueDeclare().getQueue();
  15. String queueName1 = channel.queueDeclare().getQueue();
  16. channel.queueBind(queueName, EXCHANGE_NAME, "info");
  17. channel.queueBind(queueName1, EXCHANGE_NAME, "info1");
  18. // if (argv.length < 1){
  19. // System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
  20. // System.exit(1);
  21. // }
  22. //
  23. // for(String severity : argv){
  24. // channel.queueBind(queueName, EXCHANGE_NAME, severity);
  25. // }
  26. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  27. QueueingConsumer consumer = new QueueingConsumer(channel);
  28. channel.basicConsume(queueName, true, consumer);
  29. channel.basicConsume(queueName1, true, consumer);
  30. while (true) {
  31. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  32. String message = new String(delivery.getBody());
  33. String routingKey = delivery.getEnvelope().getRoutingKey();
  34. String message1 = new String(delivery.getBody());
  35. String routingKey1 = delivery.getEnvelope().getRoutingKey();
  36. System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
  37. System.out.println(" [x] Received '" + routingKey1 + "':'" + message1 + "'");
  38. }
  39. }
  40. }
package com.mq.Routing;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.22");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();
    String queueName1 = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "info");
    channel.queueBind(queueName1, EXCHANGE_NAME, "info1");
//    if (argv.length < 1){
//      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
//      System.exit(1);
//    }
//    
//    for(String severity : argv){    
//      channel.queueBind(queueName, EXCHANGE_NAME, severity);
//    }
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);
    channel.basicConsume(queueName1, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      String routingKey = delivery.getEnvelope().getRoutingKey();
      
      String message1 = new String(delivery.getBody());
      String routingKey1 = delivery.getEnvelope().getRoutingKey();

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
      System.out.println(" [x] Received '" + routingKey1 + "':'" + message1 + "'");   
    }
  }
}

 

这个例子中由于队列和info绑定的,所以发送过来的消息只能放到队列queueName中。

3).匹配类型的交换机

Java代码 复制代码 收藏代码
  1. package com.mq.Topics;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.MessageProperties;
  6. publicclass EmitLogTopic {
  7. privatestaticfinal String EXCHANGE_NAME = "topic_logs";
  8. publicstaticvoid main(String[] argv) {
  9. Connection connection = null;
  10. Channel channel = null;
  11. try {
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.1.22");
  14. connection = factory.newConnection();
  15. channel = connection.createChannel();
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. String routingKey = getRouting(argv);
  18. String message = getMessage(argv);
  19. channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  20. System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
  21. }
  22. catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. finally {
  26. if (connection != null) {
  27. try {
  28. connection.close();
  29. }
  30. catch (Exception ignore) {}
  31. }
  32. }
  33. }
  34. privatestatic String getRouting(String[] strings){
  35. if (strings.length < 1)
  36. return"anonymous.info.abc";
  37. return strings[0];
  38. }
  39. privatestatic String getMessage(String[] strings){
  40. if (strings.length < 2)
  41. return"Hello World!";
  42. return joinStrings(strings, " ", 1);
  43. }
  44. privatestatic String joinStrings(String[] strings, String delimiter, int startIndex) {
  45. int length = strings.length;
  46. if (length == 0 ) return"";
  47. if (length < startIndex ) return"";
  48. StringBuilder words = new StringBuilder(strings[startIndex]);
  49. for (int i = startIndex + 1; i < length; i++) {
  50. words.append(delimiter).append(strings[i]);
  51. }
  52. return words.toString();
  53. }
  54. }
package com.mq.Topics;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.1.22");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      String routingKey = getRouting(argv);
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
  
  private static String getRouting(String[] strings){
    if (strings.length < 1)
    	    return "anonymous.info.abc";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
    	    return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

 

Java代码 复制代码 收藏代码
  1. package com.mq.Topics;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. publicclass ReceiveLogsTopic {
  7. privatestaticfinal String EXCHANGE_NAME = "topic_logs";
  8. privatestaticfinal String QUEUE_NAME = "DL";
  9. publicstaticvoid main(String[] argv) {
  10. Connection connection = null;
  11. Channel channel = null;
  12. try {
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost("192.168.1.22");
  15. connection = factory.newConnection();
  16. channel = connection.createChannel();
  17. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  18. // String queueName = channel.queueDeclare().getQueue();
  19. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  20. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "anonymous.*");
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. //消费者接受到一个消息并作长时间处理时,有可能发生意外状况,
  23. //如运行“Consumer”的机器突然关闭,这时这个消息所要执行的任务
  24. //可能没有得到正确处理。
  25. //RabbitMQ会一直等到处理某个消息的“Consumer”的链接失去之后,
  26. //才确定这个消息没有正确处理,从而RabbitMQ重发这个消息
  27. //初始化“Consumer”时有个auto参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。
  28. //我们的应用应该是在消息的任务处理完之后再ack,因此初始化“Consumer”时这个参数应该置为false
  29. QueueingConsumer consumer = new QueueingConsumer(channel);
  30. channel.basicConsume(QUEUE_NAME, false, consumer);
  31. //basicQos方法来限制“Consumer”没有ack的消息数目
  32. channel.basicQos(1);
  33. while (true) {
  34. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  35. String message = new String(delivery.getBody());
  36. String routingKey = delivery.getEnvelope().getRoutingKey();
  37. System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
  38. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  39. }
  40. }
  41. catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. finally {
  45. if (connection != null) {
  46. try {
  47. connection.close();
  48. }
  49. catch (Exception ignore) {}
  50. }
  51. }
  52. }
  53. }
package com.mq.Topics;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";
  private static final String QUEUE_NAME = "DL";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.1.22");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//    String queueName = channel.queueDeclare().getQueue();
      channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "anonymous.*");
    
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      //消费者接受到一个消息并作长时间处理时,有可能发生意外状况,
      //如运行“Consumer”的机器突然关闭,这时这个消息所要执行的任务
      //可能没有得到正确处理。
      //RabbitMQ会一直等到处理某个消息的“Consumer”的链接失去之后,
      //才确定这个消息没有正确处理,从而RabbitMQ重发这个消息
      //初始化“Consumer”时有个auto参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。
      //我们的应用应该是在消息的任务处理完之后再ack,因此初始化“Consumer”时这个参数应该置为false
      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(QUEUE_NAME, false, consumer);
      //basicQos方法来限制“Consumer”没有ack的消息数目
      channel.basicQos(1);
      while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        String routingKey = delivery.getEnvelope().getRoutingKey();
        System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}

 

分享到:
评论

相关推荐

    Java开发案例-springboot-06-整合RabbitMQ-源代码+文档.rar

    Java开发案例-springboot-06-整合RabbitMQ-源代码+文档.rar Java开发案例-springboot-06-整合RabbitMQ-源代码+文档.rar Java开发案例-springboot-06-整合RabbitMQ-源代码+文档.rar Java开发案例-springboot-06-整合...

    rabbitmq-server-3.11.13

    rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-...

    rabbitmq-c-master.rar_RabbitMQ c lib_cmake编译_rabbitmq_rabbitmq-c

    在`rabbitmq-c-master`项目中,CMake文件会指导编译器如何找到依赖项并构建目标库或可执行文件。 编译`rabbitmq-c-master`的步骤如下: 1. **获取源码**:首先,从官方仓库或者提供的压缩包中下载`rabbitmq-c-...

    spring-boot-mq-rabbitmq 一套打通rabbitmq 打开可用 有注释

    spring.rabbitmq.host=your-rabbitmq-host spring.rabbitmq.port=your-rabbitmq-port spring.rabbitmq.username=your-username spring.rabbitmq.password=your-password ``` 或者使用YAML格式: ```yaml # ...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    rabbitmq 3.6.5-1离线安装

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm systemctl status rabbitmq-server systemctl start rabbitmq-server systemctl stop rabbitmq-server systemctl restart rabbitmq-server systemctl enable ...

    rabbitmq-server-3.5.4.tar.gz

    在Linux环境下,RabbitMQ的安装通常涉及下载其发行版的tar.gz文件,例如"rabbitmq-server-generic-unix-3.5.4.tar.gz"。这个版本是专门为Linux系统设计的。 首先,我们来详细解释一下RabbitMQ的核心概念和功能。...

    rabbitmq-server-3.7.3百度网盘下载.txt

    这里提供了rabbitmq-server-3.7.3.exe百度网盘下载,官网下载实在是太慢了,亲测有效! rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe

    158-最简单的 RabbitMQ 监控方法1

    在安装了RabbitMQ的服务器上执行`rabbitmq-plugins enable rabbitmq_management`命令即可启用。接着,需要创建一个用户,例如`user_admin`,并设置相应的权限,以便登录管理控制台。 一旦登录Web控制台,最需要注意...

    rabbitmq-server-generic-unix-3.6.1.tar

    标题中的“rabbitmq-server-generic-unix-3.6.1.tar”是指RabbitMQ服务器的3.6.1版本的通用Unix安装包。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中,用于处理异步任务、消息传递和负载均衡...

    rabbitmq-server-3.6.1

    rabbitmq-server-3.6.1 的安装程序,rabbitmq-server-3.6.1 安装 rabbitmq-server

    最新版windows rabbitmq-server-3.8.5.zip

    2. **安装与配置**:`rabbitmq-server-3.8.5.exe`是RabbitMQ Windows版的安装程序,用户可以通过这个可执行文件进行安装。安装过程中会涉及服务启动设置、端口配置(默认5672用于AMQP,15672用于管理界面)、环境...

    rabbitmq-java (2).zip

    在这个名为"rabbitmq-java (2).zip"的压缩包中,我们可以看到几个关键文件,它们构成了一个使用Java与RabbitMQ交互的项目。 1. `rabbitmq-java.iml`:这是IntelliJ IDEA项目文件,包含了项目的模块设置和依赖关系。...

    RabbitMQ快速入门及API介绍(401M)

    【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...

    windows服务----基于RabbitMQ

    为了与RabbitMQ进行交互,我们需要使用RabbitMQ的C#客户端库,这可以通过NuGet包管理器添加到项目中。该库提供了创建连接、通道、声明交换机和队列,以及接收和发送消息的功能。在这个场景中,服务会在启动时建立...

    rabbitmq4.0.2离线安装

    rpm -ivh rabbitmq-server-4.0.2-1.el8.noarch.rpm systemctl status rabbitmq-server systemctl start rabbitmq-server systemctl stop rabbitmq-server systemctl restart rabbitmq-server systemctl enable ...

    apache-jmeter-rabbitMQ测试.zip

    这个压缩包可能包含这个插件,比如"rabbitmq-jms-sampler",它允许JMeter发送和接收JMS(Java Message Service)消息,而RabbitMQ支持JMS协议的一个实现。 2. **测试计划创建**:在JMeter中,你需要创建一个测试...

    docker-compose部署单机rabbitmq

    docker-compose部署单机rabbitmq,一键启动

    springboot集成各种技能 activiti security-cas-sso rabbitmq

    springboot-activiti springboot-actuator springboot-elasticsearch springboot-flowable springboot-rabbitmq springboot-security-oauth2-sso springboot-shiro ... 各种集成 纯手写 java 开发者可以看一下

    rabbitmq-server-3.7.7.exe

    RabbitMQ 3.7.7是该软件的一个稳定版本,而"rabbitmq-server-3.7.7.exe"就是这个版本的安装程序,适用于Windows平台。 在安装RabbitMQ之前,需要先安装其依赖环境——Erlang。Erlang是一种专门设计用于大规模并发...

Global site tag (gtag.js) - Google Analytics