`
iwinit
  • 浏览: 455121 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

服务框架HSF分析之二Provider启动和处理

 
阅读更多

 

上篇文章,简单介绍了HSF框架的初始化。这一篇,小编将为大家带来HSF provider的启动和服务细节。主要关注点:Server的启动,服务的注册,调用处理主流程(IO线程,业务线程)。

一.  Server的启动

在某一个HSFSpringProviderBean初始化时,启动HSF Server。在HSF默认协议使用TBRemoting作为RPC框架,TBRemotingRPCProtocolComponent.registerProvider中:

providerServer.startHSFServer();

其代码如下:

 

            Server server = new DefaultServer(configService.getHSFServerPort());
            server.start();
            ……
 

DefaultServer构造:

 

acceptor = new SocketAcceptor(processorCount, IO_THREAD_POOL);
        SocketAcceptorConfig cfg =(SocketAcceptorConfig)acceptor.getDefaultConfig();
        cfg.setBacklog(BACKLOG_SIZE);
        cfg.setThreadModel(ThreadModel.MANUAL);
        cfg.getFilterChain().addLast(CODEC_FILTER_NAME, CODEC_FILTER);
 

 

1.  可以看到是使用了mina作为NIO框架,启动一个server的SocketAcceptor,并配置

2.  关键的一点是在filter的最后加了一个CODEC_FILTER,做为序列化的filter。后续会单独分析。

Server的start:

 

IoHandlerioHandler = newDefaultIoHandler(this);
        try {
            acceptor.bind(serverSocket,ioHandler);
            started= true;
            LOGGER.warn("服务器已启动:" + serverSocket);
        }
 

 

1.  构造一个默认handler,作为接受RPC请求的处理入口

2.      绑定端口

 

已上就完成了HSF的Server启动。此时端口已经打开,可以接受请求了。

二.  服务的注册

在启动server之后,HSF会将这个provider信息注册到configserver上。入口是: metadataService.publish(metadata);

具体实现:

 

privatePublisher<String> doPublish(ServiceMetadata metadata) {
        final String serviceUniqueName =metadata.getUniqueName();
        final String data =HSFServiceTargetUtil.getTarget(configService.getHSFServerPort(), metadata);
 
        final String publisherId =PUBLISHER_PREFIX + serviceUniqueName;
        PublisherRegistration<String>registration = new PublisherRegistration<String>(publisherId,serviceUniqueName);
        registration.setGroup(metadata.getGroup());
 
        Publisher<String> publisher =PublisherRegistrar.register(registration);
        publisher.publish(data);
        return publisher;
}
 

 

1.      拼成TB Remoting格式的Target地址信息: 10.7.42.161:12200?CLIENTRETRYCONNECTIONTIMES=3&CLIENTRETRYCONNECTIONTIMEOUT=1000&_SERIALIZETYPE=hessian&_IDLETIMEOUT=10&_TIMEOUT=5000

2.      构造PublisherRegistration数据实体,代表一次发布,设置provider的分组,给分组调用用。

3.      注册一个新的发布者身份以发布数据

a.      默认使用IsolatedPublisherRegistrar进行注册

b.      构造默认发布者DefaultPublisher

 

     DefaultPublisher(PublisherRegistration<T>registration, ConfigClientWorker worker) {
        super(registration, worker);
        ......
        this.regRequest = newPublisherRegReqPacket(registration.getDataId(), registration.getClientId(),
                datumId != null ? datumId :UUID.randomUUID().toString());            //UUID as default datum ID.
        .......
}
 

 

在父类DefaultDataClient中初始化ConfigClientWorker:

this.worker= (worker != null) ? worker : ConfigClientWorker.getDefaultWorker();

ConfigClientWorker是config server客户端的核心类,负责管理客户端task的异步执行,当然发布provider也是一个task。初始化ConfigClientWorker时,会初始化其内部的WorkThread线程和连接管理ConfigClientConnection。

可见ConfigClientWorker管理着业务线程和IO线程之间的交互,非常重要。此时,work线程还没启动,和Config server的连接也没有连接,只是初始化而已。

4.      发布数据

a.      先看看data是否可序列化,使用java序列化data

 

newObjectOutputStream(new OutputStream() {
                @Override public void write(intb) throws IOException {}
            }).writeObject(datum);
 

 

b.      通过之前创建的ConfigClientWorker提交一个任务 worker.schedule(this);

c.       将task放入阻塞队列 tasks.put(task);

d.      唤醒work线程,workerThread.signal();

e.      如果是第一次,则启动work线程

 

synchronized(bell){
                if (isAlive()) {
                    bell.notifyAll();
                } else {
                    try {
                        start();                   // Lazy start
                    }catch(IllegalThreadStateException e) {
                       log.fatal("[Internal] Worker thread is dead.");
                    }
                }
            }
 

 

       可以发现发布数据是一个异步操作,中间通过ConfigClientWorker隔开了,而WorkThread是处理task的线程,task包括发送和接受,对应ConfigClientWorker的2个属性:

 

       // Tokeep messages from configServer. Access is shared between deliverer thread andcommunication thread.
    finalQueue<ProtocolPackage> mailbox = newLinkedBlockingQueue<ProtocolPackage>(MAX_MAILBOX_SIZE);
    // Access isshared between user thread and deliverer thread.
private final TaskQueue tasks =new TaskQueue();
 

 

f.       Work线程执行发布任务,创建和ConfigServer的连接

 

privatevoid ensureConnected() throws InterruptedException {
            if (connection.isConnected())return;
            // Block until connected to server.
            while(! connection.connect()) {
               Thread.sleep(GLOBAL_RECONNECTING_DELAY);
            }
        }     
 

 

之前创建的Connection在这里同步初始化连接,使用TB remoting框架,这里最终是通过一个共享的资源器拿到连接,有兴趣的同学可以单独研究哈。最终的mina client代码:

 

ConnectFutureminaConnectFuture = connector.connect(
               ConnectionUrl.socketAddr(targetUrl),
                localAddress,
                new ClientIoHandler(this,clientKey, copyListeners),
                cfg);
 

 

g.     连接创建好之后,开始发送数据 resp= sendReceive(packagee);此处最终会调用mina的session.write(connectionMsg, wfl); 发送数据是异步+wait的过程,发送完成之后会起一个定时timer到超时点时运行,返回客户端超时异常。

 

TimeoutHandle timeoutHandle = new TimeoutHandle();
                timeoutFuture =DefaultClientManager.timer.schedule(timeoutHandle,connRequest.getRespTimeout(), TimeUnit.MILLISECONDS);
 

 

三.  服务调用

Provider注册完之后,consumer就可以通过configserver拿到地址了,发起调用了。

在server端通过mina框架的handler接受请求并处理。在mina的filter端有一个codecfilter类RemotingProtocolCodecFilter。对应的decoder和encoder为RemotingProtocolEncoder和RemotingProtocolDecoder。

在Decoder关键点:

a.     使用session保存半包的请求,二进制协议的关键

b.      按协议对ByteBuffer进行解析,输出到ProtocolDecoderOutput

 

/*
         * 新版报文组成:
         *  Header(1B): 报文版本
         *  Header(1B): 请求/响应
         *  Header(1B): 报文协议(HESSIAN/JAVA)
         *  Header(1B): 单向/双向
         *  Header(1B): Reserved
         *  Header(4B): 通信层对象长度
         *  Header(1B): 应用层对象类名长度
         *  Header(4B): 应用层对象长度
         *  Body:       通信层对象
         *  Body:       应用层对象类名
         *  Body:       应用层对象
         */
 

 

 Codec之后会调用DefaultIoHandler.messageReceived进行处理:

 

 

DefaultConnection conn =DefaultConnection.findBySession(session);
       conn.getMsgReceivedListener().messageReceived(conn, message);
 

 

 

DefaultMsgListener端会执行doRequest方法:

a.      构造响应的ConnectionResponse对象

b.      拿之前注册的processor即ProviderProcessor

c.       拿之前注册的处理请求的线程池

d.      启动一个runnable,扔到线程池中执行

e.      Mina io线程返回

业务线程端:

a.      调用ProviderProcessor. handleRequest进行处理

b.     切换classloader到app的classloader,不然会找不到app的类

 

Thread.currentThread().setContextClassLoader(servicePOJO.getClass().getClassLoader());
 

 

c.      反射调用服务方法

Object appResp =workerMethod.invoke(servicePOJO, methodArgs);
 

d.      返回执行结果

 

四.  小结

本文简单小结了HSF的服务启动和调用,典型的RPC应用,中间的一些知识点,序列化,nio,异步调用等都是java的基础知识,希望通过本文让大家增加对rpc框架的认识~~

HSF由于使用了OSGI容器,导致对app容器的依赖,扩展性上确实做的不够,tb remoting基本和mina绑死了,这也是淘宝应用的一个特点,能跑能解决业务问题就是王道~~

分享到:
评论

相关推荐

    Springboot+HSF分布式服务框架+EDAS注册中心,实现发布订阅模式

    Spring Boot、HSF(High Speed Service Framework)以及EDAS(Enterprise Distributed Application Service)是阿里巴巴提供的强大工具,帮助开发者快速构建和部署分布式服务。让我们深入探讨这些技术,并了解如何...

    HSF服务框架共28页.pdf.zip

    HSF(High Speed Service Framework)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建...因此,我们主要关注HSF服务框架的相关内容,期望从"HSF服务框架共28页.pdf.zip"中获得深入的理论知识和实践经验。

    HSF项目例子IDEA 与 eclipse 开发环境说明

    4. **服务启动与停止脚本(Start & Stop Scripts)**:用于启动和停止HSF服务的shell或bat脚本。 5. **示例客户端(Sample Client)**:演示如何作为服务消费者调用HSF服务。 通过以上介绍,你对HSF的开发环境设置...

    分布式服务框架原理与实践(Dubbo,HSF)_李林锋著

    《分布式服务框架原理与实践(Dubbo,HSF)_李林锋著》这本书深入探讨了分布式服务框架的关键技术和实际应用,主要聚焦于两个知名的开源框架——Dubbo和HSF。这两者都是实现高效率、可扩展的分布式服务的核心工具,尤其...

    taobao-hsf.sar

    HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的服务治理框架,主要用于构建分布式服务系统。它为开发者提供了简单易用的API,使得在Java环境中开发、发布、调用分布式服务...

    HSF and 编码规范

    HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建分布式服务系统。HSF使得应用可以像调用本地方法一样调用远程服务,极大地提升了开发效率和系统的可...

    taobao-hsf.tgz

    淘宝HSF(High Speed Service Framework)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建大规模分布式服务系统。HSF全称为High Speed Service Framework,它基于Java语言开发,旨在提高服务调用的效率...

    阿里云HSF用户开发指导手册

    阿里云HSF(High Speed Service Framework)用户开发指导手册是一份专门为Java开发者设计的文档,旨在帮助他们理解和高效地使用HSF服务框架。HSF是阿里巴巴开源的一款高性能、轻量级的服务治理框架,主要用于构建...

    EDAS-HSF-BOOT

    HSF(High Speed Service Framework)是阿里云EDAS中的核心组件,它是一个高性能、轻量级的Java服务框架,用于构建微服务架构。HSF-BOOT则是一个帮助开发者快速启动HSF服务的引导项目,它简化了HSF服务的开发流程,...

    阿里hsf接口 sar包

    HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的分布式服务框架,主要用于构建大规模分布式服务系统。它基于Java语言,提供了包括服务注册、发现、调用、负载均衡、容错、...

    taobao-hsf

    【标题】"taobao-hsf" 是一个与阿里巴巴云服务相关的技术组件,它主要用于构建分布式服务框架。在阿里云的生态系统中,HSF(High Speed Service Framework)是为大规模分布式服务设计的一种高性能、高可用的服务化...

    taobao-HSF的两种安装方案

    标题中的“taobao-HSF”指的是淘宝的High Speed Service(HSF)框架,它是一个高性能、高可用的服务治理框架,主要用于阿里巴巴集团内部的分布式服务调用。HSF提供了包括服务发现、服务调用、负载均衡、熔断保护等一...

    HSF实现原理讲解

    HSF 框架的原理讲解,主要包含了知识点:rpc,动态代理,HSF可以作为微服务的基础框架进行二次开发

    淘宝好舒服 taobao-hsf

    "淘宝好舒服"(Taobao-HSF)是一个专为阿里巴巴集团内部设计的高性能服务框架,全称为High Speed Service。这个框架主要用于实现企业级的服务化架构,提供高效、稳定、灵活的服务调用解决方案。HSF是淘宝核心的服务...

    HSF用户手册

    HSF(High-Speed Service Framework)是阿里巴巴集团内部广泛使用的一个高性能、轻量级的RPC框架。它主要应用于分布式系统中,提供服务间的高效通信机制。HSF的目标是简化分布式应用的开发过程,通过隐藏底层的复杂...

    aliyunHSF文档.zip

    阿里云HSF(High Speed Service Framework)是一款高性能、轻量级的服务框架,专为构建分布式服务而设计。HSF是阿里巴巴内部广泛使用的微服务框架,它提供了服务注册、发现、调用、负载均衡、熔断、限流等功能,极大...

    淘宝HSF使用说明

    淘宝的HSF框架,用户手册,有兴趣的人欢迎下载~内部文档哦~

Global site tag (gtag.js) - Google Analytics