`

rabbitmq 学习-14- 官方rabbitmq+spring进行远程接口调用

阅读更多

到http://github.com/momania/spring-rabbitmq下载其示例程序

实行远程接口调用,主要在com.rabbitmq.spring.remoting下几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,

RabbitRpcClient.java(对RpcClient的简单封装,添加了发送消息时的选项:
mandatory--是否强制发送,immediate--是否立即发送,timeOutMs--超时时间)




发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;

import static java.lang.String.format;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;

public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
        implements InitializingBean, DisposableBean, ShutdownListener {

    private final Log log = LogFactory
            .getLog(RabbitInvokerServiceExporter.class);

    private RabbitChannelFactory channelFactory;
    private String exchange;
    private ExchangeType exchangeType;
    private String queueName;
    private String routingKey;

    private Object proxy;
    private List<RpcServer> rpcServerPool;
    private int poolsize = 1;

    public void afterPropertiesSet() {
        // 检查exchange type类型不能为fanout
        if (exchangeType.equals(ExchangeType.FANOUT)) {
            throw new InvalidRoutingKeyException(String.format(
                    "Exchange type %s not allowed for service exporter",
                    exchangeType));
        }
        exchangeType.validateRoutingKey(routingKey);

        // 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
        proxy = getProxyForService();

        // 初始化rpcServer池
        rpcServerPool = new ArrayList<RpcServer>(poolsize);

        // 初始化RpcServer,并开始接收请求
        startRpcServer();
    }

    // 初始化RpcServer,并开始接收请求
    private void startRpcServer() {
        try {
            log.info("Creating channel and rpc server");

            // 创建临时的channel,用来定义queue,exchange,并进行bind
            // 这里有两个用处:
            // 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
            // 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
            Channel tmpChannel = channelFactory.createChannel();
            tmpChannel.getConnection().addShutdownListener(this);
            tmpChannel.queueDeclare(queueName, false, false, false, true, null);
            if (exchange != null) {
                tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
                tmpChannel.queueBind(queueName, exchange, routingKey);
            }

            // 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
            for (int i = 1; i <= poolsize; i++) {
                try {
                    // 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
                    Channel channel = channelFactory.createChannel();
                    String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
                    log.info(String.format(format, i, exchange, exchangeType,
                            queueName, routingKey));

                    // 使用当前的channel创建一个RpcServer去处理请求
                    final RpcServer rpcServer = createRpcServer(channel);
                    rpcServerPool.add(rpcServer);

                    // 创建一个线程让当前的RpcServer去处理请求
                    Runnable main = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // rpcServer开始处理请求
                                throw rpcServer.mainloop();
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    };
                    // 线程开始
                    new Thread(main).start();
                } catch (IOException e) {
                    log.warn("Unable to create rpc server", e);
                }
            }
        } catch (Exception e) {
            log.error("Unexpected error trying to start rpc servers", e);
        }
    }

    // 创建RpcServer对象
    private RpcServer createRpcServer(Channel channel) throws IOException {
        return new RpcServer(channel, queueName) {

            // 重写处理接收到的消息的方法
            public byte[] handleCall(byte[] requestBody,
                    AMQP.BasicProperties replyProperties) {
                // 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
                // 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
                RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
                        .deserialize(requestBody);

                // 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
                RemoteInvocationResult result = invokeAndCreateResult(
                        invocation, proxy);

                // 将执行结果序列化为byte数据,然后返回给客户端
                return SerializationUtils.serialize(result);

            }
        };
    }

    public void setChannelFactory(RabbitChannelFactory channelFactory) {
        this.channelFactory = channelFactory;
    }

    @Required
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public Object getProxy() {
        return proxy;
    }

    @Override
    public void destroy() throws Exception {
        clearRpcServers();
    }

    // 清除所有的RpcServer
    private void clearRpcServers() {
        if (log.isInfoEnabled()) {
            log.info(format("Closing %d rpc servers", rpcServerPool.size()));
        }

        for (RpcServer rpcServer : rpcServerPool) {
            try {
                // 中止处理请求
                rpcServer.terminateMainloop();
                rpcServer.close();
            } catch (Exception e) {
                log.warn("Error termination rpcserver loop", e);
            }
        }
        rpcServerPool.clear();
        if (log.isInfoEnabled()) {
            log.info("Rpc servers closed");
        }

    }

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        if (log.isInfoEnabled()) {
            log.info(String.format("Channel connection lost for reason [%s]",
                    cause.getReason()));
            log.info(String.format("Reference [%s]", cause.getReference()));
        }

        if (cause.isInitiatedByApplication()) {
            if (log.isInfoEnabled()) {
                log.info("Sutdown initiated by application");
            }
        } else if (cause.isHardError()) {
            log
                    .error("Shutdown is a hard error, trying to restart the RPC server...");
            startRpcServer();
        }
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    @Required
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public void setPoolsize(int poolsize) {
        this.poolsize = poolsize;
    }

    @Required
    public void setExchangeType(ExchangeType exchangeType) {
        this.exchangeType = exchangeType;
    }
}

分享到:
评论

相关推荐

    rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用

    NULL 博文链接:https://wubin850219.iteye.com/blog/1076093

    rabbitmq-server windows安装包

    2. **环境变量**:安装过程中可能需要设置环境变量,例如添加RabbitMQ的bin目录到PATH变量中,以便于命令行调用相关工具。 3. **服务启动与管理**:安装完成后,RabbitMQ服务将作为Windows服务运行。用户可以通过...

    RabbitMQ学习-实战.docx

    3. 安装RabbitMQ:按照官方指南进行安装,并确保服务器启动并运行。 4. 配置RabbitMQ:可以通过管理界面或者配置文件来设置用户、虚拟主机和权限。 接下来,我们学习RabbitMQ的队列。队列是RabbitMQ的核心概念,它...

    2020年最新版--Java+最常见的+200++面试题汇总+答案总结汇总

    RabbitMQ和Kafka是消息中间件,面试中可能讨论它们的使用场景、特性以及如何在Java中集成。Zookeeper在分布式系统中的角色,如配置管理、服务发现等,也是考察点。 【网络编程】 网络编程的基础,如TCP/IP协议、...

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    RabbitMQ作为一款流行的开源消息中间件,广泛应用于Spring Boot项目中。本教程将详细介绍如何在Spring Boot应用中结合RabbitMQ实现消息确认、消息持久化、备用交换机以及死信交换机等功能。 首先,让我们理解这些...

    Spring-Boot-Game是基于SpringBoot+SpringCloud的开发系统.zip

    这个项目可能还涉及到微服务间的通信,如使用RabbitMQ或Kafka作为消息中间件,或者利用Spring Cloud Stream进行消息传递。此外,可能会有Dockerfile和docker-compose.yml文件,用于将应用容器化部署。 总之,...

    spring-cloud-learning:Springcloud + rabbitmq + kafka + slueth + zipkin + lcn + redis

    本项目"spring-cloud-learning:Springcloud + rabbitmq + kafka + slueth + zipkin + lcn + redis"是针对Spring Cloud的一系列实践教程,涉及到的关键技术包括: 1. **Spring Cloud Eureka**:注册与发现服务,它是...

    rabbitmq实战-rabbitmq-action.zip

    2.5 RPC模式:远程过程调用,通过RabbitMQ实现客户端与服务端的异步通信。 三、RabbitMQ实战应用 3.1 Spring整合:使用Spring框架集成RabbitMQ,包括配置、生产者和消费者组件的创建,以及消息模板的使用。 3.2 ...

    SpringBoot+SpringCloud+nacos+gateway+mybatis搭建微服务

    在微服务之间进行远程调用时,通常有两种方式:RESTful API和Feign。RESTful API是通过HTTP协议进行调用,而Feign是Spring Cloud提供的声明式Web服务客户端,它使得编写Web服务客户端变得非常简单,只需要定义一个...

    spring整合rabbitmq需要的jar包(spring版本4.2.0)

    1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和配置。这个库包含了Spring AMQP项目的核心功能,如连接工厂、模板类、监听容器等,使得开发者能够方便...

    spring-rabbitmq-demo

    当我们结合Spring与RabbitMQ时,可以创建一个高效且灵活的异步通信和远程过程调用(RPC)系统。下面,我们将深入探讨"spring-rabbitmq-demo"项目中的关键知识点。 首先,`pom.xml`文件是Maven项目的核心配置文件,...

    (spring-cloud-sleuth-stream+mybatis+rabbit)+(kafka+elasticsearch+zipkin)

    Sleuth Stream 模块则利用 Spring Cloud Stream 实现服务间的通信,通过消息代理(如 RabbitMQ)传递追踪数据,确保高可用性和可伸缩性。 2. **MyBatis**: MyBatis 是一个优秀的持久层框架,它支持自定义 SQL、存储...

    springCloud+rabbitMq

    标题 "springCloud+rabbitMq" 暗示了这篇内容是关于如何在Spring Cloud框架中集成并使用RabbitMQ的教程。Spring Cloud是微服务架构中的一个组件集合,提供了服务发现、配置管理、断路器等核心功能,而RabbitMQ则是一...

    springcloud微服务技术栈-个人笔记文档(基础篇)

    它允许开发者以类似接口的方式定义服务调用,降低了远程调用的复杂性。 6. **Config Server**:Spring Cloud Config 用于实现分布式系统中的配置管理,支持 Git 存储配置,允许服务在运行时动态获取或更新配置。 7...

    spring+springmvc+rabbitmq实例代码

    本文将深入探讨如何使用Spring、Spring MVC和RabbitMQ进行集成,以及它们在实际项目中的应用。 首先,Spring AMQP是Spring框架的一个模块,它提供了一种与AMQP(Advanced Message Queuing Protocol)兼容的抽象层,...

    spring rabbitmq rpc 测试代码

    总结一下,Spring RabbitMQ RPC的核心在于利用RabbitMQ作为中间人,通过定义交换机、队列和绑定,实现在客户端和服务端之间进行异步的远程调用。这种方式可以很好地扩展系统,同时保持组件间的解耦。通过配置和编程...

    springboot+rabbitmq项目demo(亲测,可正常运行)

    为了验证项目是否正常运行,我们可以创建一个简单的REST接口,通过调用`MessageProducer`发送消息。在`Controller`类中添加以下代码: ```java import org.springframework.beans.factory.annotation.Autowired; ...

    基于Vue+SpringCloud博客的设计与实现 有论文

    基于Vue+SpringCloud博客的设计与实现---微服务基础版本组件1.0版本 博客采用Vue+SpringCloud前后分离的方式。博客采用了高可用Eureka(可以替换成其他微服务组件)以及高可用Zuul,使用以Es搜索引擎作为Zpkin的存储...

    rabbitmq学习10:使用spring-amqp发送消息及异步接收消息

    标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...

Global site tag (gtag.js) - Google Analytics