`

ES-Hadoop学习笔记-Storm交互

 
阅读更多

elasticsearch-Hadoop提供ElasticSearch与Apache Storm的集成支持。从ElasticSearch读取的数据是以Storm里Tuple的形式进行操作处理。

依赖版本信息:

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.3.2</version>
</dependency>


Strom的extlib目录下jar包


 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.Map;  
  2.   
  3. import org.apache.storm.task.OutputCollector;  
  4. import org.apache.storm.task.TopologyContext;  
  5. import org.apache.storm.topology.OutputFieldsDeclarer;  
  6. import org.apache.storm.topology.base.BaseRichBolt;  
  7. import org.apache.storm.tuple.Fields;  
  8. import org.apache.storm.tuple.Tuple;  
  9. import org.apache.storm.tuple.Values;  
  10.   
  11. public class HandleBolt extends BaseRichBolt {  
  12.   
  13.     private static final long serialVersionUID = 1L;  
  14.   
  15.     private OutputCollector collector = null;  
  16.       
  17.     @SuppressWarnings("rawtypes")  
  18.     @Override  
  19.     public void prepare(Map stormConf, TopologyContext context,  
  20.             OutputCollector collector) {  
  21.         this.collector = collector;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void execute(Tuple input) {  
  26.         String name = "NA";  
  27.         if (input.contains("name")) {  
  28.             name = input.getStringByField("name");  
  29.         }  
  30.         String phone = "NA";  
  31.         if (input.contains("phone")) {  
  32.             phone = input.getStringByField("phone");  
  33.         }  
  34.         String rcall = "NA";  
  35.         if (input.contains("rcall")) {  
  36.             rcall = input.getStringByField("rcall");  
  37.             rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;  
  38.         }  
  39.         String address = "NA";  
  40.         if (input.contains("address")) {  
  41.             address = input.getStringByField("address");  
  42.             address = null == address || "null".equals(address) ? "NA" : address;  
  43.         }  
  44.         String email = "NA";  
  45.         if (input.contains("email")) {  
  46.             email = input.getStringByField("email");  
  47.             email = null == email || "null".equals(email) ? "NA" : email;  
  48.         }  
  49.         String idCard = "NA";  
  50.         if (input.contains("idCard")) {  
  51.             idCard = input.getStringByField("idCard");  
  52.             idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;  
  53.         }  
  54.         this.collector.emit(new Values(name, phone, rcall, address, email, idCard));  
  55.         this.collector.ack(input);  
  56.     }  
  57.   
  58.     @Override  
  59.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  60.         declarer.declare(new Fields("name""phone""rcal""address""email""idCard"));  
  61.     }  
  62.   
  63. }  

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.hdfs.bolt.HdfsBolt;  
  8. import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;  
  9. import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;  
  10. import org.apache.storm.hdfs.bolt.format.FileNameFormat;  
  11. import org.apache.storm.hdfs.bolt.format.RecordFormat;  
  12. import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;  
  13. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;  
  14. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;  
  15. import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;  
  16. import org.apache.storm.hdfs.bolt.sync.SyncPolicy;  
  17. import org.apache.storm.starter.bolt.PrinterBolt;  
  18. import org.apache.storm.topology.TopologyBuilder;  
  19. import org.apache.storm.utils.Utils;  
  20.   
  21. public class ES2StormTopology {  
  22.   
  23.     private static final String TOPOLOGY_NAME = "es-storm-topology";  
  24.       
  25.     public static void main(String[] args) {  
  26.         if (args.length != 1) {  
  27.             System.exit(0);  
  28.         }  
  29.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  30.           
  31.         TopologyBuilder builder = new TopologyBuilder();  
  32.         String target = "operator/telecom";  
  33.         String query = "?q=*";  
  34.         Map<Object, Object> configuration = new HashMap<Object, Object>();  
  35.         configuration.put("es.nodes""192.168.10.20:9200");  
  36.         configuration.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  37.         configuration.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  38.         builder.setSpout("es-storm-spout"new ESSpout(target, query, configuration), 1);  
  39.           
  40.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  41.           
  42.         builder.setBolt("storm-handle-bolt"new HandleBolt()).shuffleGrouping("es-storm-spout");  
  43.           
  44.         RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");  
  45.         SyncPolicy syncPolicy = new CountSyncPolicy(10);  
  46.         FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);  
  47.         FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")  
  48.                 .withPrefix("es_").withExtension(".log");  
  49.         HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")  
  50.                 .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)  
  51.                 .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);  
  52.         builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");  
  53.           
  54.         Config config = new Config();  
  55.         config.setDebug(true);  
  56.         if (isCluster) {  
  57.             try {  
  58.                 config.setNumWorkers(3);  
  59.                 StormSubmitter.submitTopologyWithProgressBar(  
  60.                         TOPOLOGY_NAME, config, builder.createTopology());  
  61.             } catch (Exception e) {  
  62.                 e.printStackTrace();  
  63.             }  
  64.         } else {  
  65.             LocalCluster cluster = new LocalCluster();  
  66.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  67.             Utils.sleep(100000);  
  68.             cluster.killTopology(TOPOLOGY_NAME);  
  69.             cluster.shutdown();  
  70.         }  
  71.           
  72.     }  
  73.       
  74. }  

 

注意:elasticsearch-hadoop里的EsSpout类用到的Storm版本过低,所以重写了一个ESSpout替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false

 

 

 

[java] view plain copy
 
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.starter.bolt.PrinterBolt;  
  8. import org.apache.storm.topology.TopologyBuilder;  
  9. import org.apache.storm.utils.Utils;  
  10. import org.platform.storm.elasticsearch.bolt.ESBolt;  
  11. import org.platform.storm.elasticsearch.spout.ESSpout;  
  12.   
  13. public class Storm2ESTopology {  
  14.   
  15.     private static final String TOPOLOGY_NAME = "storm-es-topology";  
  16.       
  17.     public static void main(String[] args) {  
  18.         if (args.length != 1) {  
  19.             System.exit(0);  
  20.         }  
  21.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  22.           
  23.         TopologyBuilder builder = new TopologyBuilder();  
  24.           
  25.         String target = "operator/telecom";  
  26.         String query = "?q=*";  
  27.         Map<Object, Object> spoutConf = new HashMap<Object, Object>();  
  28.         spoutConf.put("es.nodes""192.168.10.20:9200");  
  29.         spoutConf.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  30.         spoutConf.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  31.         builder.setSpout("es-storm-spout"new ESSpout(target, query, spoutConf), 1);  
  32.           
  33.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  34.           
  35.         Map<Object, Object> boltConf = new HashMap<Object, Object>();  
  36.         boltConf.put("es.nodes""192.168.10.20:9200");  
  37.         boltConf.put("es.index.auto.create""true");  
  38.         boltConf.put("es.ser.writer.bytes.class""org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");  
  39.         //boltConf.put("es.input.json", "true");  
  40.         builder.setBolt("storm-es-bolt"new ESBolt("data/telecom", boltConf))  
  41.             .globalGrouping("es-storm-spout");  
  42.           
  43.         Config config = new Config();  
  44.         config.setDebug(true);  
  45.         if (isCluster) {  
  46.             try {  
  47.                 config.setNumWorkers(3);  
  48.                 StormSubmitter.submitTopologyWithProgressBar(  
  49.                         TOPOLOGY_NAME, config, builder.createTopology());  
  50.             } catch (Exception e) {  
  51.                 e.printStackTrace();  
  52.             }  
  53.         } else {  
  54.             LocalCluster cluster = new LocalCluster();  
  55.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  56.             Utils.sleep(100000);  
  57.             cluster.killTopology(TOPOLOGY_NAME);  
  58.             cluster.shutdown();  
  59.         }  
  60.           
  61.     }  
  62.       
  63. }  

 

 

注意:elasticsearch-hadoop里的EsBolt、StormTupleBytesConverter类用到的Storm版本过低,所以重写了一个ESBolt、StormTupleBytesConverter替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false

 

文献出自:http://blog.csdn.net/fighting_one_piece/article/details/52228641

分享到:
评论

相关推荐

    大数据服务框架学习笔记.zip

    "大数据服务框架学习笔记.zip"很可能是包含一系列关于大数据服务框架的学习资料,如Hadoop、Spark、Flink等主流框架的讲解。下面,我们将深入探讨这些关键组件和框架。 首先,Hadoop是大数据处理的基础框架,由...

    大数据图标大全.docx

    12. **Logstash**: 数据收集、处理和转发工具,通常与Elasticsearch和Kibana一起构建ELK日志分析栈。 13. **Kibana**: 数据可视化工具,用于探索和展示Elasticsearch中的数据。 14. **Ranger**: 集中式安全管理...

    大数据、数据分析领域工具笔记

    除了上述工具,大数据生态系统还包括Elasticsearch(搜索和分析引擎)、Kafka(流处理平台)、Hue(Hadoop界面工具)等,它们共同构成了大数据处理的完整链条。 总结,这份笔记内容广泛,不仅介绍了大数据存储和...

    win7修复本地系统工具

    win7修复本地系统工具

    《自动化专业英语》04-Automatic-Detection-Block(自动检测模块).ppt

    《自动化专业英语》04-Automatic-Detection-Block(自动检测模块).ppt

    《计算机专业英语》chapter12-Intelligent-Transportation.ppt

    《计算机专业英语》chapter12-Intelligent-Transportation.ppt

    西门子S7-1200博图平台下3轴伺服螺丝机程序解析与应用

    内容概要:本文详细介绍了基于西门子S7-1200博图平台的3轴伺服螺丝机程序。该程序使用SCL语言编写,结合KTP700组态和TIA V14及以上版本,实现了对X、Y、Z三个轴的精密控制。文章首先概述了程序的整体架构,强调了其在自动化控制领域的高参考价值。接着深入探讨了关键代码片段,如轴初始化、运动控制以及主程序的设计思路。此外,还展示了如何通过KTP700组态实现人机交互,并分享了一些实用的操作技巧和技术细节,如状态机设计、HMI交互、异常处理等。 适用人群:从事自动化控制系统开发的技术人员,尤其是对西门子PLC编程感兴趣的工程师。 使用场景及目标:适用于希望深入了解西门子S7-1200博图平台及其SCL语言编程特点的学习者;旨在帮助读者掌握3轴伺服系统的具体实现方法,提高实际项目中的编程能力。 其他说明:文中提供的代码示例和设计理念不仅有助于理解和学习,还能直接应用于类似的实际工程项目中。

    MATLAB仿真:非线性滤波器在水下长基线定位(LBL)系统的应用与比较

    内容概要:本文详细探讨了五种非线性滤波器(卡尔曼滤波(KF)、扩展卡尔曼滤波(EKF)、无迹卡尔曼滤波(UKF)、粒子滤波(PF)和变维卡尔曼滤波(VDKF))在水下长基线定位(LBL)系统中的应用。通过对每种滤波器的具体实现进行MATLAB代码展示,分析了它们在不同条件下的优缺点。例如,KF适用于线性系统但在非线性环境中失效;EKF通过雅可比矩阵线性化处理非线性问题,但在剧烈机动时表现不佳;UKF利用sigma点处理非线性,精度较高但计算量大;PF采用蒙特卡罗方法,鲁棒性强但计算耗时;VDKF能够动态调整状态维度,适合信标数量变化的场景。 适合人群:从事水下机器人(AUV)导航研究的技术人员、研究生以及对非线性滤波感兴趣的科研工作者。 使用场景及目标:①理解各种非线性滤波器的工作原理及其在水下定位中的具体应用;②评估不同滤波器在特定条件下的性能,以便为实际项目选择合适的滤波器;③掌握MATLAB实现非线性滤波器的方法和技术。 其他说明:文中提供了详细的MATLAB代码片段,帮助读者更好地理解和实现这些滤波器。此外,还讨论了数值稳定性问题和一些实用技巧,如Cholesky分解失败的处理方法。

    VMware-workstation-full-14.1.3-9474260

    VMware-workstation-full-14.1.3-9474260

    DeepSeek系列-提示词工程和落地场景.pdf

    DeepSeek系列-提示词工程和落地场景.pdf

    javaSE阶段面试题

    javaSE阶段面试题

    《综合布线施工技术》第5章-综合布线工程测试.ppt

    《综合布线施工技术》第5章-综合布线工程测试.ppt

    安川机器人NX100使用说明书.pdf

    安川机器人NX100使用说明书.pdf

    S7-1200 PLC改造M7120平面磨床电气控制系统:IO分配、梯形图设计及组态画面实现

    内容概要:本文详细介绍了将M7120型平面磨床的传统继电器控制系统升级为基于西门子S7-1200 PLC的自动化控制系统的过程。主要内容涵盖IO分配、梯形图设计和组态画面实现。通过合理的IO分配,确保了系统的可靠性和可维护性;梯形图设计实现了主控制逻辑、砂轮升降控制和报警逻辑等功能;组态画面则提供了友好的人机交互界面,便于操作和监控。此次改造显著提高了设备的自动化水平、运行效率和可靠性,降低了维护成本。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是熟悉PLC编程和控制系统设计的专业人士。 使用场景及目标:适用于需要进行老旧设备升级改造的企业,旨在提高生产设备的自动化水平和可靠性,降低故障率和维护成本。具体应用场景包括但不限于金属加工行业中的平面磨床等设备的控制系统改造。 其他说明:文中还分享了一些实际调试中的经验和技巧,如急停逻辑的设计、信号抖动的处理方法等,有助于读者在类似项目中借鉴和应用。

    chromedriver-linux64-136.0.7103.48.zip

    chromedriver-linux64-136.0.7103.48.zip

    IMG_20250421_180507.jpg

    IMG_20250421_180507.jpg

    《网络营销策划实务》项目一-网络营销策划认知.ppt

    《网络营销策划实务》项目一-网络营销策划认知.ppt

    Lianantech_Security-Vulnerabil_1744433229.zip

    Lianantech_Security-Vulnerabil_1744433229

    MybatisCodeHelperNew2019.1-2023.1-3.4.1.zip

    MybatisCodeHelperNew2019.1-2023.1-3.4.1

    《Approaching(Almost)any machine learning problem》中文版第13章(最后一章)

    【深度学习部署】基于Docker的BERT模型训练与API服务部署:实现代码复用与模型共享

Global site tag (gtag.js) - Google Analytics