Elasticsearch-Hadoop provides integration between Elasticsearch and Apache Storm: data read from Elasticsearch is handled inside Storm as Tuples.
Dependency versions:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-starter</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.2</version>
</dependency>
The corresponding dependency JARs also need to be placed under Storm's extlib directory.
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Normalizes the fields read from Elasticsearch (missing or "null" values become "NA")
// and re-emits them for the downstream HDFS bolt.
public class HandleBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String name = "NA";
        if (input.contains("name")) {
            name = input.getStringByField("name");
        }
        String phone = "NA";
        if (input.contains("phone")) {
            phone = input.getStringByField("phone");
        }
        String rcall = "NA";
        if (input.contains("rcall")) {
            rcall = input.getStringByField("rcall");
            rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;
        }
        String address = "NA";
        if (input.contains("address")) {
            address = input.getStringByField("address");
            address = null == address || "null".equals(address) ? "NA" : address;
        }
        String email = "NA";
        if (input.contains("email")) {
            email = input.getStringByField("email");
            email = null == email || "null".equals(email) ? "NA" : email;
        }
        String idCard = "NA";
        if (input.contains("idCard")) {
            idCard = input.getStringByField("idCard");
            idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;
        }
        this.collector.emit(new Values(name, phone, rcall, address, email, idCard));
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field names must match the order of the Values emitted above ("rcall", not "rcal").
        declarer.declare(new Fields("name", "phone", "rcall", "address", "email", "idCard"));
    }
}
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.platform.storm.elasticsearch.spout.ESSpout;

// Reads documents from Elasticsearch with ESSpout, normalizes them with HandleBolt
// and writes the result to HDFS with HdfsBolt.
public class ES2StormTopology {

    private static final String TOPOLOGY_NAME = "es-storm-topology";

    public static void main(String[] args) {
        if (args.length != 1) {
            System.exit(0);
        }
        boolean isCluster = Boolean.parseBoolean(args[0]);
        TopologyBuilder builder = new TopologyBuilder();
        // Elasticsearch index/type to read from and the query to run against it.
        String target = "operator/telecom";
        String query = "?q=*";
        Map<Object, Object> configuration = new HashMap<Object, Object>();
        configuration.put("es.nodes", "192.168.10.20:9200");
        configuration.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
        configuration.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
        builder.setSpout("es-storm-spout", new ESSpout(target, query, configuration), 1);
        builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
        builder.setBolt("storm-handle-bolt", new HandleBolt()).shuffleGrouping("es-storm-spout");
        // HDFS bolt: colon-delimited records, synced every 10 tuples, files rotated every minute.
        RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");
        SyncPolicy syncPolicy = new CountSyncPolicy(10);
        FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")
                .withPrefix("es_").withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")
                .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)
                .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);
        builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");
        Config config = new Config();
        config.setDebug(true);
        if (isCluster) {
            try {
                config.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(
                        TOPOLOGY_NAME, config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
Note: the EsSpout class bundled with elasticsearch-hadoop targets an older Storm release, so a custom ESSpout was written against the newer Storm API to replace it (its structure is sketched below).
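The article does not reproduce that custom ESSpout, so what follows is only a minimal skeleton, assuming the constructor signature used in ES2StormTopology, of what such a port can look like on the org.apache.storm (Storm 1.x) API. The Elasticsearch scroll/read logic is elided and marked with comments; treat this as an illustrative sketch, not the original implementation.

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

// Hypothetical skeleton of an ESSpout ported to the Storm 1.x API.
// The actual Elasticsearch reading logic is intentionally omitted.
public class ESSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private final String target;                    // index/type, e.g. "operator/telecom"
    private final String query;                     // query string, e.g. "?q=*"
    private final Map<Object, Object> configuration;

    private SpoutOutputCollector collector;

    public ESSpout(String target, String query, Map<Object, Object> configuration) {
        this.target = target;
        this.query = query;
        this.configuration = configuration;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // A real implementation would open an Elasticsearch scroll over "target"
        // with "query", using the es.nodes / es.read.* settings in "configuration".
    }

    @Override
    public void nextTuple() {
        // A real implementation would read the next document from the scroll and emit
        // its fields with collector.emit(...) in the order declared below.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field names are taken from the "es.storm.spout.fields" setting passed by the topology.
        String fields = String.valueOf(configuration.get("es.storm.spout.fields"));
        declarer.declare(new Fields(fields.split(",")));
    }
}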
$ bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.platform.storm.elasticsearch.bolt.ESBolt;
import org.platform.storm.elasticsearch.spout.ESSpout;

// Reads documents from the operator/telecom index with ESSpout and writes them
// into the data/telecom index with ESBolt.
public class Storm2ESTopology {

    private static final String TOPOLOGY_NAME = "storm-es-topology";

    public static void main(String[] args) {
        if (args.length != 1) {
            System.exit(0);
        }
        boolean isCluster = Boolean.parseBoolean(args[0]);
        TopologyBuilder builder = new TopologyBuilder();
        String target = "operator/telecom";
        String query = "?q=*";
        Map<Object, Object> spoutConf = new HashMap<Object, Object>();
        spoutConf.put("es.nodes", "192.168.10.20:9200");
        spoutConf.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
        spoutConf.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
        builder.setSpout("es-storm-spout", new ESSpout(target, query, spoutConf), 1);
        builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
        Map<Object, Object> boltConf = new HashMap<Object, Object>();
        boltConf.put("es.nodes", "192.168.10.20:9200");
        boltConf.put("es.index.auto.create", "true");
        boltConf.put("es.ser.writer.bytes.class", "org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");
        //boltConf.put("es.input.json", "true");
        builder.setBolt("storm-es-bolt", new ESBolt("data/telecom", boltConf))
                .globalGrouping("es-storm-spout");
        Config config = new Config();
        config.setDebug(true);
        if (isCluster) {
            try {
                config.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(
                        TOPOLOGY_NAME, config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
Note: the EsBolt and StormTupleBytesConverter classes bundled with elasticsearch-hadoop likewise target an older Storm release, so custom ESBolt and StormTupleBytesConverter implementations were written against the newer Storm API (an ESBolt skeleton is sketched below).
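As with the spout, the article does not reproduce the custom ESBolt. The sketch below only illustrates, under the assumption that the constructor matches its usage in Storm2ESTopology, the org.apache.storm class structure such a port can take; the bulk-write and StormTupleBytesConverter serialization logic is elided and indicated by comments.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical skeleton of an ESBolt ported to the Storm 1.x API.
// The actual Elasticsearch bulk-indexing logic is intentionally omitted.
public class ESBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private final String target;                    // index/type to write to, e.g. "data/telecom"
    private final Map<Object, Object> configuration;

    private OutputCollector collector;

    public ESBolt(String target, Map<Object, Object> configuration) {
        this.target = target;
        this.configuration = configuration;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // A real implementation would initialize an Elasticsearch writer here using the
        // es.nodes, es.index.auto.create and es.ser.writer.bytes.class settings
        // (i.e. the StormTupleBytesConverter) passed in through "configuration".
    }

    @Override
    public void execute(Tuple input) {
        // A real implementation would serialize the tuple via StormTupleBytesConverter
        // and add it to a bulk request against "target" before acking.
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}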
$ bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false
Source: http://blog.csdn.net/fighting_one_piece/article/details/52228641