`
sheungxin
  • 浏览: 105481 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ使用场景练习:入门实例(一)

    博客分类:
  • MQ
阅读更多
  • 注意要点

同一队列多次创建
//此处声明队列为了防止接收者先运行,队列还不存在时创建队列(同一队列只会创建一次)
channel.queueDeclare(queue, false, false, false, null);

  • 消息发送类

package com.demo.mq.rabbitmq.example01;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.demo.mq.rabbitmq.UserBean;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 发送消息类
 * @author sheungxin
 *
 */
public class Send{

	/**
	 * 单发送、单接收场景,无特别处理,用于接收消息
	 * @param queue 队列名称
	 * @param object 消息主体
	 * @throws IOException
	 */
	public static void sendAToB(String queue,Serializable object) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		channel.queueDeclare(queue, false, false, false, null);
		channel.basicPublish("", queue, null, SerializationUtils.serialize(object));
		System.out.println("A Send :'"+object+"'");
		channel.close();
		conn.close();
	}
	
	public static void main(String[] args) throws Exception {
		String channel="hello";
//		sendAToB(channel, new String("Hello World!".getBytes(),"UTF-8"));
		UserBean user=new UserBean();
		user.setId("0001");
		user.setName("测试001");
		sendAToB(channel, user);
	}
}


  • 消息接收类

package com.demo.mq.rabbitmq.example01;

import java.io.IOException;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.demo.mq.rabbitmq.UserBean;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 接收消息类
 * @author sheungxin
 *
 */
public class Recv {
	
	/**
	 * 单发送、单接收场景,无特别处理,用于接收消息
	 * 注意:同时多个接收实体,依次接收消息,同一消息只有一个实体接收
	 * @param queue
	 * @throws Exception
	 */
	public static void recvAToB(String queue) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//此处声明队列为了防止接收者先运行,队列还不存在时创建队列(同一队列只会创建一次)
		channel.queueDeclare(queue, false, false, false, null);
		Consumer consumer=new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
//				String mes=SerializationUtils.deserialize(body);
				UserBean userBean=SerializationUtils.deserialize(body);
				System.out.println("B Received :'"+userBean.getId()+","+userBean.getName()+"'");
			}
		};
		channel.basicConsume(queue, true, consumer);
	}
	
	public static void main(String[] args) throws Exception {
		recvAToB("hello");
	}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics