- 浏览: 211667 次
- 性别:
- 来自: 哈尔滨
文章分类
- 全部博客 (267)
- java.lang (8)
- 问题汇总 (21)
- 异常记录 (20)
- 功能实现 (19)
- 面试总结 (25)
- 技巧总结 (8)
- 常用代码 (4)
- 编程习惯 (3)
- 编码规则 (3)
- java.util (10)
- java.io (1)
- JavaWeb (9)
- MySQL (16)
- SVN (3)
- MyBatis (11)
- Velocity (7)
- 其他知识 (10)
- 人生哲理 (1)
- 人生故事 (1)
- 自我感悟 (1)
- shiro (3)
- 基础知识 (0)
- 问题总结 (1)
- Spring 标签 (1)
- Spring (3)
- 点滴生活 (1)
- DOS (1)
- CAS (4)
- Linux (9)
- Storm (6)
- Shell (1)
- regex (1)
- Collection (4)
- poi (1)
- 经典语句 (1)
- NIO (5)
- concurrent (14)
- RPC (1)
- zookeeper (3)
- 待整理 (2)
- Hadoop (9)
- RabbitMq (2)
- flume (1)
- hive (7)
- hbase (4)
- kafka (1)
- scala (1)
- GC (0)
- java.util.concurrent.atomic (1)
- java.lang.ref (6)
- JVM (2)
- algorithm (1)
- conception (1)
- java key word (1)
- sun.misc (1)
最新评论
Trident
一、Storm 保证性
1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制
2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理
3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理
二、Storm 保证性实现
1.逐个发送,逐个处理
如果这样处理,则原有的并行处理会变成穿行处理,不可取
2.批量发送,批量处理
如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的
3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式
三、Trident
1.Spout
package com.study.storm.trident.wordcount; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * @description * 数据来源 * 模拟批量数据发送 * <br/> * @remark * Storm 的保证及实现 * 1.数据一定被发送 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送 * 2.数据只被处理一次 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理 * 3.数据被按照一定的顺序处理 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待 * * <br/> * * Trident 处理批量数据 * */ public class SentenceSpout extends BaseRichSpout { /** * */ private static final long serialVersionUID = 2122598284858356171L; private SpoutOutputCollector collector = null ; /** * 模拟批量数据发送 * key : name * value : sentence */ private Values [] valuesArray = new Values[] { new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888") }; @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector ; } // 发送的顺序,即数据组合的下标,标识数据发送到哪个位置 private int index = 0 ; @Override public void nextTuple() { if(index >= valuesArray.length){ return ; } index = index == valuesArray.length ? 0 : index++ ; this.collector.emit(valuesArray[index]); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","sentence")); } }
简化实现
package com.study.storm.trident.wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import storm.trident.Stream; import storm.trident.TridentTopology; import storm.trident.testing.FixedBatchSpout; public class TridentTopologyDemo { public static void main(String[] args) { // 相当于原有的 Spout 实现 @SuppressWarnings("unchecked") FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"), 1, new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888")); // 是否循环发送,false 不 tridentSpout.setCycle(false); TridentTopology topology = new TridentTopology(); /** * 1.本地过滤器设置 */ // 设置数据源 Stream initStream = topology.newStream("tridentSpout", tridentSpout); // 设置过滤器 -- 过滤name : d 的数据 initStream = initStream.each(new Fields("name"),new RemovePartDataFilter()); // 添加函数,输出字母对应的位置 initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum")); // 设置过滤器 -- 拦截数据并打印 Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter()); //--提交Topology给集群运行 Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology", conf, topology.build()); //--运行10秒钟后杀死Topology关闭集群 Utils.sleep(1000 * 10); cluster.killTopology("MyTopology"); cluster.shutdown(); } }
package com.study.storm.trident.wordcount; import java.util.Iterator; import backtype.storm.tuple.Fields; import storm.trident.operation.BaseFilter; import storm.trident.tuple.TridentTuple; /** * @description * 打印:key 与 value ,fields 与 fields 对应传输的内容 */ public class PrintFilter extends BaseFilter { /** * */ private static final long serialVersionUID = 4393484291178519442L; @Override public boolean isKeep(TridentTuple tuple) { Fields fields = tuple.getFields(); Iterator<String> iterator = fields.iterator(); while(iterator.hasNext()){ String key = iterator.next(); Object valueByField = tuple.getValueByField(key); System.out.println("fields : "+ key + " values : "+valueByField); } return true; } }
package com.study.storm.trident.wordcount; import storm.trident.operation.BaseFilter; import storm.trident.tuple.TridentTuple; /** * 过滤name = d 的数据 * return false 过滤 * return true 继续传递 */ public class RemovePartDataFilter extends BaseFilter { /** * */ private static final long serialVersionUID = 8639858690618579558L; @Override public boolean isKeep(TridentTuple tuple) { String stringByField = tuple.getStringByField("name"); return !stringByField.equals("d"); } }
package com.study.storm.trident.wordcount; import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class NameIndexFunction extends BaseFunction { /** * */ private static final long serialVersionUID = 9085021905838331812L; static Map<String,Integer> indexMap = new HashMap<String,Integer>(); static { indexMap.put("a", 1); indexMap.put("b", 2); indexMap.put("c", 3); indexMap.put("d", 4); indexMap.put("e", 5); indexMap.put("f", 6); indexMap.put("g", 7); indexMap.put("h", 8); indexMap.put("i", 9); } @Override public void execute(TridentTuple tuple, TridentCollector collector) { String name = tuple.getStringByField("name"); collector.emit(new Values(indexMap.get(name))); } }
相关推荐
该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>storm-elasticsearch ...
这个项目是 Storm's Trident 的游乐场。 在这个项目中,您可以找到我用于柏林的 Trident hackaton @ Big Data ... 包含 hackaton 会话内容的博客文章: ://www.datasalt.com/2013/04/an-storms-trident-api-overview/
三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)
在实际开发中,你可能还需要了解 Storm 的关键概念,如 Trident(一种高级接口,提供更强大的状态管理和事务支持),以及 Storm 的容错机制,比如 tuple 重试和故障恢复策略。 总之,Apache Storm 是一个强大的工具...
风暴三叉戟示例 Clojure 库旨在......好吧,那部分取决于您。 用法 整我 执照 版权所有 :copyright: 2014 FIXME 根据 Eclipse 公共许可证分发 1.0 版或(由您选择)任何更高版本。
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
###必读把大数进行分片,根据数据中某个字段分组Origin_Stream.partitionAggregate(new Fields("a","b") , new Test(),new Fields("A1","B1")).partitionPersist(new LocationDBFactory(), new Fields("A1","B1"), ...
8. **Trident**:Trident是Storm提供的一种高级抽象,用于构建强一致性的数据处理应用。它提供了类似于数据库事务的保证,使得处理结果在分布式环境中也能保持一致性。 在安装和部署Apache Storm 2.1.0时,你需要...
Trident是Storm的一个高级接口,用于构建强一致性的流处理应用。在项目中,Trident被用来实现更复杂的处理逻辑,如分区和状态管理。在"项目1-地区销售额-Trident代码开发一"中,初步建立了Trident拓扑结构,而在...
此外,Storm还支持Trident API,提供更高级别的抽象,保证每个事件的精确一次处理。 在实际应用中,Apache Storm常与Hadoop、Kafka等大数据技术结合,构建复杂的数据处理管道。例如,你可以从Kafka中读取数据流,...
例如在项目1中,通过Trident API实现的Spout可以提高数据处理的效率和一致性,同时利用HBase的State功能存储中间状态,保证数据的准确无误。项目2和项目3进一步展示了如何利用storm-kafka处理更复杂的数据分析任务,...
5. ** Trident API**:0.9.7版本中包含了Trident,这是一个高级接口,用于构建复杂的、状态ful的处理逻辑,它保证了每个数据流的完全精确一次处理(Exactly-once semantics)。 在解压后的 "apache-storm-0.9.7" ...
6. ** Trident**:Trident是Storm提供的高级API,用于构建强一致性的数据处理应用。它通过细粒度的事务模型确保数据处理的准确性和可靠性。 7. ** Zookeeper**:Apache Storm依赖Zookeeper进行集群协调,存储元数据...
7. **Trident**:Trident是Storm提供的高级抽象,用于构建强一致性的分布式计算系统。它将数据流分割为一系列小的、确定性的事务,从而提供高准确性和低延迟。 在Apache Storm 1.2.3中,开发者可能会关注以下特性:...
Trident是Storm的一个高级接口,它提供了一种更抽象的方式来定义拓扑,使代码更简洁。Trident将复杂的数据流操作封装为一系列简单的步骤,如map、filter、reduce等。每个Trident操作都保证了每条记录的完全处理,...
而Trident是Storm的一个高级API,提供了可靠且精确一次的消息处理语义,常用于大规模实时数据处理任务,如日志分析、网站点击流分析、社交媒体数据处理等。本实战案例将重点介绍如何使用Storm Trident来计算网站的...
**Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...
Storm保证了每个消息至少被处理一次,而 Trident(Storm的一个抽象层)则提供了每个消息只被处理一次的语义。 在容错方面,Storm通过定期发送心跳信号,检测工作节点的故障,并自动重分配任务到其他节点上,以确保...
### 大数据开发高级就业指导课程——Storm及Trident理论与实战 #### 一、Storm并发机制 在Storm中,为了提高数据处理的性能和效率,设计了一套完整的并发机制。这一机制涉及到Topology的组件配置、并发度设置等多...
5. ** Trident API**:除了基本API外,Storm还提供了Trident,这是一个高级接口,支持状态管理和精确一次处理语义,适合需要强一致性的应用场景。 6. **配置和优化**:在实际使用中,你还需要了解如何根据需求调整...