Thrift IDL
Flume Thrift IDL在client包里面,定义如下:
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}注意:event在C#里面是关键字,所以利用Thrift编译器生成客户端的接口时,要把所有event关键字改成其他的、比如events.
Thrift Service
Flume的Source分两种:
?实现PollableSource接口
通过SinkRunner管理Source
?实现EventDrivenSource接口
可以自己接受数据、发送到channel。比如ThriftSource
Flume Thrift Service的实现类在core包
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
public static final String CONFIG_THREADS = "threads";
public static final String CONFIG_BIND = "bind";
public static final String CONFIG_PORT = "port";
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
private TServerTransport serverTransport;
private ExecutorService servingExecutor;
public void start() {
//创建工作者线程池
...
args.protocolFactory(new TCompactProtocol.Factory());
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
//ThriftSourceProtocol是Flume Thrift Service的真正实现
args.processor(new ThriftSourceProtocol
.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
/**
* Start serving.
*/
servingExecutor.submit(new Runnable() {
@Override
public void run() {
server.serve();
}
});
...
}Flume Thrift Service真正的实现类是内部类ThriftSourceHandler
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
Event flumeEvent = EventBuilder.withBody(event.getBody(),
event.getHeaders());
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
try {
//传给channel
getChannelProcessor().processEvent(flumeEvent);
} catch (ChannelException ex) {
logger.warn("Thrift source " + getName() + " could not append events " +
"to the channel.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
try {
getChannelProcessor().processEventBatch(flumeEvents);
} catch (ChannelException ex) {
logger.warn("Thrift source %s could not append events to the " +
"channel.", getName());
return Status.FAILED;
}
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
}
分享到:
相关推荐
1. 支持多种数据源:包括HTTP、JDBC、Avro、Thrift等,允许Flume从各种来源收集数据。 2. 支持多种接收器:可以将数据写入HDFS、HBase、Cassandra、Solr等,提供灵活的数据存储选择。 3. 支持多级路由和转换:通过...
在 Flume 中使用 Thrift Source,可以使得外部应用程序(如 Python)能够将数据推送到 Flume。Python 应用程序将扮演 Flume Source 的角色,通过 Thrift 协议与 Flume 进行通信。Thrift 提供了 IDL(接口定义语言)...
1. **Source**:Flume提供了多种类型的Source,如Avro Source、Thrift Source、HTTP Source等,用于从各种数据源接收数据。 2. **Channel**:Channel是数据的缓冲区,可以是内存型或文件型,确保数据在传输过程中的...
AvroSource 支持通过 Avro 协议从外部系统接收数据,ThriftSource 使用 Thrift 协议,JDBCSource 则可以从数据库中抽取数据。 2. **Channels**: 提供多种数据存储方式,如 MemoryChannel(内存存储)、FileChannel...
1. **Sources**: Flume 提供了多种 Sources,如 HTTP、Avro、Thrift、JDBC 等,用于从不同来源收集数据。例如,HTTP Source 可以接收 Web 服务器的日志,Avro Source 可以从其他 Flume 实例接收数据。 2. **...
Flume-chd支持多种数据源,如HTTP、Avro、Thrift、Kafka等。例如,你可以配置一个Source来监听网络端口,接收来自Web服务器的日志数据。 6. **数据通道(Channels)** Channels作为数据的临时存储,可以是内存...
3. **Flume Source类型**:Flume支持多种源类型,包括`TaildirSource`(监控文件系统中的新文件或文件更新),`AvroSource`(接收Avro数据),`ThriftSource`(通过Thrift协议接收数据),以及`JMSSource`(从Java...
在解压并配置好 Apache Flume 后,用户需要编辑配置文件(通常位于 `conf/flume.conf`),定义数据流的源(Source)、通道(Channel)和接收器(Sink)。例如,你可以设置一个HTTP源来接收来自Web服务器的日志,一个...
在1.5.2版本中,它支持多种内置源,如 Avro Source、Thrift Source 和 HTTP Source 等。 2. **通道(Channels)**:是 Flume 内部用于临时存储事件的组件,它确保数据在传输过程中的持久性和可靠性。常见的通道类型...
NettyAvroRpcClient 和 ThriftRpcClient 都实现了 RpcClient 接口,用户需要知道目标 Flume Agent 的主机名和端口号来创建客户端实例。 为了提高容错性和可用性,Flume 还提供了 Failover Client。此客户端能够在与...
1. Flume源(Source)接收外部源(如Web服务器)传递的事件,并将事件存储到一个或多个通道(Channel)中。 2. 通道是一个被动存储,保存事件直到它们被Flume接收器使用。 3. 接收器(Sink)从通道中移除事件,并将其放入...
Flume支持多种源,如Avro Source, Thrift Source, Exec Source等。 - **通道(Channel)**:通道是介于源和接收器之间的存储设施。数据一旦被源捕获,它将被发送到通道中。通道对数据提供事务性支持,保证数据的...
- Source:负责数据的收集,支持多种类型和格式的日志数据,如Avro、Thrift、Exec、JMS等。Source将数据采集后,通过Event发送给Channel。 - Channel:作为数据的临时存储,对采集的数据进行简单缓存。Channel可以...
3. **数据源(Source)**:Flume支持多种数据源,如Avro,Thrift,HTTP,Spooling Directory等。例如,Spooling Directory Source可以监控指定目录下的新文件并读取其内容。 4. **数据接收器(Sink)**:数据接收器...
2. 在`test.conf`文件中,你需要定义一个Flume Agent,这个Agent包含了Source、Channel和Sink三个部分。对于HTTP源,可以配置如下: ``` agent.sources = http-source agent.channels = memory-channel agent....
Flume提供了多种类型的Source,如Avro Source、Thrift Source、Netcat Source等,可以根据实际需求选择合适的Source。 2. **Channels**:Channels是Flume内部的数据存储机制,用于暂时存储从Source接收到的事件,...
Flume提供了多种类型的Source,包括Avro Source、Thrift Source、Exec Source等。 - **Channel(通道)**:作为Source和Sink之间的缓冲区,用于临时存储数据。Flume支持Memory Channel、File Channel等多种类型。 - ...
Flume 的工作原理基于简单的架构,由源(Source)、通道(Channel)和接收器(Sink)三个主要部分组成。源负责从各种数据源(如网络套接字、syslog、JMS 消息等)收集数据,而通道作为临时存储,用于在源和接收器...
例如,Flume 内置了多种源,如 Avro、Thrift、JMS 和 HTTP 等。通道则是一个临时存储区域,用于在源和Sink之间安全地传输数据。Flume 提供了内存通道和文件通道等不同实现,以适应不同的性能和持久性需求。Sink 负责...
7. `libthrift-0.x.jar`:Thrift 库,Flume 使用 Thrift 协议与 HDFS 进行通信。 8. `zookeeper-3.x.jar`:ZooKeeper 服务的客户端库,Hadoop 集群管理依赖 ZooKeeper 来协调分布式操作。 在配置 Flume 以将数据...