`

flume-Thrift-source

阅读更多
Thrift IDL
Flume Thrift IDL在client包里面,定义如下:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}注意:event在C#里面是关键字,所以利用Thrift编译器生成客户端的接口时,要把所有event关键字改成其他的、比如events.

Thrift Service
Flume的Source分两种:

?实现PollableSource接口
通过SinkRunner管理Source
?实现EventDrivenSource接口
可以自己接受数据、发送到channel。比如ThriftSource
Flume Thrift Service的实现类在core包

public class ThriftSource extends AbstractSource implements Configurable,
  EventDrivenSource {
  public static final String CONFIG_THREADS = "threads";
  public static final String CONFIG_BIND = "bind";
  public static final String CONFIG_PORT = "port";
  private Integer port;
  private String bindAddress;
  private int maxThreads = 0;
  private SourceCounter sourceCounter;
  private TServer server;
  private TServerTransport serverTransport;
  private ExecutorService servingExecutor;
  public void start() {
      //创建工作者线程池
      ...
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());

      //ThriftSourceProtocol是Flume Thrift Service的真正实现
      args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
     /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
    ...
  }Flume Thrift Service真正的实现类是内部类ThriftSourceHandler

  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
        //传给channel
        getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
        logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
        return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
        getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
        logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
        return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
  }
分享到:
评论

相关推荐

    flume-ng-1.6.0 cdh5.7.0安装包

    1. 支持多种数据源:包括HTTP、JDBC、Avro、Thrift等,允许Flume从各种来源收集数据。 2. 支持多种接收器:可以将数据写入HDFS、HBase、Cassandra、Solr等,提供灵活的数据存储选择。 3. 支持多级路由和转换:通过...

    flume通过thrift协议收集日志-Python

    在 Flume 中使用 Thrift Source,可以使得外部应用程序(如 Python)能够将数据推送到 Flume。Python 应用程序将扮演 Flume Source 的角色,通过 Thrift 协议与 Flume 进行通信。Thrift 提供了 IDL(接口定义语言)...

    flume-ng-1.5.0-cdh5.3.6.tar.gz

    1. **Source**:Flume提供了多种类型的Source,如Avro Source、Thrift Source、HTTP Source等,用于从各种数据源接收数据。 2. **Channel**:Channel是数据的缓冲区,可以是内存型或文件型,确保数据在传输过程中的...

    flume-ng-1.6.0-cdh5.14.0源码

    AvroSource 支持通过 Avro 协议从外部系统接收数据,ThriftSource 使用 Thrift 协议,JDBCSource 则可以从数据库中抽取数据。 2. **Channels**: 提供多种数据存储方式,如 MemoryChannel(内存存储)、FileChannel...

    flume-1.9.0.tgz

    1. **Sources**: Flume 提供了多种 Sources,如 HTTP、Avro、Thrift、JDBC 等,用于从不同来源收集数据。例如,HTTP Source 可以接收 Web 服务器的日志,Avro Source 可以从其他 Flume 实例接收数据。 2. **...

    flume-chd版本

    Flume-chd支持多种数据源,如HTTP、Avro、Thrift、Kafka等。例如,你可以配置一个Source来监听网络端口,接收来自Web服务器的日志数据。 6. **数据通道(Channels)** Channels作为数据的临时存储,可以是内存...

    hadoop_apache-flume-1.7.0-bin可用.rar

    3. **Flume Source类型**:Flume支持多种源类型,包括`TaildirSource`(监控文件系统中的新文件或文件更新),`AvroSource`(接收Avro数据),`ThriftSource`(通过Thrift协议接收数据),以及`JMSSource`(从Java...

    apache-flume-1.6.0-bin.tar.gz.zip

    在解压并配置好 Apache Flume 后,用户需要编辑配置文件(通常位于 `conf/flume.conf`),定义数据流的源(Source)、通道(Channel)和接收器(Sink)。例如,你可以设置一个HTTP源来接收来自Web服务器的日志,一个...

    flume-1.5.2

    在1.5.2版本中,它支持多种内置源,如 Avro Source、Thrift Source 和 HTTP Source 等。 2. **通道(Channels)**:是 Flume 内部用于临时存储事件的组件,它确保数据在传输过程中的持久性和可靠性。常见的通道类型...

    04、日志收集系统Flume-实时计算4-4:flume自定义开发.pptx

    NettyAvroRpcClient 和 ThriftRpcClient 都实现了 RpcClient 接口,用户需要知道目标 Flume Agent 的主机名和端口号来创建客户端实例。 为了提高容错性和可用性,Flume 还提供了 Failover Client。此客户端能够在与...

    hadoop集群配置之————flume安装配置(详细版)

    1. Flume源(Source)接收外部源(如Web服务器)传递的事件,并将事件存储到一个或多个通道(Channel)中。 2. 通道是一个被动存储,保存事件直到它们被Flume接收器使用。 3. 接收器(Sink)从通道中移除事件,并将其放入...

    大数据教程-Flume安装使用实录.pdf

    Flume支持多种源,如Avro Source, Thrift Source, Exec Source等。 - **通道(Channel)**:通道是介于源和接收器之间的存储设施。数据一旦被源捕获,它将被发送到通道中。通道对数据提供事务性支持,保证数据的...

    尚硅谷大数据技术之 Flume 1

    - Source:负责数据的收集,支持多种类型和格式的日志数据,如Avro、Thrift、Exec、JMS等。Source将数据采集后,通过Event发送给Channel。 - Channel:作为数据的临时存储,对采集的数据进行简单缓存。Channel可以...

    flume开发相关工具包

    3. **数据源(Source)**:Flume支持多种数据源,如Avro,Thrift,HTTP,Spooling Directory等。例如,Spooling Directory Source可以监控指定目录下的新文件并读取其内容。 4. **数据接收器(Sink)**:数据接收器...

    大数据采集技术-flume监控httpsources.pdf

    2. 在`test.conf`文件中,你需要定义一个Flume Agent,这个Agent包含了Source、Channel和Sink三个部分。对于HTTP源,可以配置如下: ``` agent.sources = http-source agent.channels = memory-channel agent....

    Flume安装包、安装文档

    Flume提供了多种类型的Source,如Avro Source、Thrift Source、Netcat Source等,可以根据实际需求选择合适的Source。 2. **Channels**:Channels是Flume内部的数据存储机制,用于暂时存储从Source接收到的事件,...

    大数据Flume学习视频-免费

    Flume提供了多种类型的Source,包括Avro Source、Thrift Source、Exec Source等。 - **Channel(通道)**:作为Source和Sink之间的缓冲区,用于临时存储数据。Flume支持Memory Channel、File Channel等多种类型。 - ...

    apache-flume1.8

    Flume 的工作原理基于简单的架构,由源(Source)、通道(Channel)和接收器(Sink)三个主要部分组成。源负责从各种数据源(如网络套接字、syslog、JMS 消息等)收集数据,而通道作为临时存储,用于在源和接收器...

    apache-flume.tar.gz

    例如,Flume 内置了多种源,如 Avro、Thrift、JMS 和 HTTP 等。通道则是一个临时存储区域,用于在源和Sink之间安全地传输数据。Flume 提供了内存通道和文件通道等不同实现,以适应不同的性能和持久性需求。Sink 负责...

    flume+hdfs所需jar.rar

    7. `libthrift-0.x.jar`:Thrift 库,Flume 使用 Thrift 协议与 HDFS 进行通信。 8. `zookeeper-3.x.jar`:ZooKeeper 服务的客户端库,Hadoop 集群管理依赖 ZooKeeper 来协调分布式操作。 在配置 Flume 以将数据...

Global site tag (gtag.js) - Google Analytics