`
qindongliang1922
  • 浏览: 2172229 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117108
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125448
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59555
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71034
社区版块
存档分类
最新评论

ElasticSearch并发操作之乐观锁的使用

    博客分类:
  • ELK
阅读更多
上篇介绍了关于ES嵌套索引的增删改,本篇就接着上篇主题继续深入聊一下,上篇的添加和更新操作,其实是不安全的,所有的数据库db系统都会存在并发问题像关系型数据库MySQL,Oracle,SQL Server默认采用的是悲观锁。

在ElasticSearch中采用的乐观锁,下面先熟悉下什么是乐观锁和悲观锁:


悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。



乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。



两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。




从上面的介绍中,我们不难发现es为什么要采用乐观锁,因为es大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低es的吞吐,当然并发问题是真实存在,下面给大家分享实际工作中遇到的并发问题。

最好的方式是在设计上就排除并发问题,比如我们的一个项目消费kafka,经过计算后的数据存入es中,如果不设计打入kafka时的策略,就可能会遇到并发插入和更新问题,sparkstreaming集成kafka时,kafka的有多少个分区,就需要给spark设置相应数目的Executors进程,比如10个kafka分区,现在有10个sparkstreaming进程在处理数据,同一个usser用户同一时刻的数据,如果被分发带不同的机器上计算完更新到es,那么就会遇到并发问题。比如是对一个数累加操作,原始是100,A进程和B进程同时读到这条数据做更新,A进程加10,B进程加20,正确的结果应该是130,但是由于并发更新,可能会导致A进程的累加操作丢失,最终的结果是120,或者B进程的累加操作丢失,那么最终的结果就是110,不管怎么更新在不考虑锁的情况下,都会导致数据有问题。那么如果我能将同一个用户的数据发送到kafka里面同一个分区内,那么就容易了,如果都在同一个分区内,一个分区内的数据处理是串行的这样就能避免并发问题。


当然如果不能避免,我们就需要通过es的乐观锁类解决并发问题。下面来看下在es中如何使用乐观锁处理并发问题,首先看下并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入es,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。


如何实现?

在插入时,使用es提供的create(true)方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。

scala代码如下:
def  insert(active:String,indexTime:Long,indexTemplate:String):Unit={
        val json = JSON.toJSON(active).toString
        val irb=client.prepareIndex(indexTemplate, activeIndexName,pid);
        irb.setCreate(true)//保证同一时刻只会有一个数据插入成功
        try{
          irb.setSource(json).execute().actionGet()
        }catch {
          case e: DocumentAlreadyExistsException=>{
            
            Thread.sleep(fail_sleep*1000)////插入失败时,等待1秒后重试
            insert(active,indexTime,indexTemplate);//重新插入
          }
          case e:Exception=>{
            log.error(插入失败,异常:"+ ExceptionUtils.getStackTrace(e))
          }
        }
	}



上面说的是插入时的并发问题解决策略,接着我们看下更新时候遇到并发问题如何处理,主要有2种思路:

(1)如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要
我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。

核心代码如下:
-       val sb_json = new StringBuffer("ctx._source.ct +=  inc");
        val params: java.util.HashMap[String, Int] = new java.util.HashMap[String, Int]
        params.put("inc", active.getCt)

        val script = new Script(sb_json.toString(), ScriptService.ScriptType.INLINE, "groovy", params)
        val up=client.prepareUpdate(indexTemplate,activeIndexName,pid)
        up.setRefresh(true)
        up.setScript(script)
        up.setRetryOnConflict(retryConunt)//设置重试次数
        up.get()  


(2)此外,我们还可以通过es内部维护的version字段来自定义实现灵活控制的乐观锁。

我们知道当我们第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:

{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

内部维护的version可以在更新和删除的api时使用


下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:

比如

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

结果:
{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}


现在我们指定version=10去更新后,返回的新响应如下:
PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}
//===================

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号

利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新



总结:

本篇主要了介绍了es里面乐观锁的使用,如果仅仅增量的累加或者累减操作,不关注顺序,关注最终结果,我们可以使用es服务端保证冲突重试就行,这样非常方便的就解决了并发冲突问题,如果关注增量顺序,比如索引和更新操作默认采用的最后的数据覆盖以前的数据,如果冲突了我们可以使用version字段来处理冲突问题,此外version可以使用es内部维护的version值,也可以使用我们外部应用传过来的值,并指定version去使用乐观锁进行更新。




有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。





1
0
分享到:
评论

相关推荐

    Elasticsearch 并发修改乐观锁

    **标题:“Elasticsearch 并发修改乐观锁”** 在分布式搜索引擎Elasticsearch中,数据的并发修改是一个关键问题,而乐观锁就是一种用于解决这种问题的机制。乐观锁假设在大多数情况下,不会发生数据冲突,因此在...

    ElasticSearch基于Mysql热更新IK词典项目

    3. 更新策略:设计合理的更新策略,如采用乐观锁控制并发更新,确保数据一致性。 ### 集成与配置 要将MySQL热更新功能集成到Elasticsearch和IK分词器中,我们需要进行以下步骤: 1. 安装IK分词器:下载并安装`...

    17.分布式文档系统_动手实战演练Elasticsearch基于external version进行乐观锁并发控制

    这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。 (2)区别 语法区别: es内部version: ?version=1 external version:?v

    elasticsearch与kibana环境安装与es的基本操作.docx

    2. **乐观锁**:在提交操作时检查是否发生冲突,仅在未发生冲突时执行更新。 Elasticsearch内部维护了一个 `_version` 字段来实现内部版本控制,每次文档更新,该版本号自增。同时,可以通过 `version_type=...

    kivihub#kivihub-blog#3.ES的乐观锁1

    2)指定版本号更新数据,第一次更新成功3)指定相同版本号,进行第二次更新失败参考ES 7.13——乐观锁并发控制Elasticsearch系列---并发控制及乐

    es快速入门笔记,自我整理,共75节

    * 电商网站商品管理:使用Elasticsearch实现商品的CRUD(Create、Read、Update、Delete)操作。 * 多种搜索方式:Query String Search、Query DSL、Phrase Search、Highlight Search等。 五、Elasticsearch基础...

    16.分布式文档系统_动手实战演练Elasticsearch基于_version进行乐观锁并发控制

    1、先构造一条数据出来 PUT /test_index/test_type/7 { test_field:test test } -------------------------------结果------------------------------- { _index: test_index, _type: test_type, ...

    03-Elasticsearch大宽表应用案例实践探索 杭州 1.6 2024

    在IT领域,尤其是在大数据分析和实时检索的场景中,Elasticsearch(ES)因其高效、灵活的特性,已经成为一种广泛使用的工具。本文将深入探讨在处理大宽表应用中的实践案例,以及面临的挑战和解决方案。 首先,我们...

    ES学习笔记.docx

    * 版本控制问题:在并发情况下,Elasticsearch 可能会存在版本控制问题,需要使用乐观锁机制来解决这个问题。 其他概念 * 分词:Elasticsearch 中的分词是指将文本拆分成多个词语,用于提高搜索效率。 * 命中次数...

    ES-核心知识篇(上半季)课件资料-2.rar

    1、基于Elasticsearch最新版本,5.2版本,进行课程的讲解,让大家学好技术后,绝对不会落伍。而市面上的书籍和视频,使用的Elasticsearch版本都非常陈旧...4、包含很多独家的核心知识点和技术,比如乐观锁并发控制,写

    高并发秒杀系统.zip

    对于秒杀场景,可以考虑使用乐观锁或悲观锁来控制并发下的数据一致性。 6. **预生成令牌**:为了避免请求洪峰直接冲击系统,可以在秒杀开始前预先生成一定数量的令牌,用户需要先获取令牌才能参与秒杀。这样可以...

    SSM实战项目——Java高并发秒杀API.zip

    - **数据库优化**:使用乐观锁或悲观锁防止并发问题,如商品数量减一操作。 5. **分布式锁**:在秒杀过程中,为了防止超卖,可能需要使用分布式锁来同步多线程间的操作,例如使用Redis实现的分布式锁。 6. **...

    基于SpringBoot高并发商城秒杀系统项目.zip

    使用乐观锁或悲观锁来控制并发下的数据更新,避免死锁。 5. **缓存策略**:引入缓存(如Redis)可以提高读取速度,减轻数据库压力。秒杀商品信息、用户信息等常用数据可预加载到缓存中,但要注意缓存雪崩和缓存穿透...

    SpringBoot实现Java高并发秒杀系统.zip

    10. **监控与日志**:集成Prometheus和Grafana进行性能监控,使用ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便于及时发现并解决问题。 11. **负载均衡**:使用Nginx或Spring Cloud Gateway...

    一个整合SSM框架的高并发和商品秒杀项目,学习目前较流行的Java框架组合实现高并发秒杀API.zip

    可以使用乐观锁或悲观锁机制,例如在MySQL中使用`for update`或在数据表中添加版本号字段来实现。 2. **缓存策略**:利用Redis等内存数据库进行热点商品的缓存,减少对数据库的直接访问,提高响应速度。 3. **队列...

    SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip

    7. **监控与日志**:使用如Prometheus和Grafana等工具进行性能监控,以及ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便及时发现和解决问题。 综上所述,构建SpringBoot+Zookeeper+Dubbo的...

    基于 springboot 学习整合自己所学到的知识,Mybatis、Mybatis-plus、ES、Redis、M.zip

    Mybatis-Plus是Mybatis的扩展插件,它在Mybatis的基础上做了增强,提供了更丰富的 CRUD 操作,如:分页查询、主键自增、乐观锁等。同时,它还简化了开发流程,降低了学习成本,提升了开发效率。 4. **Elastic...

Global site tag (gtag.js) - Google Analytics