`

ES-Hadoop学习笔记-Storm交互

 
阅读更多

elasticsearch-Hadoop提供ElasticSearch与Apache Storm的集成支持。从ElasticSearch读取的数据是以Storm里Tuple的形式进行操作处理。

依赖版本信息:

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.3.2</version>
</dependency>


Strom的extlib目录下jar包


 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.Map;  
  2.   
  3. import org.apache.storm.task.OutputCollector;  
  4. import org.apache.storm.task.TopologyContext;  
  5. import org.apache.storm.topology.OutputFieldsDeclarer;  
  6. import org.apache.storm.topology.base.BaseRichBolt;  
  7. import org.apache.storm.tuple.Fields;  
  8. import org.apache.storm.tuple.Tuple;  
  9. import org.apache.storm.tuple.Values;  
  10.   
  11. public class HandleBolt extends BaseRichBolt {  
  12.   
  13.     private static final long serialVersionUID = 1L;  
  14.   
  15.     private OutputCollector collector = null;  
  16.       
  17.     @SuppressWarnings("rawtypes")  
  18.     @Override  
  19.     public void prepare(Map stormConf, TopologyContext context,  
  20.             OutputCollector collector) {  
  21.         this.collector = collector;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void execute(Tuple input) {  
  26.         String name = "NA";  
  27.         if (input.contains("name")) {  
  28.             name = input.getStringByField("name");  
  29.         }  
  30.         String phone = "NA";  
  31.         if (input.contains("phone")) {  
  32.             phone = input.getStringByField("phone");  
  33.         }  
  34.         String rcall = "NA";  
  35.         if (input.contains("rcall")) {  
  36.             rcall = input.getStringByField("rcall");  
  37.             rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;  
  38.         }  
  39.         String address = "NA";  
  40.         if (input.contains("address")) {  
  41.             address = input.getStringByField("address");  
  42.             address = null == address || "null".equals(address) ? "NA" : address;  
  43.         }  
  44.         String email = "NA";  
  45.         if (input.contains("email")) {  
  46.             email = input.getStringByField("email");  
  47.             email = null == email || "null".equals(email) ? "NA" : email;  
  48.         }  
  49.         String idCard = "NA";  
  50.         if (input.contains("idCard")) {  
  51.             idCard = input.getStringByField("idCard");  
  52.             idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;  
  53.         }  
  54.         this.collector.emit(new Values(name, phone, rcall, address, email, idCard));  
  55.         this.collector.ack(input);  
  56.     }  
  57.   
  58.     @Override  
  59.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  60.         declarer.declare(new Fields("name""phone""rcal""address""email""idCard"));  
  61.     }  
  62.   
  63. }  

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.hdfs.bolt.HdfsBolt;  
  8. import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;  
  9. import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;  
  10. import org.apache.storm.hdfs.bolt.format.FileNameFormat;  
  11. import org.apache.storm.hdfs.bolt.format.RecordFormat;  
  12. import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;  
  13. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;  
  14. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;  
  15. import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;  
  16. import org.apache.storm.hdfs.bolt.sync.SyncPolicy;  
  17. import org.apache.storm.starter.bolt.PrinterBolt;  
  18. import org.apache.storm.topology.TopologyBuilder;  
  19. import org.apache.storm.utils.Utils;  
  20.   
  21. public class ES2StormTopology {  
  22.   
  23.     private static final String TOPOLOGY_NAME = "es-storm-topology";  
  24.       
  25.     public static void main(String[] args) {  
  26.         if (args.length != 1) {  
  27.             System.exit(0);  
  28.         }  
  29.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  30.           
  31.         TopologyBuilder builder = new TopologyBuilder();  
  32.         String target = "operator/telecom";  
  33.         String query = "?q=*";  
  34.         Map<Object, Object> configuration = new HashMap<Object, Object>();  
  35.         configuration.put("es.nodes""192.168.10.20:9200");  
  36.         configuration.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  37.         configuration.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  38.         builder.setSpout("es-storm-spout"new ESSpout(target, query, configuration), 1);  
  39.           
  40.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  41.           
  42.         builder.setBolt("storm-handle-bolt"new HandleBolt()).shuffleGrouping("es-storm-spout");  
  43.           
  44.         RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");  
  45.         SyncPolicy syncPolicy = new CountSyncPolicy(10);  
  46.         FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);  
  47.         FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")  
  48.                 .withPrefix("es_").withExtension(".log");  
  49.         HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")  
  50.                 .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)  
  51.                 .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);  
  52.         builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");  
  53.           
  54.         Config config = new Config();  
  55.         config.setDebug(true);  
  56.         if (isCluster) {  
  57.             try {  
  58.                 config.setNumWorkers(3);  
  59.                 StormSubmitter.submitTopologyWithProgressBar(  
  60.                         TOPOLOGY_NAME, config, builder.createTopology());  
  61.             } catch (Exception e) {  
  62.                 e.printStackTrace();  
  63.             }  
  64.         } else {  
  65.             LocalCluster cluster = new LocalCluster();  
  66.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  67.             Utils.sleep(100000);  
  68.             cluster.killTopology(TOPOLOGY_NAME);  
  69.             cluster.shutdown();  
  70.         }  
  71.           
  72.     }  
  73.       
  74. }  

 

注意:elasticsearch-hadoop里的EsSpout类用到的Storm版本过低,所以重写了一个ESSpout替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false

 

 

 

[java] view plain copy
 
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.starter.bolt.PrinterBolt;  
  8. import org.apache.storm.topology.TopologyBuilder;  
  9. import org.apache.storm.utils.Utils;  
  10. import org.platform.storm.elasticsearch.bolt.ESBolt;  
  11. import org.platform.storm.elasticsearch.spout.ESSpout;  
  12.   
  13. public class Storm2ESTopology {  
  14.   
  15.     private static final String TOPOLOGY_NAME = "storm-es-topology";  
  16.       
  17.     public static void main(String[] args) {  
  18.         if (args.length != 1) {  
  19.             System.exit(0);  
  20.         }  
  21.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  22.           
  23.         TopologyBuilder builder = new TopologyBuilder();  
  24.           
  25.         String target = "operator/telecom";  
  26.         String query = "?q=*";  
  27.         Map<Object, Object> spoutConf = new HashMap<Object, Object>();  
  28.         spoutConf.put("es.nodes""192.168.10.20:9200");  
  29.         spoutConf.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  30.         spoutConf.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  31.         builder.setSpout("es-storm-spout"new ESSpout(target, query, spoutConf), 1);  
  32.           
  33.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  34.           
  35.         Map<Object, Object> boltConf = new HashMap<Object, Object>();  
  36.         boltConf.put("es.nodes""192.168.10.20:9200");  
  37.         boltConf.put("es.index.auto.create""true");  
  38.         boltConf.put("es.ser.writer.bytes.class""org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");  
  39.         //boltConf.put("es.input.json", "true");  
  40.         builder.setBolt("storm-es-bolt"new ESBolt("data/telecom", boltConf))  
  41.             .globalGrouping("es-storm-spout");  
  42.           
  43.         Config config = new Config();  
  44.         config.setDebug(true);  
  45.         if (isCluster) {  
  46.             try {  
  47.                 config.setNumWorkers(3);  
  48.                 StormSubmitter.submitTopologyWithProgressBar(  
  49.                         TOPOLOGY_NAME, config, builder.createTopology());  
  50.             } catch (Exception e) {  
  51.                 e.printStackTrace();  
  52.             }  
  53.         } else {  
  54.             LocalCluster cluster = new LocalCluster();  
  55.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  56.             Utils.sleep(100000);  
  57.             cluster.killTopology(TOPOLOGY_NAME);  
  58.             cluster.shutdown();  
  59.         }  
  60.           
  61.     }  
  62.       
  63. }  

 

 

注意:elasticsearch-hadoop里的EsBolt、StormTupleBytesConverter类用到的Storm版本过低,所以重写了一个ESBolt、StormTupleBytesConverter替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false

 

文献出自:http://blog.csdn.net/fighting_one_piece/article/details/52228641

分享到:
评论

相关推荐

    大数据服务框架学习笔记.zip

    "大数据服务框架学习笔记.zip"很可能是包含一系列关于大数据服务框架的学习资料,如Hadoop、Spark、Flink等主流框架的讲解。下面,我们将深入探讨这些关键组件和框架。 首先,Hadoop是大数据处理的基础框架,由...

    大数据图标大全.docx

    12. **Logstash**: 数据收集、处理和转发工具,通常与Elasticsearch和Kibana一起构建ELK日志分析栈。 13. **Kibana**: 数据可视化工具,用于探索和展示Elasticsearch中的数据。 14. **Ranger**: 集中式安全管理...

    大数据、数据分析领域工具笔记

    除了上述工具,大数据生态系统还包括Elasticsearch(搜索和分析引擎)、Kafka(流处理平台)、Hue(Hadoop界面工具)等,它们共同构成了大数据处理的完整链条。 总结,这份笔记内容广泛,不仅介绍了大数据存储和...

    基于Simulink的风火水储联合调频系统中储能SOC对ACE影响的技术分析

    内容概要:本文详细探讨了在Simulink环境中构建的风火水储联合调频系统中,储能系统的荷电状态(SOC)对区域控制偏差(ACE)的影响。文中通过具体案例和MATLAB代码展示了储能系统在不同SOC水平下的表现及其对系统稳定性的作用。同时,文章比较了储能单独调频与风火水储联合调频的效果,强调了储能系统在应对风电波动性和提高系统响应速度方面的重要作用。此外,作者提出了针对SOC变化率的参数整定方法以及多电源协同工作的优化策略,旨在减少ACE波动并确保系统稳定运行。 适合人群:从事电力系统调频研究的专业人士,尤其是熟悉Simulink仿真工具的研究人员和技术人员。 使用场景及目标:适用于希望深入了解储能系统在电力系统调频中作用的研究者和技术人员,目标是通过合理的SOC管理和多电源协同工作,优化调频效果,提高系统稳定性。 其他说明:文章提供了详细的MATLAB代码片段,帮助读者更好地理解和应用所讨论的概念。同时,文中提到的实际案例和仿真结果为理论分析提供了有力支持。

    欧姆龙PLC NJ中大型程序案例:结构化与面向对象编程的深度融合及应用

    内容概要:本文深入探讨了欧姆龙PLC NJ系列中大型程序中结构化编程与面向对象编程的结合及其应用。首先介绍了结构化编程作为程序框架的基础,通过功能块(FB)实现清晰的程序结构和流程控制。接着阐述了面向对象编程的理念,将现实世界的对象映射到程序中,利用类的概念实现模块化和可扩展性。两者结合提高了程序的容错率,增强了程序的稳定性和可维护性。文中通过多个实际案例展示了如何在工业自动化领域中应用这两种编程方法,如电机控制、设备类的创建、异常处理机制、接口实现多态性、配方管理和报警处理等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些希望提升PLC编程技能的人群。 使用场景及目标:适用于需要优化PLC程序结构、提高程序可靠性和可维护性的场合。目标是帮助工程师掌握结构化编程和面向对象编程的技巧,从而写出更加高效、稳定的PLC程序。 其他说明:文章强调了在实际项目中灵活运用两种编程方法的重要性,并提醒读者注意实时性要求高的动作控制应采用结构化编程,而工艺逻辑和HMI交互则更适合面向对象编程。

    matlab与聚类分析

    matlab与聚类分析。根据我国历年职工人数(单位:万人),使用有序样品的fisher法聚类。

    卡尔曼滤波生成航迹测量程序

    卡尔曼滤波生成航迹测量程序

    基于格子玻尔兹曼方法(LBM)的多孔电极浸润特性研究及其Python实现

    内容概要:本文详细介绍了利用格子玻尔兹曼方法(LBM)对多孔电极浸润特性的模拟研究。首先阐述了LBM的基本原理,包括碰撞和迁移两个关键步骤,并提供了相应的Python伪代码。接着讨论了如何处理多孔介质中的固体边界,特别是通过随机算法生成孔隙结构以及结合CT扫描数据进行三维重构的方法。文中还探讨了表面张力、接触角等因素对浸润过程的影响,并给出了具体的数学表达式。此外,文章提到了并行计算的应用,如使用CUDA加速大规模网格计算,以提高模拟效率。最后,作者分享了一些实用技巧,如通过调整松弛时间和润湿性参数来优化模拟效果,并强调了LBM在处理复杂几何结构方面的优势。 适合人群:从事电池研发、材料科学领域的研究人员和技术人员,尤其是关注多孔电极浸润性和电解液扩散机制的人群。 使用场景及目标:适用于希望深入了解多孔电极内部流体动力学行为的研究者,旨在帮助他们更好地理解和预测电极材料的浸润特性,从而改进电池设计和性能。 其他说明:尽管LBM在处理多孔介质方面表现出色,但在某些极端条件下仍需引入额外的修正项。同时,参数的选择和边界条件的设定对最终结果有着重要影响,因此需要谨慎对待。

    基于FPGA和W5500的TCP网络通信:Zynq扩展口开发测试平台(使用Vivado 2019.2纯Verilog实现)

    内容概要:本文详细介绍了在Zynq扩展口上使用FPGA和W5500实现TCP网络通信的过程。作者通过一系列实验和技术手段,解决了多个实际问题,最终实现了稳定的数据传输。主要内容包括:硬件搭建(SPI接口配置)、数据回环处理、压力测试及优化、多路复用扩展以及上位机测试脚本的编写。文中提供了大量Verilog代码片段,展示了如何通过状态机控制SPI通信、优化数据缓存管理、处理中断等问题。 适合人群:对FPGA开发和网络通信感兴趣的工程师,尤其是有一定Verilog编程基础的研发人员。 使用场景及目标:适用于需要在嵌入式系统中实现高效、稳定的TCP通信的应用场景。目标是帮助读者掌握FPGA与W5500结合进行网络通信的具体实现方法和技术细节。 其他说明:文章不仅提供了详细的代码实现,还分享了许多实践经验,如硬件连接注意事项、信号完整性问题的解决方案等。此外,作者还提到了未来的工作方向,如UDP组播和QoS优先级控制的实现。

    python3.10以上 可安装pyside6(类似pyqt),具体安装操作步骤

    python3.10以上 可安装pyside6(类似pyqt),具体安装操作步骤

    基于FDTD仿真的可调谐石墨烯超材料吸收体设计与实现

    内容概要:本文详细介绍了利用有限差分时域法(FDTD)进行可调谐石墨烯超材料吸收体的设计与仿真。文中解释了石墨烯超材料的基本结构(三层“三明治”结构)、关键参数(如化学势、周期、厚度等)及其对吸收性能的影响。同时展示了如何通过调整石墨烯的化学势来实现吸收峰的位置和强度的变化,以及如何优化结构参数以获得最佳的吸收效果。此外,还提供了具体的代码示例,帮助读者理解和重现相关实验结果。 适合人群:从事纳米光子学、超材料研究的专业人士,尤其是对石墨烯基超材料感兴趣的科研工作者和技术开发者。 使用场景及目标:适用于希望深入了解石墨烯超材料的工作原理及其潜在应用场景的研究人员;旨在探索新型可调谐光学器件的设计思路和发展方向。 其他说明:文中不仅分享了理论知识,还包括了许多实践经验,如避免常见错误、提高仿真相关效率的小技巧等。对于想要将研究成果应用于实际产品的团队来说,这些细节非常有价值。

    随机生成2字到10字的中文词组

    随机生成2字,3字,4字,5字,6字,7字,8字,9字,10字的中文词组20个

    【汽车电子电气架构】智能座舱域控平台设计:基于双片龍鷹一号SoC芯片的高性能硬件架构与多模态交互系统构建

    内容概要:本文详细探讨了智能座舱域控设计的发展历程和技术趋势。首先介绍了智能座舱从被动式交互到主动式交互的技术演变,包括硬件和交互方式的进步。随后,文章重点讨论了智能座舱功能发展趋势,涵盖车载显示技术的多屏化、大屏化和高端化,以及SoC芯片的多核异构架构和算力融合,强调了其在智能座舱中的核心作用。此外,还阐述了电子电气架构从分布式向集成化的转型,分析了其面临的挑战和未来趋势。最后,基于当前智能座舱的发展需求,提出了一种基于双片龍鷹一号芯片的新域控平台设计方案,详细描述了其硬件设计实现方案,旨在提供高性能、高可靠性的智能座舱解决方案。 适合人群:汽车电子工程师、智能座舱研发人员及相关领域的技术人员。 使用场景及目标:①帮助读者理解智能座舱的技术发展历程及其未来发展方向;②为智能座舱域控平台的设计和开发提供参考和技术支持;③探讨电子电气架构的转型对汽车行业的影响及应对策略。 其他说明:文章结合实际案例和技术数据,深入浅出地解释了智能座舱的各项技术细节,不仅提供了理论指导,还具有较强的实践意义。通过对智能座舱域控平台的全面剖析,有助于推动智能座舱技术的创新发展,提升用户体验。

    多智能体协同编队控制:无人机编队背后的Python实现与关键技术解析

    内容概要:本文详细介绍了多智能体协同编队控制的技术原理及其应用实例。首先通过生动形象的例子解释了编队控制的核心概念,如一致性算法、虚拟结构法和Leader-Follower模式。接着深入探讨了如何用Python实现基础的一致性控制,以及如何通过调整参数(如Kp、Ka)来优化编队效果。文中还讨论了实际工程中常见的问题,如通信延迟、避障策略和动态拓扑变化,并给出了相应的解决方案。最后,强调了参数调试的重要性,并分享了一些实用技巧,如预测补偿、力场融合算法和分布式控制策略。 适合人群:对多智能体系统、无人机编队控制感兴趣的科研人员、工程师和技术爱好者。 使用场景及目标:适用于希望深入了解多智能体协同编队控制理论并能够将其应用于实际项目的研究人员和开发者。目标是帮助读者掌握编队控制的关键技术和实现方法,提高系统的稳定性和可靠性。 其他说明:文章不仅提供了详细的理论讲解,还附有具体的代码示例,便于读者理解和实践。同时,作者结合自身经验分享了许多宝贵的调试技巧和注意事项,有助于读者在实际应用中少走弯路。

    评估管线钢环焊缝质量及其对氢脆的敏感性.pptx

    评估管线钢环焊缝质量及其对氢脆的敏感性.pptx

    C盘清理bat脚本自动清理C盘垃圾文件

    C盘清理bat脚本自动清理C盘垃圾文件

    GBT21266-2007 辣椒及辣椒制品中辣椒素类物质测定及辣度表示方法

    GBT21266-2007 辣椒及辣椒制品中辣椒素类物质测定及辣度表示方法

    弹跳球 XNA 游戏项目 演示如何使用 C# 在 Visual Studio XNA 中构建类似 arkanoiddx-ball 的游戏

    弹跳球 XNA 游戏项目。演示如何使用 C# 在 Visual Studio XNA 中构建类似 arkanoiddx-ball 的游戏。

    【人形机器人领域】宇树科技人形机器人:技术实力、市场炒作与应用前景分析

    内容概要:文章全面解析了宇树科技人形机器人的发展现状、技术实力、市场炒作现象及其应用前景和面临的挑战。宇树科技成立于2016年,凭借春晚舞台的惊艳亮相和社交媒体的热议迅速走红,其人形机器人具备先进的运动控制算法、传感器技术和仿生结构设计。然而,市场炒作现象如高价租赁、二手市场炒作和虚假宣传等影响了市场秩序。尽管存在炒作,人形机器人在工业、服务和家庭领域仍具广阔前景,但也面临技术升级、成本控制、安全性和政策监管等挑战。 适合人群:对机器人技术、人工智能以及科技发展趋势感兴趣的读者,包括科技爱好者、投资者和相关行业的从业者。 使用场景及目标:①帮助读者了解宇树科技人形机器人的技术特点和发展历程;②揭示市场炒作现象及其影响;③探讨人形机器人的应用前景和面临的挑战。 其他说明:文章强调了宇树科技人形机器人在技术上的突破和市场上的表现,同时也提醒读者关注市场炒作现象带来的风险,呼吁各方共同努力推动人形机器人产业健康发展。

    msvcp140.dll

    msvcp140.dll丢失怎样修复

Global site tag (gtag.js) - Google Analytics