Trident API
partition本地操作,无需网络io
等同于pig的generate
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
等同于pig的filter
mystream.each(new Fields("b", "a"), new MyFilter())
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
}
}
partitionAggregate
等同于pig的combine操作(三种aggregate接口)
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
@@@
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
@@@
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
//最底层的aggregate,每个方法都有collector
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
---------------------
stateQuery and partitionPersist
--------------------------
projection
mystream.project(new Fields("b", "d"))
---------------------------
Repartitioning operations
shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions
broadcast: Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do a stateQuery on every partition of data.
partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.
partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping
----------------------------
Aggregation operations
mystream.aggregate(new Count(), new Fields("count"))
----------------------------
等同pig group by
Operations on grouped streams
groupBy(new Fields("word"))
--------------------------------
不同于sql的joins,做的是一个batch的join
Merges and joins
Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
相关推荐
《Storm Trident API 使用详解》 Storm Trident API 是 Apache Storm 框架中用于构建实时大数据处理应用程序的关键组件。它的核心概念是"Stream",一种无界的数据序列,它被分割成一系列批次(Batch),以便在...
而Trident是Storm的一个高级API,提供了可靠且精确一次的消息处理语义,常用于大规模实时数据处理任务,如日志分析、网站点击流分析、社交媒体数据处理等。本实战案例将重点介绍如何使用Storm Trident来计算网站的...
Trident-GCD是一个开源项目,它为Apache Storm的Trident API提供了一种集成Google Cloud Datastore的状态管理实现。Trident是Storm的一个高级接口,用于构建复杂的数据处理管道,而Google Cloud Datastore则是一个...
- 掌握Storm Trident API使用,它为高级流处理提供了状态管理和事务处理机制。 - 了解Storm的事务拓扑和状态管理机制,以及它们对运维调优的影响。 7. 实时数据分析: - 利用Storm的实时分析能力进行数据聚合、...
Apache Storm 是一个开源的分布式...总结,Apache Storm-0.9.1 API 参考文档为开发者提供了详细指导,涵盖了从基本概念、组件实现、拓扑构建到集群管理和性能调优的全方位知识,帮助开发者高效地构建实时数据处理系统。
Trident是基于Storm的一个高层次抽象框架,它简化了事务处理和状态管理的过程,为用户提供了一个更易于理解和使用的API接口。 **3.1 Trident的特点** - **Batch处理**:Trident将Tuple以Batch的形式进行处理,提供...
Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 **2. Storm的全面讲解** - **深度解析**:课程不仅覆盖了Storm的基础概念和架构,还深入探讨了其...
"trident-elasticsearch"项目是将这两者结合的产物,它提供了一个Storm Trident的集成层,使得在Storm中处理的数据能够无缝地流入和流出Elasticsearch。 首先,让我们深入了解一下Elasticsearch。Elasticsearch基于...
总结来说,Trident为大规模实时流数据处理提供了高效且可靠的解决方案,通过其高级的API和优化的处理模式,使得处理复杂的数据流变得更加简单,特别适合像雅虎这样需要实时分析用户行为的大公司。在大数据时代,...
5. ** Trident API**:Trident是Storm提供的高级API,它支持精确一次的语义,可以更方便地构建复杂的实时处理任务。 6. **Zookeeper整合**:Storm利用Zookeeper进行集群协调,保证系统的稳定运行。 7. **Java编程*...
Bolt/Trident API 实现 该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>...
9. **进阶主题**:如多语言支持、 Trident API的使用、与其他大数据组件(如Hadoop、Kafka)的集成等。 通过这个压缩包的学习,用户可以从基础到进阶全面了解并掌握Apache Storm,从而在实际工作中有效地利用这一...
这个项目是 Storm's Trident 的游乐场。 在这个项目中,您可以找到我用于柏林的 Trident hackaton @ Big Data Beers #4 的指南: : 在 Demo.java 文件中。 您还可以在示例子包中找到我解释的概念的说明性示例。 ...
5. ** Trident API**:Trident是Storm提供的高级API,用于构建高精度、可扩展的实时数据处理应用程序,它保证了每个数据流的完全准确性和至少一次处理。 6. **与其他技术的集成**:Storm可以轻松集成Hadoop、...
在 Storm 0.9 源码包中,我们可以深入理解其内部工作原理,以及如何利用 Storm 的 API 进行实时流处理应用开发。源码分析对于开发者来说,是提升技能和优化应用的关键步骤。 1. **核心组件** - **Bolt**: Bolt 是 ...
2. **事务处理**:通过 Trident API 提供了可靠的消息处理能力,确保消息的处理一致性。 3. **并发与通信机制**:每个Spout和Bolt可以有多个实例并发运行,通过TCP/IP进行进程间的通信。 **六、Storm应用场景** 1....
JStorm支持动态调整Topology、动态扩缩容、Drainer模式(用于批量处理)、Trident API(支持复杂的状态计算)等高级特性。这些特性使得JStorm在应对各种实时计算场景时更具灵活性。 通过理解和熟练运用上述API,...
5. ** Trident API**:0.9.7版本中包含了Trident,这是一个高级接口,用于构建复杂的、状态ful的处理逻辑,它保证了每个数据流的完全精确一次处理(Exactly-once semantics)。 在解压后的 "apache-storm-0.9.7" ...
例如,0.10.x 版本引入了 Trident API 的改进,而 1.0.0 及以后版本进一步优化了稳定性及易用性。 通过深入学习和理解 Apache Storm 0.9.4 的源码,开发者可以更好地定制化其功能,优化性能,并为构建高性能实时...