`

Storm - Trident

阅读更多
Trident


一、Storm 保证性

1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制

2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理

3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理

二、Storm 保证性实现

1.逐个发送,逐个处理

如果这样处理,则原有的并行处理会变成穿行处理,不可取

2.批量发送,批量处理

如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的

3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式

三、Trident

1.Spout

package com.study.storm.trident.wordcount;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @description
 * 数据来源
 * 模拟批量数据发送
 * <br/>
 * @remark
 * Storm 的保证及实现
 * 1.数据一定被发送
 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送
 * 2.数据只被处理一次
 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理
 * 3.数据被按照一定的顺序处理
 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待
 * 
 * <br/>
 * 
 * Trident 处理批量数据
 * 
 */
public class SentenceSpout extends BaseRichSpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 2122598284858356171L;

	private SpoutOutputCollector collector = null ;
	
	/**
	 * 模拟批量数据发送
	 * key : name 
	 * value : sentence 
	 */
	private Values [] valuesArray = new Values[] {
			new Values("a","111111111111"),
			new Values("b","222222222222"),
			new Values("c","333333333333"),
			new Values("d","444444444444"),
			new Values("e","555555555555"),
			new Values("f","666666666666"),
			new Values("g","777777777777"),
			new Values("h","888888888888")
	};
	
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector ;
	}

	// 发送的顺序,即数据组合的下标,标识数据发送到哪个位置
	private int index = 0 ;
	
	@Override
	public void nextTuple() {

		if(index >= valuesArray.length){
			return ;
		}
		index = index == valuesArray.length ? 0 : index++ ;
		this.collector.emit(valuesArray[index]);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("name","sentence"));
	}

}





简化实现
package com.study.storm.trident.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;

public class TridentTopologyDemo {

	public static void main(String[] args) {

		// 相当于原有的 Spout 实现
		@SuppressWarnings("unchecked")
		FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"),
				1,
				new Values("a","111111111111"),
				new Values("b","222222222222"),
				new Values("c","333333333333"),
				new Values("d","444444444444"),
				new Values("e","555555555555"),
				new Values("f","666666666666"),
				new Values("g","777777777777"),
				new Values("h","888888888888"));
		// 是否循环发送,false 不
		tridentSpout.setCycle(false);
		
		TridentTopology topology = new TridentTopology();
		/**
		 *  1.本地过滤器设置
		 */
		// 设置数据源
		Stream initStream = topology.newStream("tridentSpout", tridentSpout);
		// 设置过滤器  -- 过滤name : d 的数据  
		initStream = initStream.each(new Fields("name"),new RemovePartDataFilter());
		// 添加函数,输出字母对应的位置
		initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum"));

		// 设置过滤器  -- 拦截数据并打印
		Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter());
		
		
		
		
		
		
		
		
		//--提交Topology给集群运行
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());
		
		//--运行10秒钟后杀死Topology关闭集群
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}

}




package com.study.storm.trident.wordcount;

import java.util.Iterator;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

/**
 * @description 
 * 打印:key 与 value ,fields 与  fields 对应传输的内容
 */
public class PrintFilter extends BaseFilter {

	/**
	 * 
	 */
	private static final long serialVersionUID = 4393484291178519442L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		Fields fields = tuple.getFields();
		Iterator<String> iterator = fields.iterator();
		while(iterator.hasNext()){
			String key = iterator.next();
			Object valueByField = tuple.getValueByField(key);
			System.out.println("fields : "+ key + " values : "+valueByField);
		}
		
		return true;
	}

}


package com.study.storm.trident.wordcount;

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

/**
 * 过滤name = d 的数据
 * return false 过滤
 * return true  继续传递
 */
public class RemovePartDataFilter extends BaseFilter {

	/**
	 * 
	 */
	private static final long serialVersionUID = 8639858690618579558L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		String stringByField = tuple.getStringByField("name");
		return !stringByField.equals("d");
	}

}


package com.study.storm.trident.wordcount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class NameIndexFunction extends BaseFunction {

	/**
	 * 
	 */
	private static final long serialVersionUID = 9085021905838331812L;

	static Map<String,Integer> indexMap = new HashMap<String,Integer>();
	static {
		indexMap.put("a", 1);
		indexMap.put("b", 2);
		indexMap.put("c", 3);
		indexMap.put("d", 4);
		indexMap.put("e", 5);
		indexMap.put("f", 6);
		indexMap.put("g", 7);
		indexMap.put("h", 8);
		indexMap.put("i", 9);
	}
	
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String name = tuple.getStringByField("name");
		collector.emit(new Values(indexMap.get(name)));
	}

}


分享到:
评论

相关推荐

    storm-trident-elasticsearch:基于 Elasticsearch 的 Trident State 实现

    该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 &lt; groupId&gt;com.github.fhuss&lt;/ groupId&gt; &lt; artifactId&gt;storm-elasticsearch ...

    storm-trident-examples:三叉戟API

    这个项目是 Storm's Trident 的游乐场。 在这个项目中,您可以找到我用于柏林的 Trident hackaton @ Big Data ... 包含 hackaton 会话内容的博客文章: ://www.datasalt.com/2013/04/an-storms-trident-api-overview/

    storm-trident:《风暴蓝图》

    三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)

    apache-storm-1.2.3.tar.gz

    在实际开发中,你可能还需要了解 Storm 的关键概念,如 Trident(一种高级接口,提供更强大的状态管理和事务支持),以及 Storm 的容错机制,比如 tuple 重试和故障恢复策略。 总之,Apache Storm 是一个强大的工具...

    storm-trident-example

    风暴三叉戟示例 Clojure 库旨在......好吧,那部分取决于您。 用法 整我 执照 版权所有 :copyright: 2014 FIXME 根据 Eclipse 公共许可证分发 1.0 版或(由您选择)任何更高版本。

    Storm流计算项目:1号店电商实时数据分析系统-23.项目1-地区销售额-Trident代码开发一.pptx

    【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...

    storm-trident-partitionPersist:storm-trident demo,此demo应用场景 是大数据hash分组,本地组合后把数据放入State存储

    ###必读把大数进行分片,根据数据中某个字段分组Origin_Stream.partitionAggregate(new Fields("a","b") , new Test(),new Fields("A1","B1")).partitionPersist(new LocationDBFactory(), new Fields("A1","B1"), ...

    apache-storm-2.1.0.tar.gz

    8. **Trident**:Trident是Storm提供的一种高级抽象,用于构建强一致性的数据处理应用。它提供了类似于数据库事务的保证,使得处理结果在分布式环境中也能保持一致性。 在安装和部署Apache Storm 2.1.0时,你需要...

    Storm流计算项目:1号店电商实时数据分析系统-24.项目1-地区销售额-Trident代码开发二.pptx

    Trident是Storm的一个高级接口,用于构建强一致性的流处理应用。在项目中,Trident被用来实现更复杂的处理逻辑,如分区和状态管理。在"项目1-地区销售额-Trident代码开发一"中,初步建立了Trident拓扑结构,而在...

    apache-storm-1.0.3.tar.gz

    此外,Storm还支持Trident API,提供更高级别的抽象,保证每个事件的精确一次处理。 在实际应用中,Apache Storm常与Hadoop、Kafka等大数据技术结合,构建复杂的数据处理管道。例如,你可以从Kafka中读取数据流,...

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    例如在项目1中,通过Trident API实现的Spout可以提高数据处理的效率和一致性,同时利用HBase的State功能存储中间状态,保证数据的准确无误。项目2和项目3进一步展示了如何利用storm-kafka处理更复杂的数据分析任务,...

    apache-storm-0.9.7.tar.gz

    5. ** Trident API**:0.9.7版本中包含了Trident,这是一个高级接口,用于构建复杂的、状态ful的处理逻辑,它保证了每个数据流的完全精确一次处理(Exactly-once semantics)。 在解压后的 "apache-storm-0.9.7" ...

    apache-storm-1.1.0.tar.gz

    6. ** Trident**:Trident是Storm提供的高级API,用于构建强一致性的数据处理应用。它通过细粒度的事务模型确保数据处理的准确性和可靠性。 7. ** Zookeeper**:Apache Storm依赖Zookeeper进行集群协调,存储元数据...

    apache-storm-1.2.3.rar

    7. **Trident**:Trident是Storm提供的高级抽象,用于构建强一致性的分布式计算系统。它将数据流分割为一系列小的、确定性的事务,从而提供高准确性和低延迟。 在Apache Storm 1.2.3中,开发者可能会关注以下特性:...

    Apache Storm-0.9.1 API 参考文档

    Trident是Storm的一个高级接口,它提供了一种更抽象的方式来定义拓扑,使代码更简洁。Trident将复杂的数据流操作封装为一系列简单的步骤,如map、filter、reduce等。每个Trident操作都保证了每条记录的完全处理,...

    Storm Trident实战之计算网站PV.rar

    而Trident是Storm的一个高级API,提供了可靠且精确一次的消息处理语义,常用于大规模实时数据处理任务,如日志分析、网站点击流分析、社交媒体数据处理等。本实战案例将重点介绍如何使用Storm Trident来计算网站的...

    Storm_Trident

    **Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...

    Storm - Distributed and fault-tolerant realtime computation

    Storm保证了每个消息至少被处理一次,而 Trident(Storm的一个抽象层)则提供了每个消息只被处理一次的语义。 在容错方面,Storm通过定期发送心跳信号,检测工作节点的故障,并自动重分配任务到其他节点上,以确保...

    大数据分析架构师顶级培训课程storm课程 Trident理论与应用 Trident基础理论与实战 共35页.pptx

    ### 大数据开发高级就业指导课程——Storm及Trident理论与实战 #### 一、Storm并发机制 在Storm中,为了提高数据处理的性能和效率,设计了一套完整的并发机制。这一机制涉及到Topology的组件配置、并发度设置等多...

    storm-starter:来自 github 的 Storm starter,添加我的新类

    5. ** Trident API**:除了基本API外,Storm还提供了Trident,这是一个高级接口,支持状态管理和精确一次处理语义,适合需要强一致性的应用场景。 6. **配置和优化**:在实际使用中,你还需要了解如何根据需求调整...

Global site tag (gtag.js) - Google Analytics