`
sheungxin
  • 浏览: 105476 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ使用场景练习:RPC(七)

    博客分类:
  • MQ
阅读更多
  • RPC,同步消息
     RabbitMQ默认的consumer为异步监听,RPC应用需要实现consumer的同步,可以使用QueueingConsumer(继承与DefaultConsumer,定义了一个堵塞队列LinkedBlockingQueue)实现同步
     实际上就是服务端、客户端各定义一个消息队列,相互发送消息,客户端发送消息后同步等待结果返回

  • 注意要点

发送消息时指定回复队列、corrId
//发送消息,指定回复消息所在队列
BasicProperties props=new BasicProperties.Builder().correlationId(corrId).replyTo(queueName).build();
channel.basicPublish("", "rpc_queue", props, SerializationUtils.serialize(mes));

监听队列,等待服务端反馈(堵塞)
 //监听队列,接收服务端回复消息
 while(true){
     QueueingConsumer.Delivery delivery=consumer.nextDelivery();//堵塞	  
     //do something
     break;
 }

服务端接收消息处理后回写处理结果
channel.basicPublish("", properties.getReplyTo() ,new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(), 
						SerializationUtils.serialize("response:"+mes));

服务端开启手动应答
//关闭自动应答机制,默认开启;这时候需要手动进行应该
channel.basicConsume("rpc_queue", false, consumer);
channel.basicAck(envelope.getDeliveryTag(), false);

限制服务端最大处理量
channel.basicQos(1);

  • RPC服务端

package com.demo.mq.rabbitmq.example06;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * RPC服务端
 * @author sheungxin
 *
 */
public class RPCServer{

	/**
	 * RPC服务端,声明一个服务处理队列,接收消息处理后,把结果回写给客户端
	 * 需要开启手动应答机制,确保服务执行完成
	 * @param object 消息主体
	 * @throws IOException
	 */
	public static void sendAToB(Serializable object) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		channel.queueDeclare("rpc_queue", false, false, false, null);
		channel.basicQos(1);
		
		Consumer consumer=new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
				String mes=SerializationUtils.deserialize(body);
				System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done");
				//接收到消息后,向发送方回写消息:指定发送方所在队列(properties.getReplyTo())、correlationId
				channel.basicPublish("", properties.getReplyTo() , 
						new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(), 
						SerializationUtils.serialize("response:"+mes));
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		//关闭自动应答机制,默认开启;这时候需要手动进行应该
		channel.basicConsume("rpc_queue", false, consumer);
		System.out.println("*********8");
	}
	
	public static void main(String[] args) throws Exception {
		sendAToB("Hello World !");
	}
}

  • RPC客户端

package com.demo.mq.rabbitmq.example06;

import java.util.UUID;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

/**
 * RPC客户端
 * @author sheungxin
 *
 */
@SuppressWarnings("deprecation")
public class RPCClient {
	
	/**
	 * RPC客户端:向服务端处理队列发送消息,发送时指定correlationId、回复队列名称,同时在回复队列上建立consumer,进行监听接收回复消息
	 * @param mes
	 * @throws Exception
	 */
	public static String call(String mes) throws Exception{
		String response=null;
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//创建一个临时队列,用于接收回复消息
		String queueName=channel.queueDeclare().getQueue();
		QueueingConsumer consumer=new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		
		String corrId=UUID.randomUUID().toString();
		//发送消息,指定回复消息所在队列
		BasicProperties props=new BasicProperties.Builder().correlationId(corrId).replyTo(queueName).build();
		channel.basicPublish("", "rpc_queue", props, SerializationUtils.serialize(mes));
		
		//监听队列,接收服务端回复消息
		while(true){
			QueueingConsumer.Delivery delivery=consumer.nextDelivery();
			if(delivery.getProperties().getCorrelationId().equals(corrId)){
				response=SerializationUtils.deserialize(delivery.getBody());
				break;
			}
		}
		return response;
	}
	
	public static void main(String[] args) throws Exception {
		System.out.println(call("hello world"));
		System.out.println("waiting for rpc is over");
	}

}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics