`
农村外出务工男JAVA
  • 浏览: 105739 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm trident实战 filter,function的使用

阅读更多

一、Storm trident filter

      filter通过返回true和false。来判断是否对信息过滤。

     1.1 代码

	public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),
				1, new Values(1, 2), new Values(4, 1),
				new Values(3, 0));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.each(new Fields("a"), new MyFilter())
				.each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_filter", config,
				topology.build());
	}

    MyFilter:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyFilter extends BaseFilter {

    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		return tuple.getInteger(0) == 1;
	}
	
}

    PrintFilterBolt:

public class PrintFilterBolt extends BaseFunction {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		System.out.println("after storm filter opertition change is : "
				+ list.toString());
	}

}

   运行结果:

2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]

二、Storm trident function

       函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。

    2.1 代码

public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {

		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),
				1, new Values(1, 2, 3), new Values(4, 1, 6),
				new Values(3, 0, 8));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.each(new Fields("b"), new MyFunction(), new Fields("d"))
				.each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),
						new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_function", config,
				topology.build());
	}
	

   MyFunction:

public class MyFunction extends BaseFunction {

    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }

}

   PrintFunctionBolt:

public class PrintFunctionBolt extends BaseFunction {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		int threeIndex = tuple.getInteger(2);
		int fourIndex = tuple.getInteger(3);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		list.add(threeIndex);
		list.add(fourIndex);
		System.out.println("after storm function opertition change is : " +list.toString());
	}

}

    运行效果:

2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
1
1
分享到:
评论

相关推荐

    Storm Trident实战之计算网站PV.rar

    本实战案例将重点介绍如何使用Storm Trident来计算网站的页面浏览量(PV,Page View)。 页面浏览量是衡量一个网站受欢迎程度的重要指标,通常通过记录用户对各个页面的访问次数来计算。在传统的批处理场景下,这...

    Storm Trident API 使用详解.docx

    《Storm Trident API 使用详解》 Storm Trident API 是 Apache Storm 框架中用于构建实时大数据处理应用程序的关键组件。它的核心概念是"Stream",一种无界的数据序列,它被分割成一系列批次(Batch),以便在...

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    基于Storm流计算天猫双十一作战室项目实战

    - **Storm Trident项目**:特别值得一提的是,其中一个项目是完全使用Storm Trident开发完成的。Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 *...

    Storm_Trident

    - ** Trident Operation**:Trident定义了一系列操作,如map、filter、each、aggregate等,用于构建复杂的处理逻辑。 ### 3. Trident的处理模型 Trident的处理流程通常包括以下步骤: 1. **分区(Partitioning)*...

    Storm流计算之项目开发视频教程

    Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...

    大数据分析架构师顶级培训课程storm课程 Trident理论与应用 Trident基础理论与实战 共35页.pptx

    ### 大数据开发高级就业指导课程——Storm及Trident理论与实战 #### 一、Storm并发机制 在Storm中,为了提高数据处理的性能和效率,设计了一套完整的并发机制。这一机制涉及到Topology的组件配置、并发度设置等多...

    Storm实战:构建大数据实时计算

    阿里巴巴集团数据平台事业部商家数据业务部正是最早使用Storm的技术团队之一。  《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,...

    storm企业应用 实战 运维和调优

    - 掌握Storm Trident API使用,它为高级流处理提供了状态管理和事务处理机制。 - 了解Storm的事务拓扑和状态管理机制,以及它们对运维调优的影响。 7. 实时数据分析: - 利用Storm的实时分析能力进行数据聚合、...

    trident-tutorial:实用的Storm Trident教程

    本教程的结构浏览Part * .java,了解Trident的基础知识使用Skeleton.java实现自己的拓扑,或者看看其他示例├── environment ------ Vagrant resources to simulate a Storm cluster locally ├── src └── ...

    Storm 实战:构建大数据实时计算 PDF带书签完整版

    6. **Zookeeper协调**:Storm使用Zookeeper进行集群管理和故障恢复,Zookeeper是一个分布式协调服务,用于维护配置信息、命名服务、集群状态等。 7. ** Trident API**:Trident是Storm提供的一种高级API,它以更...

    trident-examples:用 Storm Trident 编写的一组应用程序

    三叉戟的例子一组用 Storm Trident 编写的应用程序。应用用法建造$ git clone git@github.com:mayconbordin/trident-examples.git$ cd trident-examples/$ mvn -P &lt; profile&gt; package 使用local配置文件以本地模式...

    storm_trident_state

    storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。

    Storm实战构建大数据实时计算

    《Storm实战构建大数据实时计算》是一本专注于大数据领域实时处理技术的专著,主要围绕Apache Storm这一开源流处理系统展开。Storm被广泛应用于实时数据分析、在线机器学习、持续集成、实时网站仪表板等多个场景,其...

    Storm流计算项目:1号店电商实时数据分析系统-01.Storm项目实战课程大纲.pptx

    【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...

    Storm实战构建大数据实时计算( 带书签目录 高清完整版)

    本书旨在帮助读者理解和掌握如何使用Storm构建实时数据处理系统,实现高效的数据分析。 1. **Apache Storm简介** Apache Storm是Twitter开源的分布式实时计算系统,可以持续处理无界数据流。与Hadoop等批处理系统...

    trident-elasticsearch:ElasticSearch 的 Storm Trident 集成层

    "trident-elasticsearch"项目将这两个强大的工具结合在一起,实现了在Apache Storm中使用Trident操作Elasticsearch的功能。这个集成层允许开发者在处理实时数据流时,直接将结果存入Elasticsearch,或者从Elastic...

    storm-trident:《风暴蓝图》

    三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)

    基于Trident构建大规模实时流数据处理系统.pdf

    1. **高级语言功能**: Trident支持类似SQL的操作,如连接(Join)、聚合(Aggregation)、分组(Grouping)、函数(Function)和过滤器(Filter)等,简化了实时计算的编程模型。 2. **有状态增量式处理**: Trident...

Global site tag (gtag.js) - Google Analytics