`

数据库数据导入Elasticsearch案例分享

阅读更多
The best elasticsearch highlevel java rest api-----bboss

基于bboss持久层和bboss elasticsearch客户端实现数据库数据导入es案例分享(支持各种数据库和各种es版本)

通过bboss,可以非常方便地将数据库表数据导入到es中:

  • 支持逐条数据导入
  • 批量数据导入
  • 批量数据多线程并行导入
  • 定时增量(串行/并行)数据导入
  • 下面详细介绍本案例。


1.案例对应的源码
批量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/TestDB2ESImport.java

定时增量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java

2.在工程中导入jdbc es maven坐标
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
<version>5.2.5</version>
</dependency>

本文从mysql数据库表td_cms_document导入数据到es中,除了导入上述maven坐标,还需要额外导入mysql驱动坐标:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>

3.配置es地址
新建application.properties文件,内容为:

elasticsearch.rest.hostNames=10.21.20.168:9200
## 集群地址用逗号分隔
#elasticsearch.rest.hostNames=10.180.211.27:9200,10.180.211.28:9200,10.180.211.29:9200
4.编写简单的导入代码
同步批量导入
	public void testSimpleImportBuilder(){
		ImportBuilder importBuilder = ImportBuilder.newInstance();
		try {
			//清除测试表数据
			ElasticSearchHelper.getRestClientUtil().dropIndice("dbclobdemo");
		}
		catch (Exception e){

		}
		//数据源相关配置,可选项,可以在外部启动数据源
		importBuilder.setDbName("test")
				.setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包
				.setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效
				.setDbUser("root")
				.setDbPassword("123456")
				.setValidateSQL("select 1")
				.setUsePool(false);//是否使用连接池


		//指定导入数据的sql语句,必填项,可以设置自己的提取逻辑
		importBuilder.setSql("select * from td_cms_document");
		/**
		 * es相关配置
		 */
		importBuilder
				.setIndex("dbclobdemo") //必填项
				.setIndexType("dbclobdemo") //必填项
				.setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
				.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
				.setBatchSize(10000)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
				.setJdbcFetchSize(20000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效


		/**
		 * 执行数据库表数据导入es操作
		 */
		DataStream dataStream = importBuilder.builder();
		dataStream.db2es();
	}
可以直接运行上述代码,查看数据导入效果。

异步批量导入

	public void testSimpleLogImportBuilderFromExternalDBConfig(){
		ImportBuilder importBuilder = ImportBuilder.newInstance();
		try {
			//清除测试表
			ElasticSearchHelper.getRestClientUtil().dropIndice("dbdemo");
		}
		catch (Exception e){

		}
        //数据源相关配置,可选项,可以在外部启动数据源
		importBuilder.setDbName("test")
				.setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包
				.setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true")
				.setDbUser("root")
				.setDbPassword("123456")
				.setValidateSQL("select 1")
				.setUsePool(false);//是否使用连接池


		//指定导入数据的sql语句,必填项,可以设置自己的提取逻辑
		importBuilder.setSql("select * from td_sm_log");
		/**
		 * es相关配置
		 */
		importBuilder
				.setIndex("dbdemo") //必填项
				.setIndexType("dbdemo") //必填项
				.setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
				.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
				.setBatchSize(5000)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
				.setJdbcFetchSize(10000);//设置数据库的查询fetchsize

		/**
		 * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
		 */
		importBuilder.setParallel(true);//设置为多线程并行批量导入
		importBuilder.setQueue(100);//设置批量导入线程池等待队列长度
		importBuilder.setThreadCount(200);//设置批量导入线程池工作线程数量
		importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 
		importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
		importBuilder.setRefreshOption("refresh"); // 为了实时验证数据导入的效果,强制刷新数据,生产环境请设置为null或者不指定
		
		/**
		 * 执行数据库表数据导入es操作
		 */
		DataStream dataStream = importBuilder.builder();
		dataStream.db2es();
		
		long count = ElasticSearchHelper.getRestClientUtil().countAll("dbdemo");
		System.out.println("数据导入完毕后索引表dbdemo中的文档数量:"+count);
	}

说明:从数据库检索数据放入批处理列表,到达batchsize就提交一次作业,最多threadcount个工作线程并行处理作业,如果线程都在忙,没有空闲的工作线程,那么作业就会放到队列里面排队,如果队列也满了,则会阻塞等待释放的队列位置,每等待100次打印一次等待次数的日志。

batchsize,queue,threadcount的配置要结合服务器的内存和cpu配置来设置,设置大了容易内存溢出,设置小了影响处理速度,所以要权衡考虑。


导入的时候需要观察服务端的write线程池的状态,如果出现reject任务的情况,就需要调优elasticsearch配置参数:

thread_pool.bulk.queue_size: 1000   es线程等待队列长度

thread_pool.bulk.size: 10   线程数量,与cpu的核数对应

一个有字段属性映射的稍微复杂案例实现
	public void testImportBuilder(){
		ImportBuilder importBuilder = ImportBuilder.newInstance();
		try {
			//清除测试表
			ElasticSearchHelper.getRestClientUtil().dropIndice("dbclobdemo");
		}
		catch (Exception e){

		}
		//数据源相关配置,可选项,可以在外部启动数据源
		importBuilder.setDbName("test")
				.setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包
				.setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true")
				.setDbUser("root")
				.setDbPassword("123456")
				.setValidateSQL("select 1")
				.setUsePool(false);//是否使用连接池


		//指定导入数据的sql语句,必填项,可以设置自己的提取逻辑
		importBuilder.setSql("select * from td_cms_document");
		/**
		 * es相关配置
		 */
		importBuilder
				.setIndex("dbclobdemo") //必填项
				.setIndexType("dbclobdemo") //必填项
				.setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");
				.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
				.setEsIdField("documentId")//可选项
				.setEsParentIdField(null) //可选项,如果不指定,es自动为文档产生id
				.setRoutingValue(null) //可选项		importBuilder.setRoutingField(null);
				.setEsDocAsUpsert(true)//可选项
				.setEsRetryOnConflict(3)//可选项
				.setEsReturnSource(false)//可选项
				.setEsVersionField(null)//可选项
				.setEsVersionType(null)//可选项
				.setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") //可选项,默认日期格式
				.setLocale("zh_CN")  //可选项,默认locale
				.setTimeZone("Etc/UTC")  //可选项,默认时区
				.setBatchSize(50)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
				.setJdbcFetchSize(10000);//设置数据库的查询fetchsize

		/**
		 * db-es mapping 表字段名称到es 文档字段的映射:比如document_id -> docId
		 * 可以配置mapping,也可以不配置,默认基于java 驼峰规则进行db field-es field的映射和转换
		 */
		importBuilder.addFieldMapping("document_id","docId")
					 .addFieldMapping("docwtime","docwTime")
					 .addIgnoreFieldMapping("channel_id");//添加忽略字段
		/**
		 * 为每条记录添加额外的字段和值
		 * 可以为基本数据类型,也可以是复杂的对象
		 */
		importBuilder.addFieldValue("testF1","f1value");
		importBuilder.addFieldValue("testInt",0);
		importBuilder.addFieldValue("testDate",new Date());
		importBuilder.addFieldValue("testFormateDate","yyyy-MM-dd HH",new Date());
		TestObject testObject = new TestObject();
		testObject.setId("testid");
		testObject.setName("jackson");
		importBuilder.addFieldValue("testObject",testObject);

        /**
		 * 重新设置es数据结构
		 */
		importBuilder.setDataRefactor(new DataRefactor() {
			public void refactor(Context context) throws Exception  {
				CustomObject customObject = new CustomObject();
				customObject.setAuthor((String)context.getValue("author"));
				customObject.setTitle((String)context.getValue("title"));
				customObject.setSubtitle((String)context.getValue("subtitle"));
				context.addFieldValue("docInfo",customObject);//如果还需要构建更多的内部对象,可以继续构建

				//上述三个属性已经放置到docInfo中,如果无需再放置到索引文档中,可以忽略掉这些属性
				context.addIgnoreFieldMapping("author");
				context.addIgnoreFieldMapping("title");
				context.addIgnoreFieldMapping("subtitle");
			}
		});

		/**
		 * 执行数据库表数据导入es操作
		 */
		DataStream dataStream = importBuilder.builder();
		dataStream.db2es();
	}

定时增量导入
源码文件 https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java
	public void testSimpleLogImportBuilderFromExternalDBConfig(){
		ImportBuilder importBuilder = ImportBuilder.newInstance();
		//增量定时任务不要删表,但是可以通过删表来做初始化操作
//		try {
//			//清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表
//			ElasticSearchHelper.getRestClientUtil().dropIndice("dbdemo");
//		}
//		catch (Exception e){
//
//		}

		//数据源相关配置,可选项,可以在外部启动数据源
		importBuilder.setDbName("test")
				.setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包
				.setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true")
				.setDbUser("root")
				.setDbPassword("123456")
				.setValidateSQL("select 1")
				.setUsePool(true);//是否使用连接池

		//指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,设置增量变量log_id
		importBuilder.setSql("select * from td_sm_log where log_id > #[log_id]");
		/**
		 * es相关配置
		 */
		importBuilder
				.setIndex("dbdemo") //必填项
				.setIndexType("dbdemo") //必填项
//				.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
				.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
				.setBatchSize(5000)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
				.setJdbcFetchSize(10000);//设置数据库的查询fetchsize
		importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
//					 .setScheduleDate(date) //指定任务开始执行时间:日期
				     .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行
					 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
//		importBuilder.setNumberLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
//		importBuilder.setNumberLastValueColumn("log_id");//手动指定日期增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
		importBuilder.setFromFirst(true);//任务重启时,重新开始采集数据,适合于每次全量导入数据的情况,如果是全量导入,可以先删除原来的索引数据
		importBuilder.setLastValueStorePath("testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
//		importBuilder.setLastValueStoreTableName("logs");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab
		importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
		// 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型

//		importBuilder.

		/**
		 * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
		 */
		importBuilder.setParallel(true);//设置为多线程并行批量导入
		importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
		importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
		importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行
		importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
		importBuilder.setEsIdField("log_id");//设置文档主键,不设置,则自动产生文档id
		importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO
//		importBuilder.setDiscardBulkResponse(true);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度
		/**
		 * 执行数据库表数据导入es操作
		 */
		DataStream dataStream = importBuilder.builder();
		dataStream.db2es();//执行导入操作

		System.out.println();

		
	}

5 开发交流
完整的demo工程

https://gitee.com/bbossgroups/eshelloword-booter

elasticsearch技术交流群:166471282

elasticsearch微信公众号:

bboss微信公众号:
1
0
分享到:
评论

相关推荐

    elasticsearch数据库下载、配置、使用案例

    ### Elasticsearch 数据库下载、配置及使用案例详解 #### 一、Elasticsearch 概述 Elasticsearch 是一款基于 Lucene 开发的分布式、RESTful 搜索和分析引擎,广泛应用于全文检索、日志和实时数据分析领域。它以其...

    数据接入ElasticSearch方式培训PPT

    数据接入ElasticSearch是现代大数据处理中的重要环节,它涉及到如何高效、稳定地将各种...通过学习,我们可以更好地理解和掌握如何将数据高效、准确地导入Elasticsearch,从而发挥其在大数据分析和检索中的强大能力。

    Elasticsearch 案例

    在这个案例中,我们将探讨如何利用Elasticsearch开发一个简易的搜索引擎,实现数据导入、建立索引以及按关键词进行搜索的功能。 首先,我们要了解Elasticsearch的基本概念。索引(Index)是Elasticsearch中的核心...

    Spring Boot整合ElasticSearch和Mysql 附案例源码.docx

    1. **将数据库数据导入ElasticSearch:** - **目标:** 将MySQL数据库中的数据同步到ElasticSearch中,以便利用ElasticSearch的快速检索能力。 - **技术要点:** - 数据抽取与转换:使用工具或自定义逻辑从MySQL...

    技术领域+数据抽取+应用工具ES.rar

    在20210803ES这个文件中,可能包含了关于如何使用Elasticsearch进行数据抽取的教程、配置示例、代码片段或者案例研究。可能的内容可能涵盖以下几个方面: 1. **安装与配置**:介绍如何在不同的操作系统上安装...

    Elasticsearch示例数据 logs.json shakespeare.json accounts.json

    在使用这些JSON文件导入Elasticsearch时,通常会使用`curl`命令或Elasticsearch的`bulk` API。首先,我们需要确保Elasticsearch服务已经启动,并配置了相应的索引模板。然后,将JSON文件内容发送到Elasticsearch的 `...

    Elasticsearch顶尖高手系列.zip

    7. **数据导入导出**:了解如何从各种数据源导入数据到Elasticsearch,以及如何将数据导出到其他系统。 8. **安全与监控**:学习如何配置Elasticsearch的安全功能,如SSL/TLS加密、用户权限管理,以及如何使用监控...

    elasticsearch

    JDBC River是Elasticsearch的一个插件,用于从关系型数据库(如MySQL)中导入数据到Elasticsearch。它通过定时执行SQL查询,将结果同步到ES索引中,实现数据的实时更新。在Linux环境下,可以通过运行`mysql-simple-...

    word读取内容并操作存储数据库

    在IT行业中,处理文档数据和数据库操作是一项常见的任务。在这个场景下,我们需要使用Node.js来读取Word文档的内容,并将其解析后存储到数据库中。以下是一系列相关知识点的详细解释: 1. **Node.js**: Node.js是一...

    elasticsearch 官网 account.json下载

    4. **JSON文档**:Elasticsearch以JSON文档作为数据格式,这与现代Web服务和NoSQL数据库的数据格式相吻合,方便数据的导入和处理。 5. **搜索功能**:ES提供全文检索、结构化查询、近实时分析等多种搜索方式,支持...

    elasticsearch学习笔记.rar

    在"elasticsearch学习笔记.pdf"中,你将详细学习这些概念,并通过实际案例来加深理解,包括如何安装配置ES,创建和管理索引,执行搜索,以及进行高级操作如数据导入导出、监控和报警等。这将是你掌握Elasticsearch...

    1.0 ElasticSearch6实战教程资料.zip

    Elasticsearch(ES)6 是一个流行的、高性能的全文搜索引擎,常用于大数据分析、日志聚合、实时搜索等场景。它基于 Lucene 库,提供了分布式、RESTful 风格的 API,使得数据存储和检索变得简单高效。本教程资料是...

    elasticsearch-中文开发指南

    - **Elasticsearch River JDBC:** 描述如何使用 JDBC 连接器将关系型数据库中的数据流式导入 Elasticsearch。 #### 结论 Elasticsearch 作为一款高性能的搜索与数据分析引擎,提供了丰富的功能和灵活的 API 接口,...

    flume-es5.X依赖.zip

    https://blog.csdn.net/qq_25067199/article/details/79672209),作者详细介绍了如何创建一个自定义的ElasticsearchSink,通过Flume将日志数据高效地导入Elasticsearch。 在这个实践中,作者首先创建了一个名为`...

    基于SpringBoot+ElasticSearch+vue.js开发的大数据营销系统.zip

    总结,这个基于SpringBoot+ElasticSearch+Vue.js的大数据营销系统,不仅展示了现代化Web应用的开发流程,还提供了实践案例,对于学习和理解这些技术有极大的帮助。无论是初学者还是经验丰富的开发者,都能从中...

    KingbaseES-用户帮助手册.zip

    3. **数据库管理**:这部分将详细介绍如何创建、删除和管理数据库,包括数据表的创建、修改和删除,以及数据导入导出的操作,让读者理解数据库的基本管理流程。 4. **SQL语言支持**:KingbaseES支持SQL标准,手册中...

    数据中台应用场景-07-基于ES企业搜索中台(30页 PPT).pptx

    它通过整合多种数据源,并借助先进的搜索引擎技术(如Elasticsearch),实现对这些数据的有效管理和快速检索,从而提高工作效率和数据利用价值。 - **DB/DW/DL(数据库/数据仓库/数据湖)**:分别用于存储结构化...

    安卓Android二次元社区论坛bbs绘画app可导入Studio毕业源码案例设计.zip

    这个压缩包文件“安卓Android二次元社区论坛bbs绘画app可导入Studio毕业源码案例设计.zip”是一个关于安卓应用开发的毕业设计项目,主要聚焦于创建一个二次元社区论坛和绘画应用程序。它包含了一些关键组成部分,如...

    ElasticSearch从入门到企业实战教程(附源码)

    Elasticsearch是一个分布式、开源的全文搜索引擎,设计用于云计算环境中,提供实时数据分析和搜索功能。在本教程中,我们将深入探讨Elasticsearch的核心概念、功能以及如何将其应用于实际的企业级项目,尤其是电商...

    ES和HADOOP使用问题和需求

    在实时数据分析领域,ELK(Elasticsearch、Logstash、Kibana)栈提供了从数据采集、处理到可视化的完整解决方案。 - **节点个数考量**:节点个数不是越多越好,应根据具体需求和数据特性来确定。对于数据量不大且...

Global site tag (gtag.js) - Google Analytics