一、Storm trident filter
filter通过返回true和false。来判断是否对信息过滤。
1.1 代码
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException { FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"), 1, new Values(1, 2), new Values(4, 1), new Values(3, 0)); spout.setCycle(false); TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields("a"), new MyFilter()) .each(new Fields("a", "b"), new PrintFilterBolt(),new Fields("")); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(1); config.setDebug(false); StormSubmitter.submitTopology("trident_filter", config, topology.build()); }
MyFilter:
import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.tuple.TridentTuple; public class MyFilter extends BaseFilter { /** * */ private static final long serialVersionUID = 1L; @Override public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1; } }
PrintFilterBolt:
public class PrintFilterBolt extends BaseFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void execute(TridentTuple tuple, TridentCollector collector) { int firstIndex = tuple.getInteger(0); int secondIndex = tuple.getInteger(1); List<Integer> list = new ArrayList<Integer>(); list.add(firstIndex); list.add(secondIndex); System.out.println("after storm filter opertition change is : " + list.toString()); } }
运行结果:
2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED 2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2) 2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]
二、Storm trident function
函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。
2.1 代码
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException { FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"), 1, new Values(1, 2, 3), new Values(4, 1, 6), new Values(3, 0, 8)); spout.setCycle(false); TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields("b"), new MyFunction(), new Fields("d")) .each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(), new Fields("")); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(1); config.setDebug(false); StormSubmitter.submitTopology("trident_function", config, topology.build()); }
MyFunction:
public class MyFunction extends BaseFunction { /** * */ private static final long serialVersionUID = 1L; public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } }
PrintFunctionBolt:
public class PrintFunctionBolt extends BaseFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void execute(TridentTuple tuple, TridentCollector collector) { int firstIndex = tuple.getInteger(0); int secondIndex = tuple.getInteger(1); int threeIndex = tuple.getInteger(2); int fourIndex = tuple.getInteger(3); List<Integer> list = new ArrayList<Integer>(); list.add(firstIndex); list.add(secondIndex); list.add(threeIndex); list.add(fourIndex); System.out.println("after storm function opertition change is : " +list.toString()); } }
运行效果:
2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000 2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED 2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2) 2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0] 2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1] 2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
相关推荐
《Storm Trident API 使用详解》 Storm Trident API 是 Apache Storm 框架中用于构建实时大数据处理应用程序的关键组件。它的核心概念是"Stream",一种无界的数据序列,它被分割成一系列批次(Batch),以便在...
1. **高级语言功能**: Trident支持类似SQL的操作,如连接(Join)、聚合(Aggregation)、分组(Grouping)、函数(Function)和过滤器(Filter)等,简化了实时计算的编程模型。 2. **有状态增量式处理**: Trident...
Struts2+Hibernate+mysql+eclipse在线考试系统_hy4.zip
【Java】基于rbac思想以及ssm框架(导入jar包的方式)的权限管理项目
AndrewNg机器学习对应PythonJupyterNotebook_hy4
玄武是针对影视特效和动画行业的项目流程管理系统,基于图形图像行业流行的Python开发语言,整合现有CG工作流程,通过_hy4
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
基于Nestjs的RBAC权限系统开发实战_hy4
【Python】基于rbac权限分配、动态二级菜单、面包屑、批量操作_pgj
SpringBlade3.0架构核心工具包,SpringBlade是一个由商业级项目升级优化而来的SpringCloud分布_hy4
该项目是一款基于Vue框架的uniapp分销商城小程序设计源码,包含1412个文件,涵盖471个JavaScript文件、292个Vue文件、253个Markdown文件、202个JSON文件、43个映射文件、34个SCSS文件、33个微信小程序样式文件、33个WXML文件、22个PNG图片文件、9个WXS文件。该项目适用于构建分销商城小程序,支持微信小程序平台,并集成了多种编程语言和文件类型,以实现高效和灵活的开发需求。
java教务管理系统教学管理系统,系统页面设计良好、内容丰富、功能齐全,适合用做课设学习。含有源码、数据库文件以及项目资料文档,感_hy4
【Python】四川大学微服务健康每日报自动打卡
设计模式学习笔记_hy5
该项目是一个以C++为主要编程语言的算法设计与实现学习资源,包含190个文件,涵盖131个头文件(.h)、41个C++源文件(.cpp)、12个其他文件(.a)、2个文本文件(.txt)、1个Git忽略文件(.gitignore)、1个Markdown文件(.md)、1个C编译配置文件(.cc)、1个Protocol Buffers描述文件(.proto)。这些文件共同构成了一个全面的学习材料,旨在帮助开发者深入理解算法设计原理及其在C++语言中的实现。
使用C++实现的23种设计模式_hy4
医疗器械检测系统源码分享
分布式事务tx-lcn_hy4
分布式事务demo_hy4