`
qindongliang1922
  • 浏览: 2193064 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117789
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126214
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60161
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71505
社区版块
存档分类
最新评论

elasticsearch里面bulk的用法

    博客分类:
  • ELK
阅读更多

上篇文章介绍了在es里面批量读取数据的方法mget,本篇我们来看下关于批量写入的方法bulk。




bulk api可以在单个请求中一次执行多个索引或者删除操作,使用这种方式可以极大的提升索引性能。

bulk的语法格式是:
````
action and meta_data \n
optional source \n

action and meta_data \n
optional source \n

action and meta_data \n
optional source \n
````



从上面能够看到,两行数据构成了一次操作,第一行是操作类型可以index,create,update,或者delete,第二行就是我们的可选的数据体,使用这种方式批量插入的时候,我们需要设置的它的Content-Type为application/json。


针对不同的操作类型,第二行里面的可选的数据体是不一样的,如下:

````
(1)index 和 create  第二行是source数据体
(2)delete 没有第二行
(3)update 第二行可以是partial doc,upsert或者是script
````



我们可以将我们的操作直接写入到一个文本文件中,然后使用curl命令把它发送到服务端:

一个requests文件内容如下:
````
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
````


发送命令如下:
````
curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
````
响应结果如下:
````
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
````


注意由于我们每行必须有一个换行符,所以json格式只能在一行里面而不能使用格式化后的内容,下面看一个正确的post bulk的请求数据体:

````
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
````


bulk请求的返回操作的结果也是批量的,每一个action都会有具体的应答体,来告诉你当前action是成功执行还是失败 :
````
{
   "took": 30,
   "errors": false,
   "items": [
      {
         "index": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 0,
            "_primary_term": 1
         }
      },
      {
         "delete": {
            "_index": "test",
            "_type": "_doc",
            "_id": "2",
            "_version": 1,
            "result": "not_found",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 404,
            "_seq_no" : 1,
            "_primary_term" : 2
         }
      },
      {
         "create": {
            "_index": "test",
            "_type": "_doc",
            "_id": "3",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 2,
            "_primary_term" : 3
         }
      },
      {
         "update": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 2,
            "result": "updated",
            "_shards": {
                "total": 2,
                "successful": 1,
                "failed": 0
            },
            "status": 200,
            "_seq_no" : 3,
            "_primary_term" : 4
         }
      }
   ]
}
````


bulk请求的路径有三种和前面的mget的请求类似:

````
(1) /_bulk  

(2)/{index}/_bulk

(3)/{index}/{type}/_bulk

````


上面的三种格式,如果提供了index和type那么在数据体里面的action就可以不提供,同理提供了index但没有type,那么就需要在数据体里面自己添加type。


此外,还有几个参数可以用来控制一些操作:

(1)数据体里面可以使用_version字段

(2)数据体里面可以使用_routing字段

(3)可以设置wait_for_active_shards参数,数据拷贝到多个shard之后才进行bulk操作

(4)refresh控制多久间隔多搜索可见



最后重点介绍下update操作,update操作在前面的文章也介绍过,es里面提供了多种更新数据的方法如:

````
(1)doc
(2)upsert
(3)doc_as_upsert
(4)script
(5)params ,lang ,source
````


在bulk里面的使用update方法和java api里面类似,前面的文章也介绍过详细的使用,现在我们看下在bulk的使用方式:

````
POST _bulk
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "_doc", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "_doc", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}
````
其实就是非格式化的内容,放在一行然后提交就行了,不同之处在于前面的文章介绍的是单次请求,而使用bulk之后就可以一次请求批量发送多个操作了。



总结:

本篇文章介绍了在es里面bulk操作的用法,使用bulk操作我们可以批量的插入数据来提升写入性能,但针对不同的action的它的数据格式体是不一样的,这一点需要注意,同时在每行数据结束时必须加一个换行符,不然es是不能正确识别其格式的。


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
0
0
分享到:
评论

相关推荐

    springBoot整合kafka和elasticSearch,实现批量拉取日志以及批量更新到es里

    在`bulkSaveToEs`方法中,我们使用Elasticsearch的Bulk API进行批量保存,这能显著提高插入效率。`parseLog`方法根据实际日志格式解析并转换日志数据。 为了实现“百万日志秒处理”的目标,我们需要注意优化以下几...

    基于.netcore搜索封装ElasticSearch.zip

    在.NET Core开发中,Elasticsearch已经成为一种广泛使用的全文搜索引擎,它提供了高度可扩展性和实时分析功能。这个名为"基于.netcore搜索封装ElasticSearch.zip"的压缩包,显然包含了一个针对.NET Core平台的...

    Elasticsearch BulkProcessor 的具体实现.pdf

    通过使用 BulkProcessor,可以显著提高 Elasticsearch 在处理大批量数据时的性能。它不仅减少了网络通信的开销,还允许开发者自定义批量操作的策略和回调监听器,从而更好地控制数据的写入过程。对于需要频繁进行...

    es-bulk-update-builder:Elasticsearch的批量更新请求主体构建器

    es-bulk-update-builder Elasticsearch的批量更新请求主体构建器安装$ npm install --save es-bulk-update-builder用法用法基本上如下: const BulkUpdateBuilder = require ( 'es-bulk-update-builder' ) ;...

    elasticsearch-7.4.0-win64.rar

    - **服务管理**:可以使用`elasticsearch-service.bat`脚本安装为系统服务,方便管理和启动。 4. **RESTful API**: - Elasticsearch通过HTTP协议和JSON格式进行通信,提供了丰富的CRUD操作接口。 - 使用`GET`、...

    Springboot集成elasticsearch

    在应用启动时,我们可以使用`@PostConstruct`注解的方法来初始化数据或者同步数据库中的数据到Elasticsearch: ```java @Component public class ElasticsearchInitializer { @Autowired private UserRepository...

    ElasticSearch集群部署方式

    本文档将详细介绍ElasticSearch集群部署的方法及其配置,并解释如何针对不同的操作系统进行配置,以及如何修改社会公共信息系统中的配置文件来适配ElasticSearch集群。 #### 一、ElasticSearch集群部署 ##### 1.1 ...

    Elasticsearch 34道面试题和答案.docx

    Elasticsearch 面试题和答案 本文档总结了 34 道 Elasticsearch 面试题和答案,涵盖了 Elasticsearch 的基础概念、索引架构、集群管理、性能优化、倒排索引、master 选举等多个方面。 一、Elasticsearch 基础...

    elasticsearch-2016-8最新可用java代码

    本资料“elasticsearch-2016-8最新可用java代码”是针对2016年8月版本的Elasticsearch,提供了使用Java API进行操作的示例代码,对于理解如何在Java应用中集成和操作Elasticsearch具有重要意义。 1. **Elastic...

    springboot-elasticsearch-master.rar

    本项目"springboot-elasticsearch-master"就是将这两者完美融合,提供了基于SpringBoot的Elasticsearch API实现,旨在简化ES的使用,让开发者能够更便捷地进行数据查询、新增、删除以及批量操作。 首先,我们需要...

    elasticsearch.jar

    **Elasticsearch.jar详解** Elasticsearch是一款强大的...了解其工作原理和使用方法,可以帮助我们构建高效、稳定的搜索和分析平台。随着版本的升级,Elasticsearch持续改进,为开发者提供了更强大、更易用的工具。

    elasticsearch1.0.zip

    Elasticsearch提供了`bulk()` API来进行批量插入和删除。批量操作可以显著提高效率,减少网络通信次数。"批量删除"和"批量导入"的实现可能涉及到创建一个包含多个操作的BulkRequest,然后调用`execute()`方法来执行...

    使用Hbase协作器(Coprocessor)同步数据到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)

    3. **连接Elasticsearch**:在Coprocessor中,使用Elasticsearch的Java API建立与ES集群的连接,创建索引和映射,然后将HBase中的数据写入相应的Elasticsearch索引。 4. **处理数据**:根据业务需求,可能需要对...

    Elasticsearch官方示例:accounts.json

    这通常通过Elasticsearch的Bulk API实现,或者使用工具如`elasticsearch-py`(Python客户端)或`curl`命令行工具。 一旦数据被索引,我们就可以执行各种搜索和分析操作。例如,你可以使用`GET`请求对特定账户进行...

    elasticsearch-demo

    通过运行这个`elasticsearch-demo`项目,学习者可以全面理解Elasticsearch的使用方法,包括安装、配置、索引管理、查询语句编写、性能优化等方面的知识。这不仅有助于理论学习,还能提供实践经验,为实际工作中的...

    最新版linux elasticsearch-8.1.3-linux-x86_64.tar.gz

    Elasticsearch是一个开源的全文搜索引擎,它以其高效、可...总之,Elasticsearch是一个强大的搜索和分析引擎,通过理解其核心概念、配置方法、API使用以及分布式特性,可以充分利用其功能来构建高性能的搜索解决方案。

    ElasticSearch代码实例C#

    使用`Elasticsearch.Net`提供的`ClusterStats`, `NodesStats`, ` IndicesStats`等方法可以实时监控集群状态、节点性能以及索引统计信息。 8. **错误处理** 在进行ES操作时,需注意异常处理,如网络错误、无效的...

    Integration betweena Elasticsearch and Spark

    例如,要“提高Elasticsearch的批量性能”,可以通过优化ES的配置,如调整bulk threads数量,或是优化es-hadoop的配置,例如调整byteSize/batchSize参数。文档还提到了启用自动ID生成的必要性,并指出了2.1.0版本中...

    elasticsearch-tools:用于执行诸如批量导入导出和导出导入映射之类的工作的一系列Elasticsearch命令行工具

    安装npm install -g elasticsearch-tools 安装后,您将可以使用以下命令行工具:出口 输入用法:es-export-bulk选项es-export-bulk --helpUsage: es-export-bulk [options] Options: -h, --help output usage ...

Global site tag (gtag.js) - Google Analytics