上篇介绍了关于ES嵌套索引的增删改,本篇就接着上篇主题继续深入聊一下,上篇的添加和更新操作,其实是不安全的,所有的数据库db系统都会存在并发问题像关系型数据库MySQL,Oracle,SQL Server默认采用的是悲观锁。
在ElasticSearch中采用的乐观锁,下面先熟悉下什么是乐观锁和悲观锁:
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。
两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。
从上面的介绍中,我们不难发现es为什么要采用乐观锁,因为es大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低es的吞吐,当然并发问题是真实存在,下面给大家分享实际工作中遇到的并发问题。
最好的方式是在设计上就排除并发问题,比如我们的一个项目消费kafka,经过计算后的数据存入es中,如果不设计打入kafka时的策略,就可能会遇到并发插入和更新问题,sparkstreaming集成kafka时,kafka的有多少个分区,就需要给Spark设置相应数目的Executors进程,比如10个kafka分区,现在有10个sparkstreaming进程在处理数据,同一个usser用户同一时刻的数据,如果被分发带不同的机器上计算完更新到es,那么就会遇到并发问题。比如是对一个数累加操作,原始是100,A进程和B进程同时读到这条数据做更新,A进程加10,B进程加20,正确的结果应该是130,但是由于并发更新,可能会导致A进程的累加操作丢失,最终的结果是120,或者B进程的累加操作丢失,那么最终的结果就是110,不管怎么更新在不考虑锁的情况下,都会导致数据有问题。那么如果我能将同一个用户的数据发送到kafka里面同一个分区内,那么就容易了,如果都在同一个分区内,一个分区内的数据处理是串行的这样就能避免并发问题。
当然如果不能避免,我们就需要通过es的乐观锁类解决并发问题。下面来看下在es中如何使用乐观锁处理并发问题,首先看下并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入es,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。
如何实现?
在插入时,使用es提供的create(true)方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。
Scala代码如下:
上面说的是插入时的并发问题解决策略,接着我们看下更新时候遇到并发问题如何处理,主要有2种思路:
(1)如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要 我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。
核心代码如下:
(2)此外,我们还可以通过es内部维护的version字段来自定义实现灵活控制的乐观锁。
我们知道当我们第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:
内部维护的version可以在更新和删除的api时使用
下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:
比如
结果:
现在我们指定version=10去更新后,返回的新响应如下:
如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号
利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新
总结:
本篇主要了介绍了es里面乐观锁的使用,如果仅仅增量的累加或者累减操作,不关注顺序,关注最终结果,我们可以使用es服务端保证冲突重试就行,这样非常方便的就解决了并发冲突问题,如果关注增量顺序,比如索引和更新操作默认采用的最后的数据覆盖以前的数据,如果冲突了我们可以使用version字段来处理冲突问题,此外version可以使用es内部维护的version值,也可以使用我们外部应用传过来的值,并指定version去使用乐观锁进行更新。
http://mp.weixin.qq.com/s/yapfvRIIlXvHsjOEdpCy1g
相关推荐
**标题:“Elasticsearch 并发修改乐观锁”** 在分布式搜索引擎Elasticsearch中,数据的并发修改是一个关键问题,而乐观锁就是一种用于解决这种问题的机制。乐观锁假设在大多数情况下,不会发生数据冲突,因此在...
3. 更新策略:设计合理的更新策略,如采用乐观锁控制并发更新,确保数据一致性。 ### 集成与配置 要将MySQL热更新功能集成到Elasticsearch和IK分词器中,我们需要进行以下步骤: 1. 安装IK分词器:下载并安装`...
这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。 (2)区别 语法区别: es内部version: ?version=1 external version:?v
2. **乐观锁**:在提交操作时检查是否发生冲突,仅在未发生冲突时执行更新。 Elasticsearch内部维护了一个 `_version` 字段来实现内部版本控制,每次文档更新,该版本号自增。同时,可以通过 `version_type=...
2)指定版本号更新数据,第一次更新成功3)指定相同版本号,进行第二次更新失败参考ES 7.13——乐观锁并发控制Elasticsearch系列---并发控制及乐
* 电商网站商品管理:使用Elasticsearch实现商品的CRUD(Create、Read、Update、Delete)操作。 * 多种搜索方式:Query String Search、Query DSL、Phrase Search、Highlight Search等。 五、Elasticsearch基础...
1、先构造一条数据出来 PUT /test_index/test_type/7 { test_field:test test } -------------------------------结果------------------------------- { _index: test_index, _type: test_type, ...
在IT领域,尤其是在大数据分析和实时检索的场景中,Elasticsearch(ES)因其高效、灵活的特性,已经成为一种广泛使用的工具。本文将深入探讨在处理大宽表应用中的实践案例,以及面临的挑战和解决方案。 首先,我们...
* 版本控制问题:在并发情况下,Elasticsearch 可能会存在版本控制问题,需要使用乐观锁机制来解决这个问题。 其他概念 * 分词:Elasticsearch 中的分词是指将文本拆分成多个词语,用于提高搜索效率。 * 命中次数...
1、基于Elasticsearch最新版本,5.2版本,进行课程的讲解,让大家学好技术后,绝对不会落伍。而市面上的书籍和视频,使用的Elasticsearch版本都非常陈旧...4、包含很多独家的核心知识点和技术,比如乐观锁并发控制,写
对于秒杀场景,可以考虑使用乐观锁或悲观锁来控制并发下的数据一致性。 6. **预生成令牌**:为了避免请求洪峰直接冲击系统,可以在秒杀开始前预先生成一定数量的令牌,用户需要先获取令牌才能参与秒杀。这样可以...
- **数据库优化**:使用乐观锁或悲观锁防止并发问题,如商品数量减一操作。 5. **分布式锁**:在秒杀过程中,为了防止超卖,可能需要使用分布式锁来同步多线程间的操作,例如使用Redis实现的分布式锁。 6. **...
使用乐观锁或悲观锁来控制并发下的数据更新,避免死锁。 5. **缓存策略**:引入缓存(如Redis)可以提高读取速度,减轻数据库压力。秒杀商品信息、用户信息等常用数据可预加载到缓存中,但要注意缓存雪崩和缓存穿透...
10. **监控与日志**:集成Prometheus和Grafana进行性能监控,使用ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便于及时发现并解决问题。 11. **负载均衡**:使用Nginx或Spring Cloud Gateway...
可以使用乐观锁或悲观锁机制,例如在MySQL中使用`for update`或在数据表中添加版本号字段来实现。 2. **缓存策略**:利用Redis等内存数据库进行热点商品的缓存,减少对数据库的直接访问,提高响应速度。 3. **队列...
7. **监控与日志**:使用如Prometheus和Grafana等工具进行性能监控,以及ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便及时发现和解决问题。 综上所述,构建SpringBoot+Zookeeper+Dubbo的...
Mybatis-Plus是Mybatis的扩展插件,它在Mybatis的基础上做了增强,提供了更丰富的 CRUD 操作,如:分页查询、主键自增、乐观锁等。同时,它还简化了开发流程,降低了学习成本,提升了开发效率。 4. **Elastic...