`
weitao1026
  • 浏览: 1054073 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

es通过spark实现join查询

 
阅读更多

elastic search是一个分布式的搜索引擎,支持对数亿的数据进行秒级的查询操作。其底层使用了lucene来进行存储,对lucene进行了分布式的封装,同时在数据进入时进行了translog以实现fail over。

 

在将elastic search时当做数据库使用时,必然会遇到join操作。

 

这里提供spark sql来实现join的一种思路。

 

spark是一个通用的分布式处理框架,包括但不限于数据的读、写、流式计算等操作。使用spark,可以将自己的业务逻辑,由spark框架分布在多台机器上来执行。

 

elastic search在2.1+的版本上,对spark提供了支持。

 

下面提供一些读写的例子,绝大部分内容取自于参考资料中的官方文档。

 

需要注意的是,spark是使用scala语言来开发的,而elasticsearch是使用java语言来开发的。本文中的例子是使用java语言来调用spark的库,官方有更简捷的使用spark语言的例子。

 

1. 使用spark来读写elasticsearch

Java代码  收藏代码
  1. import java.util.Map;  
  2. import java.util.Map.Entry;  
  3.   
  4. import org.apache.spark.SparkConf;  
  5. import org.apache.spark.api.java.JavaPairRDD;  
  6. import org.apache.spark.api.java.JavaRDD;  
  7. import org.apache.spark.api.java.JavaSparkContext;  
  8. import org.apache.spark.api.java.function.Function;  
  9. import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;  
  10.   
  11. import com.google.common.collect.ImmutableList;  
  12. import com.google.common.collect.ImmutableMap;  
  13.   
  14. public class WriteToEs {  
  15.   
  16.     public static void main(String[] args) {  
  17.         SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");  
  18.         conf.set("es.index.auto.create""true");  
  19.   
  20.         JavaSparkContext jsc = new JavaSparkContext(conf);  
  21.   
  22.         // 写入es  
  23.         {  
  24.             Map<String, ?> numbers = ImmutableMap.of("one"1"two"2);  
  25.             Map<String, ?> airports = ImmutableMap.of("OTP""Otopeni""SFO",  
  26.                     "San Fran");  
  27.   
  28.             JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(  
  29.                     numbers, airports));  
  30.             JavaEsSpark.saveToEs(javaRDD, "spark/docs");  
  31.         }  
  32.   
  33.         // 从es中读取  
  34.         {  
  35.             JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(  
  36.                     jsc, "spark/docs");  
  37.   
  38.             for (Entry<String, Map<String, Object>> entry : esRDD  
  39.                     .collectAsMap().entrySet()) {  
  40.                 System.out.println(entry.getKey());  
  41.                 System.out.println(entry.getValue());  
  42.             }  
  43.         }  
  44.   
  45.         // 从es中读取  
  46.         {  
  47.             JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,  
  48.                     "spark/docs""?q=one:1").values();  
  49.             // JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,  
  50.             // "spark/docs").values();  
  51.   
  52.             Function<Map<String, Object>, Boolean> filter = new Function<Map<String, Object>, Boolean>() {  
  53.                 public Boolean call(Map<String, Object> map) throws Exception {  
  54.                     return map.containsKey("one");  
  55.                 }  
  56.             };  
  57.             JavaRDD<Map<String, Object>> filtered = esRDD.filter(filter);  
  58.             for (Map<String, Object> map : filtered.collect()) {  
  59.                 System.out.println(map);  
  60.             }  
  61.         }  
  62.   
  63.         jsc.close();  
  64.     }  
  65. }  
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class WriteToEs {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
		conf.set("es.index.auto.create", "true");

		JavaSparkContext jsc = new JavaSparkContext(conf);

		// 写入es
		{
			Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
			Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO",
					"San Fran");

			JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(
					numbers, airports));
			JavaEsSpark.saveToEs(javaRDD, "spark/docs");
		}

		// 从es中读取
		{
			JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(
					jsc, "spark/docs");

			for (Entry<String, Map<String, Object>> entry : esRDD
					.collectAsMap().entrySet()) {
				System.out.println(entry.getKey());
				System.out.println(entry.getValue());
			}
		}

		// 从es中读取
		{
			JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,
					"spark/docs", "?q=one:1").values();
			// JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,
			// "spark/docs").values();

			Function<Map<String, Object>, Boolean> filter = new Function<Map<String, Object>, Boolean>() {
				public Boolean call(Map<String, Object> map) throws Exception {
					return map.containsKey("one");
				}
			};
			JavaRDD<Map<String, Object>> filtered = esRDD.filter(filter);
			for (Map<String, Object> map : filtered.collect()) {
				System.out.println(map);
			}
		}

		jsc.close();
	}
}

 

2. 使用spark sql来读写elasticsearch

 

此例子为了演示join而写,没有实际的意义。

 

Java代码  收藏代码
  1. import java.io.Serializable;  
  2. import java.util.ArrayList;  
  3. import java.util.List;  
  4.   
  5. import org.apache.spark.SparkConf;  
  6. import org.apache.spark.api.java.JavaSparkContext;  
  7. import org.apache.spark.sql.DataFrame;  
  8. import org.apache.spark.sql.Row;  
  9. import org.apache.spark.sql.SQLContext;  
  10. import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
  11.   
  12. public class SparkSql implements Serializable {  
  13.   
  14.     private static final long serialVersionUID = -8843264837345502547L;  
  15.   
  16.     public static class People {  
  17.         private int id;  
  18.         private String name;  
  19.         private String surname;  
  20.         private int age;  
  21.   
  22.         public People(int id, String name, String surname, int age) {  
  23.             this.id = id;  
  24.             this.name = name;  
  25.             this.surname = surname;  
  26.             this.age = age;  
  27.         }  
  28.   
  29.         public People() {  
  30.         }  
  31.   
  32.         public int getId() {  
  33.             return id;  
  34.         }  
  35.   
  36.         public void setId(int id) {  
  37.             this.id = id;  
  38.         }  
  39.   
  40.         public String getName() {  
  41.             return name;  
  42.         }  
  43.   
  44.         public void setName(String name) {  
  45.             this.name = name;  
  46.         }  
  47.   
  48.         public String getSurname() {  
  49.             return surname;  
  50.         }  
  51.   
  52.         public void setSurname(String surname) {  
  53.             this.surname = surname;  
  54.         }  
  55.   
  56.         public int getAge() {  
  57.             return age;  
  58.         }  
  59.   
  60.         public void setAge(int age) {  
  61.             this.age = age;  
  62.         }  
  63.   
  64.     }  
  65.   
  66.     public static void main(String[] args) {  
  67.         SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");  
  68.         conf.set("es.index.auto.create""true");  
  69.   
  70.         JavaSparkContext jsc = new JavaSparkContext(conf);  
  71.   
  72.         // 写入  
  73.         {  
  74.             SQLContext sc = new SQLContext(jsc);  
  75.   
  76.             List<People> data = new ArrayList<People>();  
  77.             data.add(new People(1"Micheal""Mike"18));  
  78.             data.add(new People(2"Flowaters""fw"18));  
  79.   
  80.             DataFrame people = sc.createDataFrame(data, People.class);  
  81.   
  82.             JavaEsSparkSQL.saveToEs(people, "spark/person");  
  83.         }  
  84.   
  85.         // 读取  
  86.         {  
  87.             SQLContext sql = new SQLContext(jsc);  
  88.   
  89.             // 注册表people  
  90.             DataFrame people = sql.read().format("es").load("spark/people");  
  91.             people.registerTempTable("people");  
  92.   
  93.             // 注册表person  
  94.             DataFrame person = sql.read().format("es").load("spark/person");  
  95.             person.registerTempTable("person");  
  96.   
  97.             // 查看表的schema  
  98.             // df.printSchema();  
  99.   
  100.             // 执行sql  
  101.             DataFrame df = sql  
  102.                     .sql("SELECT ps.id, p.name FROM people as p INNER JOIN person as ps ON p.name = ps.name");  
  103.   
  104.             // 输出执行的结果  
  105.             for (Row row : df.collectAsList()) {  
  106.                 System.out.println(row.toString());  
  107.             }  
  108.         }  
  109.   
  110.         jsc.close();  
  111.     }  
  112. }  
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SparkSql implements Serializable {

	private static final long serialVersionUID = -8843264837345502547L;

	public static class People {
		private int id;
		private String name;
		private String surname;
		private int age;

		public People(int id, String name, String surname, int age) {
			this.id = id;
			this.name = name;
			this.surname = surname;
			this.age = age;
		}

		public People() {
		}

		public int getId() {
			return id;
		}

		public void setId(int id) {
			this.id = id;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public String getSurname() {
			return surname;
		}

		public void setSurname(String surname) {
			this.surname = surname;
		}

		public int getAge() {
			return age;
		}

		public void setAge(int age) {
			this.age = age;
		}

	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
		conf.set("es.index.auto.create", "true");

		JavaSparkContext jsc = new JavaSparkContext(conf);

		// 写入
		{
			SQLContext sc = new SQLContext(jsc);

			List<People> data = new ArrayList<People>();
			data.add(new People(1, "Micheal", "Mike", 18));
			data.add(new People(2, "Flowaters", "fw", 18));

			DataFrame people = sc.createDataFrame(data, People.class);

			JavaEsSparkSQL.saveToEs(people, "spark/person");
		}

		// 读取
		{
			SQLContext sql = new SQLContext(jsc);

			// 注册表people
			DataFrame people = sql.read().format("es").load("spark/people");
			people.registerTempTable("people");

			// 注册表person
			DataFrame person = sql.read().format("es").load("spark/person");
			person.registerTempTable("person");

			// 查看表的schema
			// df.printSchema();

			// 执行sql
			DataFrame df = sql
					.sql("SELECT ps.id, p.name FROM people as p INNER JOIN person as ps ON p.name = ps.name");

			// 输出执行的结果
			for (Row row : df.collectAsList()) {
				System.out.println(row.toString());
			}
		}

		jsc.close();
	}
}

 

 

参考资料:

1. elasticsearch官方网站:https://www.elastic.co/

2. elasticsearch官方对spark的支持:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

分享到:
评论

相关推荐

    阿里云Elasticsearch7.x jar包.zip

    - `elasticsearch-rest-client-7.4.0.jar`:Elasticsearch的REST客户端库,允许通过HTTP/RESTful API与Elasticsearch集群进行交互。 - `lucene-suggest-8.2.0.jar`:Lucene的建议组件,用于实现自动补全和拼写检查...

    elasticsearch与hadoop比较

    在数据处理能力方面,Elasticsearch的聚合统计和全文搜索功能虽然强大,但其并不支持SQL中的join或子查询等复杂数据处理操作。Elasticsearch也不支持中间数据输出或数据集转换,因此在处理需要复杂计算逻辑的任务时...

    大数据时代下统计分析的新利器——SparkR

    甚至有第三方扩展支持Avro、CSV、ElasticSearch和Cassandra等格式。 - **高扩展性**:所有针对SparkR DataFrame的操作都会自动分布到整个Spark集群中进行处理。这得益于Spark强大的分布式内存计算框架,使得SparkR...

    最全的Spark基础知识解答.pdf

    Elasticsearch的基础代码示例通常涉及索引创建、文档插入、查询和聚合等功能。 Spark作为计算框架的优势在于其快速、统一的API、高可扩展性和对迭代计算的良好支持。它是AMPlab在UC Berkeley开源的项目,不仅继承了...

    实现大数据即席查询秒级响应.pdf

    搜索引擎如Elasticsearch能够实现快速搜索和简单的聚合操作,但主要针对搜索而非在线分析处理(OLAP),处理复杂计算(如TopN、Join、多级聚合)时可能会遇到挑战,而且缺乏SQL支持。而SQL on Hadoop方案(如Impala...

    万亿数据库核心存储引擎的技术实现.pdf

    在全文检索方面,文档提到了使用Elasticsearch(ES)实现秒级响应。ES是一种分布式、实时的搜索和分析引擎,特别适合处理大量文本数据的检索。通过列簇和异构存储,可以实现数据的冷热分离,提高查询速度。 对于...

    3-9+impala打造交互查询系统.pdf

    此外,ElasticSearch、Solr、ClickHouse和Druid则更多地被用于全文搜索和实时分析。 PART 02:Impala与竞品比较 1. Hadoop/Hive:Hadoop是分布式存储和计算的基础框架,Hive则基于Hadoop提供数据仓库服务,但其...

    druid的使用

    在与其他技术的竞品分析中,Druid与ES(ElasticSearch)、KVStore(如Hbase、Cassandra、OpenTSDB)以及SQL on Hadoop(如Impala/Drill/Spark/Presto)等数据处理技术相比,有其独特的优缺点。例如,与ES相比,Druid...

    apache doris 从部署到应用全解

    Doris 支持与其他数据源集成,如 Hive、Iceberg、Hudi、Elasticsearch,以及 JDBC 和 ODBC,便于构建多源数据目录。此外,Doris 还提供了一系列的工具和插件,如 Spark 和 Flink 的连接器,DataX doriswriter,以及...

    1、Druid(Imply-3.0.4)介绍及部署(centos6.10)、验证

    1. 与Elasticsearch相比,Druid更专注于OLAP,对导入和聚合进行了优化,但不支持全文检索。 2. 相较于Key/Value Stores(如HBase、Cassandra、OpenTSDB),Druid的列式存储和索引提供了更快的扫描速度。 3. 相比...

    cours base de données avancée

    3. **SQL查询语言**: 掌握高级SQL语法,包括子查询、联接(JOIN)、分组(GROUP BY)、聚合函数(SUM, AVG, COUNT, MAX, MIN)以及窗口函数的使用。 4. **索引与查询优化**: 学习如何创建和管理索引,理解其对查询...

    开源项目-pilosa-pilosa.zip

    10. **与其他系统集成**:Pilosa 可以作为其他系统的数据层,如Hadoop、Spark或Elasticsearch的补充,提高数据分析的效率。 总结来说,Pilosa 是一个强大而灵活的开源工具,适用于需要快速查询和分析海量数据的场景...

    MaxCompute对开源系统的支持与融合.pdf

    MaxCompute2.0的开源计算引擎支持包括但不限于Presto、Kylin、Drill、Spark、ElasticSearch和Flink等。这些开源计算引擎与MaxCompute2.0保持了自研优势的同时,还能够拥抱开源生态,实现数据存储、资源调度和安全...

    Meetup-Druid和Kylin

    - 使用Kafka作为消息中间件,结合Elasticsearch和Spark Streaming进行监控数据的收集、处理和展示。 - 实现了基于业务线粒度的资源隔离,确保不同业务间的稳定性和工作负载差异。 #### Kylin在美团点评的实践 美团...

    Clickhouse在众安的应用实践——百亿保险数据实时分析探索.pdf

    在实际应用中,ClickHouse解决了原始系统使用Elasticsearch保存用户标签数据时存在的查询效率低下和数据更新延迟的问题。通过构建标签平台,结合ClickHouse,众安保险将历史保单数据、用户数据和用户行为数据进行...

    8-6+Doris数智一体解决方案.pdf

    例如,Doris可以与Spark进行联邦数据分析,同时支持业界最快的SQL on Elasticsearch,使得用户可以在不离开Doris环境的情况下对ES数据进行高效查询,进一步提升了数据分析的效率和深度。 【应用实践Practice】: ...

    wh swifts使用手册_201903131

    Swifts 目前支持通过 JDBC 与 MySQL、Oracle、ElasticSearch、MongoDB、Cassandra、Hbase、SQL Server 等多种数据库进行交互。 6. LOOKUP 操作:Swifts 提供了类似 SQL 的 JOIN 语法,支持单字段和多字段 JOIN。 - ...

    4、乐信(25问)1

    6. **监控与日志**:Prometheus、Grafana用于系统性能监控,ELK Stack(Elasticsearch、Logstash、Kibana)处理日志分析。 7. **安全性**:HTTPS、WAF(Web Application Firewall)、CSRF防护等确保网络安全。 ...

    wh swifts使用手册_201903191

    4. Lookup 操作:Swifts 支持从多种数据库(如 MySQL、Oracle、ElasticSearch 等)进行数据查找(lookup)操作。这类似于 SQL 的 join,但 Swifts 针对不支持 SQL 的数据库如 HBase 和 Redis 提供了类似的语法支持。...

Global site tag (gtag.js) - Google Analytics