`
wbj0110
  • 浏览: 1613049 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Storm-源码分析- Thrift的使用

阅读更多

1 IDL

首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service 
然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码

比如对于nimbus service 
在IDL的定义为,

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

而对应在Nimbus.java的Java代码如下,

public class Nimbus {

  public interface Iface {

    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;

    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public String beginFileUpload() throws org.apache.thrift7.TException;

    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;

    public void finishFileUpload(String location) throws org.apache.thrift7.TException;

    public String beginFileDownload(String file) throws org.apache.thrift7.TException;

    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;

    public String getNimbusConf() throws org.apache.thrift7.TException;

    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;

    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;

    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

  }

 

2 Client

1. 首先Get Client,

NimbusClient client = NimbusClient.getConfiguredClient(conf);

看看backtype.storm.utils下面的client.getConfiguredClient的逻辑, 
只是从配置中取出nimbus的host:port, 并new NimbusClient

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }

NimbusClient 继承自ThriftClient, public class NimbusClient extends ThriftClient 
ThriftClient又做了什么? 关键是怎么进行数据序列化和怎么将数据传输到remote 
这里看出Thrift对Transport和Protocol的封装 
对于Transport, 其实就是对Socket的封装, 使用TSocket(host, port) 
然后对于protocol, 默认使用TBinaryProtocol, 如果你不指定的话

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            //locate login configuration 
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            //construct a transport plugin
            ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            //create a socket with server
            if(host==null) {
                throw new IllegalArgumentException("host is not set");
            }
            if(port<=0) {
                throw new IllegalArgumentException("invalid port: "+port);
            }            
            TSocket socket = new TSocket(host, port);
            if(timeout!=null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            //establish client-server transport via plugin
            _transport =  transportPlugin.connect(underlyingTransport, host); 
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new  TBinaryProtocol(_transport);
    }

 

2. 调用任意RPC 
那么就看看submitTopologyWithOpts

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

可以看出上面的Nimbus的interface里面有这个方法的定义, 而且Thrift不仅仅自动产生java interface, 而且还提供整个RPC client端的实现

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }

分两步, 
首先send_submitTopologyWithOpts, 调用sendBase 
接着, recv_submitTopologyWithOpts, 调用receiveBase 

  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

可以看出Thrift对protocol的封装, 不需要自己处理序列化, 调用protocol的接口搞定 

 

3 Server

Thrift强大的地方是, 实现了整个协议栈而不光只是IDL的转化, 对于server也给出多种实现 
下面看看在nimbus server端, 是用clojure来写的 
可见其中使用Thrift封装的NonblockingServerSocket, THsHaServer, TBinaryProtocol, Proccessor, 非常简单 
其中processor会使用service-handle来处理recv到的数据, 所以作为使用者只需要在service-handle中实现Nimbus$Iface, 其他和server相关的, Thrift都已经帮你封装好了, 这里使用的IDL也在backtype.storm.generated, 因为clojure基于JVM所以IDL只需要转化成Java即可.

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
                    )
       server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
分享到:
评论

相关推荐

    spring-cloud-starter-thrift:spring-cloud-starter-thrift提供SpringCloud对可伸缩的跨语言服务调用框架Apache Thrift的封装和集成

    spring-cloud-starter-thrift包括客户端spring-cloud-starter-thrift-client和服务端spring-cloud-starter-thrift-server两个模块。服务端:支持Apache Thrift的各种原生服务线程模型,包括单线程阻塞模型(simple)、...

    apache-storm-0.9.5源码

    总的来说,分析Apache Storm 0.9.5的源码,我们可以深入了解其设计思想、内部机制和实现细节,这对于开发者在实际项目中优化性能、解决故障、定制功能都有极大的帮助。同时,这也为理解后续版本的改进和发展提供了...

    libthrift-1.0.0.jar,thrift例子,thrift源码

    libthrift-1.0.0.jar,thrift例子,thrift源码 里面有你想要java版的thrift全部文件

    thrift-0.9.1.exe和thrift-0.9.2.exe

    标题中的"thrift-0.9.1.exe"和"thrift-0.9.2.exe"是Thrift框架的不同版本。这些文件是Windows平台上的可执行程序,用于生成与Thrift相关的代码。0.9.1和0.9.2分别代表了Thrift的两个发行版本,每个版本可能包含了新...

    maven-thrift-plugin-0.1.11.jar

    maven-thrift-plugin-0.1.11.jar

    maven-thrift-plugin:将 thrift 文件生成为源代码的 Maven Thrift 插件

    欢迎使用 Maven Thrift 插件 Maven Thrift Plugin 用于编译你项目的 thrift 文件。 注意:确保已安装 Thrift。 有关 Thrift 安装指南,请参阅: 目标概述 generate-java绑定到 generate-sources 阶段,用于编译 ...

    spark-hive-thriftserver_2.11-2.1.3-SNAPSHOT-123456.jar

    spark-hive-thriftserver_2.11-2.1.spark-hive-thrift

    编译的spark-hive_2.11-2.3.0和 spark-hive-thriftserver_2.11-2.3.0.jar

    spark-hive_2.11-2.3.0...spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0.jar

    gradle-thrift-plugin:Gradle的Thrift插件

    classpath 'co.tomlee.gradle.plugins:gradle-thrift-plugin:0.0.6' } } repositories { mavenCentral() } dependencies { // // adjust for your Thrift version ... // compile 'org.apac

    hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架

    对HBase的API做了一层抽象,统一了HBase1.x和HBase2.x的实现,并提供了读写HBase的ORM的支持,同时,sdk还对HBase thrift 的客户端API进行了池化封装,(类似JedisPool),消除了直接使用原生API的各种问题,使之...

    thrift-0.9.1.exe和thrift-0.9.2.exe 含使用说明

    在"thrift-0.9.1.exe"和"thrift-0.9.2.exe"这两个版本中,主要包含了Thrift编译器和相关的库文件。编译器是一个命令行工具,用于将IDL文件转换为实际编程语言的代码。例如,你可以编写一个.thrift文件定义服务接口,...

    cassandra-thrift

    cassandra-thrift cassandra-thrift cassandra-thrift cassandra-thrift cassandra-thrift cassandra-thrift

    apache-storm-0.9.6

    这个名为 "apache-storm-0.9.6" 的压缩包包含了 Apache Storm 的0.9.6版本,该版本发布于2014年,是Storm发展中的一个重要里程碑。 在Storm中,数据流被分割成多个“tuple”(元组),这些元组在集群中的工作节点...

    windows下vs2010编译的thrift,包含lib和cpp源码

    2. 下载Thrift源码,包括thrift-0.11.0版本。 3. 配置项目设置,指定编译目标为C++和Windows。 4. 修改和配置编译选项,确保与你的环境兼容。 5. 使用Thrift编译器生成服务端和客户端代码。 6. 将生成的代码和库文件...

    thrift环境配置方法

    为了使用 Thrift,需要配置好相关的环境,这篇文章将介绍 Thrift 环境配置的方法。 Thrift 环境配置的重要性 ------------------------- Thrift 是一个强大的 RPC 框架,但是在使用前需要配置好相关的环境。配置...

    thrift源码

    thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码thrift源码...

    php-thrift-package:Thrift php api composer软件包

    Thrift PHP软件库 执照 根据一项或多项贡献者许可协议获得了Apache Software Foundation(ASF)的许可。 有关版权拥有权的其他信息,请参见随此作品... 要在您PHP代码库中使用Thrift,请执行以下步骤: 将所有thrift

    netty+thrift高并发高性能

    综上所述,通过结合使用Netty和Thrift,可以有效地解决传统RPC框架存在的性能瓶颈问题,实现高并发和高性能的服务。这种组合不仅可以大幅提高系统的吞吐量,还能降低延迟,是现代分布式系统中不可或缺的技术栈之一。

    stm32f4-thrift-源码.rar

    STM32F4-Thrift源码项目可能包括以下部分: 1. **Cortex-M4内核特性**:了解STM32F4处理器的基础,如FPU(浮点运算单元)、DSP指令集以及高级中断系统,这些都是高性能实时应用的关键。 2. **STM32CubeMX配置**:...

    maven-thrift-server

    【 Maven-Thrift-Server:构建Thrift服务的Maven实践】 在软件开发中,Thrift是一种高效的跨语言服务开发框架,由Facebook开发并开源。它允许定义数据类型和服务接口,然后自动生成各种编程语言的代码,使得不同...

Global site tag (gtag.js) - Google Analytics