- 浏览: 2184111 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
上篇介绍了关于ES嵌套索引的增删改,本篇就接着上篇主题继续深入聊一下,上篇的添加和更新操作,其实是不安全的,所有的数据库db系统都会存在并发问题像关系型数据库MySQL,Oracle,SQL Server默认采用的是悲观锁。
在ElasticSearch中采用的乐观锁,下面先熟悉下什么是乐观锁和悲观锁:
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。
两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。
从上面的介绍中,我们不难发现es为什么要采用乐观锁,因为es大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低es的吞吐,当然并发问题是真实存在,下面给大家分享实际工作中遇到的并发问题。
最好的方式是在设计上就排除并发问题,比如我们的一个项目消费kafka,经过计算后的数据存入es中,如果不设计打入kafka时的策略,就可能会遇到并发插入和更新问题,sparkstreaming集成kafka时,kafka的有多少个分区,就需要给spark设置相应数目的Executors进程,比如10个kafka分区,现在有10个sparkstreaming进程在处理数据,同一个usser用户同一时刻的数据,如果被分发带不同的机器上计算完更新到es,那么就会遇到并发问题。比如是对一个数累加操作,原始是100,A进程和B进程同时读到这条数据做更新,A进程加10,B进程加20,正确的结果应该是130,但是由于并发更新,可能会导致A进程的累加操作丢失,最终的结果是120,或者B进程的累加操作丢失,那么最终的结果就是110,不管怎么更新在不考虑锁的情况下,都会导致数据有问题。那么如果我能将同一个用户的数据发送到kafka里面同一个分区内,那么就容易了,如果都在同一个分区内,一个分区内的数据处理是串行的这样就能避免并发问题。
当然如果不能避免,我们就需要通过es的乐观锁类解决并发问题。下面来看下在es中如何使用乐观锁处理并发问题,首先看下并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入es,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。
如何实现?
在插入时,使用es提供的create(true)方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。
scala代码如下:
上面说的是插入时的并发问题解决策略,接着我们看下更新时候遇到并发问题如何处理,主要有2种思路:
(1)如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要
我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。
核心代码如下:
(2)此外,我们还可以通过es内部维护的version字段来自定义实现灵活控制的乐观锁。
我们知道当我们第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:
内部维护的version可以在更新和删除的api时使用
下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:
比如
结果:
现在我们指定version=10去更新后,返回的新响应如下:
如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号
利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新
总结:
本篇主要了介绍了es里面乐观锁的使用,如果仅仅增量的累加或者累减操作,不关注顺序,关注最终结果,我们可以使用es服务端保证冲突重试就行,这样非常方便的就解决了并发冲突问题,如果关注增量顺序,比如索引和更新操作默认采用的最后的数据覆盖以前的数据,如果冲突了我们可以使用version字段来处理冲突问题,此外version可以使用es内部维护的version值,也可以使用我们外部应用传过来的值,并指定version去使用乐观锁进行更新。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
在ElasticSearch中采用的乐观锁,下面先熟悉下什么是乐观锁和悲观锁:
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。
两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。
从上面的介绍中,我们不难发现es为什么要采用乐观锁,因为es大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低es的吞吐,当然并发问题是真实存在,下面给大家分享实际工作中遇到的并发问题。
最好的方式是在设计上就排除并发问题,比如我们的一个项目消费kafka,经过计算后的数据存入es中,如果不设计打入kafka时的策略,就可能会遇到并发插入和更新问题,sparkstreaming集成kafka时,kafka的有多少个分区,就需要给spark设置相应数目的Executors进程,比如10个kafka分区,现在有10个sparkstreaming进程在处理数据,同一个usser用户同一时刻的数据,如果被分发带不同的机器上计算完更新到es,那么就会遇到并发问题。比如是对一个数累加操作,原始是100,A进程和B进程同时读到这条数据做更新,A进程加10,B进程加20,正确的结果应该是130,但是由于并发更新,可能会导致A进程的累加操作丢失,最终的结果是120,或者B进程的累加操作丢失,那么最终的结果就是110,不管怎么更新在不考虑锁的情况下,都会导致数据有问题。那么如果我能将同一个用户的数据发送到kafka里面同一个分区内,那么就容易了,如果都在同一个分区内,一个分区内的数据处理是串行的这样就能避免并发问题。
当然如果不能避免,我们就需要通过es的乐观锁类解决并发问题。下面来看下在es中如何使用乐观锁处理并发问题,首先看下并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入es,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。
如何实现?
在插入时,使用es提供的create(true)方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。
scala代码如下:
def insert(active:String,indexTime:Long,indexTemplate:String):Unit={ val json = JSON.toJSON(active).toString val irb=client.prepareIndex(indexTemplate, activeIndexName,pid); irb.setCreate(true)//保证同一时刻只会有一个数据插入成功 try{ irb.setSource(json).execute().actionGet() }catch { case e: DocumentAlreadyExistsException=>{ Thread.sleep(fail_sleep*1000)////插入失败时,等待1秒后重试 insert(active,indexTime,indexTemplate);//重新插入 } case e:Exception=>{ log.error(插入失败,异常:"+ ExceptionUtils.getStackTrace(e)) } } }
上面说的是插入时的并发问题解决策略,接着我们看下更新时候遇到并发问题如何处理,主要有2种思路:
(1)如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要
我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。
核心代码如下:
- val sb_json = new StringBuffer("ctx._source.ct += inc"); val params: java.util.HashMap[String, Int] = new java.util.HashMap[String, Int] params.put("inc", active.getCt) val script = new Script(sb_json.toString(), ScriptService.ScriptType.INLINE, "groovy", params) val up=client.prepareUpdate(indexTemplate,activeIndexName,pid) up.setRefresh(true) up.setScript(script) up.setRetryOnConflict(retryConunt)//设置重试次数 up.get()
(2)此外,我们还可以通过es内部维护的version字段来自定义实现灵活控制的乐观锁。
我们知道当我们第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:
{ "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[blog][1]: version conflict, current [2], provided [1]", "index": "website", "shard": "3" } ], "type": "version_conflict_engine_exception", "reason": "[blog][1]: version conflict, current [2], provided [1]", "index": "website", "shard": "3" }, "status": 409 }
内部维护的version可以在更新和删除的api时使用
下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:
比如
PUT /website/blog/2?version=5&version_type=external { "title": "My first external blog entry", "text": "Starting to get the hang of this..." }
结果:
{ "_index": "website", "_type": "blog", "_id": "2", "_version": 5, "created": true }
现在我们指定version=10去更新后,返回的新响应如下:
PUT /website/blog/2?version=10&version_type=external { "title": "My first external blog entry", "text": "This is a piece of cake..." } //=================== { "_index": "website", "_type": "blog", "_id": "2", "_version": 10, "created": false }
如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号
利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新
总结:
本篇主要了介绍了es里面乐观锁的使用,如果仅仅增量的累加或者累减操作,不关注顺序,关注最终结果,我们可以使用es服务端保证冲突重试就行,这样非常方便的就解决了并发冲突问题,如果关注增量顺序,比如索引和更新操作默认采用的最后的数据覆盖以前的数据,如果冲突了我们可以使用version字段来处理冲突问题,此外version可以使用es内部维护的version值,也可以使用我们外部应用传过来的值,并指定version去使用乐观锁进行更新。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
发表评论
-
复盘一个Elasticsearch排序问题的剖析
2019-07-15 21:12 1170最近线上的es查询的 ... -
elasticsearch里面bulk的用法
2018-04-09 20:23 1424上篇文章介绍了在es里 ... -
elasticsearch里面的关于批量读取mget的用法
2018-04-04 16:01 1834es的api除了提供了基本 ... -
elasticsearch的查询流程分析
2018-04-02 20:29 1330我们都知道es是一个分 ... -
如何在elasticsearch里面使用深度分页功能
2018-03-28 18:13 2022前面的文章提到过es默认的from+size的分页方式返回的结 ... -
如何在Elasticsearch里面使用索引别名
2018-03-27 20:37 1681在elasticsearch里面给index起一个alias ... -
如何优雅的全量读取Elasticsearch索引里面的数据
2018-03-26 20:27 7654### (一)scroll的介绍 有时候我们可能想要读取整个 ... -
关于elaticsearch中更新数据的几种方式
2018-03-21 19:00 969作为一个成熟的框架, ... -
Elasticsearch里面的segment合并
2018-03-20 17:50 2093通过前面的文章,我 ... -
Elasticsearch如何保证数据不丢失?
2018-03-19 20:52 2143上篇文章提到过,在elasticsearch和磁盘之间还有一层 ... -
为什么说Elasticsearch搜索是近实时的?
2018-03-16 19:41 9458通过前面两篇文章的介绍,我们大概已经知道了 Elasticse ... -
Elasticsearch如何动态维护一个不可变的倒排索引
2018-03-15 21:34 1140上一篇文章中介绍了Elasticsearch中是如何搜索文本 ... -
Elasticsearch如何检索数据
2018-03-14 20:11 1072我们都知道Elasticsearch是一个全文检索引擎,那么它 ... -
如何备份ElasticSearch索引数据到HDFS上
2018-02-09 18:19 2424在ElasticSearch里面备份策略已经比较成熟了 ... -
Elasticsearch5.6.4集群搭建
2018-02-07 20:13 1341本次搭建的是一个三节点的集群 (一)es的安装 (1)下 ... -
如何使log4j生成json格式的log
2017-09-15 17:44 3826使用java开发项目时,log日志一般都是应用程序必不可少的一 ... -
理解elasticsearch的parent-child关系
2017-09-04 18:43 2841前面文章介绍了,在es里面的几种数据组织关系,包括array ... -
简述ElasticSearch里面复杂关系数据的存储方式
2017-08-18 20:10 2414在传统的数据库里面,对数据关系描述无外乎三种,一对一,一对多和 ... -
使用Java Rest Client操作Elasticsearch
2017-08-09 19:42 2272Elasticsearch作为一个成熟的开源框架,对主流的多种 ... -
ElasticSearch里面的偏好查询
2017-06-22 17:17 1306在es查询的时候我们可 ...
相关推荐
**标题:“Elasticsearch 并发修改乐观锁”** 在分布式搜索引擎Elasticsearch中,数据的并发修改是一个关键问题,而乐观锁就是一种用于解决这种问题的机制。乐观锁假设在大多数情况下,不会发生数据冲突,因此在...
3. 更新策略:设计合理的更新策略,如采用乐观锁控制并发更新,确保数据一致性。 ### 集成与配置 要将MySQL热更新功能集成到Elasticsearch和IK分词器中,我们需要进行以下步骤: 1. 安装IK分词器:下载并安装`...
这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。 (2)区别 语法区别: es内部version: ?version=1 external version:?v
2. **乐观锁**:在提交操作时检查是否发生冲突,仅在未发生冲突时执行更新。 Elasticsearch内部维护了一个 `_version` 字段来实现内部版本控制,每次文档更新,该版本号自增。同时,可以通过 `version_type=...
2)指定版本号更新数据,第一次更新成功3)指定相同版本号,进行第二次更新失败参考ES 7.13——乐观锁并发控制Elasticsearch系列---并发控制及乐
* 电商网站商品管理:使用Elasticsearch实现商品的CRUD(Create、Read、Update、Delete)操作。 * 多种搜索方式:Query String Search、Query DSL、Phrase Search、Highlight Search等。 五、Elasticsearch基础...
1、先构造一条数据出来 PUT /test_index/test_type/7 { test_field:test test } -------------------------------结果------------------------------- { _index: test_index, _type: test_type, ...
在IT领域,尤其是在大数据分析和实时检索的场景中,Elasticsearch(ES)因其高效、灵活的特性,已经成为一种广泛使用的工具。本文将深入探讨在处理大宽表应用中的实践案例,以及面临的挑战和解决方案。 首先,我们...
* 版本控制问题:在并发情况下,Elasticsearch 可能会存在版本控制问题,需要使用乐观锁机制来解决这个问题。 其他概念 * 分词:Elasticsearch 中的分词是指将文本拆分成多个词语,用于提高搜索效率。 * 命中次数...
1、基于Elasticsearch最新版本,5.2版本,进行课程的讲解,让大家学好技术后,绝对不会落伍。而市面上的书籍和视频,使用的Elasticsearch版本都非常陈旧...4、包含很多独家的核心知识点和技术,比如乐观锁并发控制,写
对于秒杀场景,可以考虑使用乐观锁或悲观锁来控制并发下的数据一致性。 6. **预生成令牌**:为了避免请求洪峰直接冲击系统,可以在秒杀开始前预先生成一定数量的令牌,用户需要先获取令牌才能参与秒杀。这样可以...
- **数据库优化**:使用乐观锁或悲观锁防止并发问题,如商品数量减一操作。 5. **分布式锁**:在秒杀过程中,为了防止超卖,可能需要使用分布式锁来同步多线程间的操作,例如使用Redis实现的分布式锁。 6. **...
使用乐观锁或悲观锁来控制并发下的数据更新,避免死锁。 5. **缓存策略**:引入缓存(如Redis)可以提高读取速度,减轻数据库压力。秒杀商品信息、用户信息等常用数据可预加载到缓存中,但要注意缓存雪崩和缓存穿透...
10. **监控与日志**:集成Prometheus和Grafana进行性能监控,使用ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便于及时发现并解决问题。 11. **负载均衡**:使用Nginx或Spring Cloud Gateway...
可以使用乐观锁或悲观锁机制,例如在MySQL中使用`for update`或在数据表中添加版本号字段来实现。 2. **缓存策略**:利用Redis等内存数据库进行热点商品的缓存,减少对数据库的直接访问,提高响应速度。 3. **队列...
7. **监控与日志**:使用如Prometheus和Grafana等工具进行性能监控,以及ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志收集和分析,以便及时发现和解决问题。 综上所述,构建SpringBoot+Zookeeper+Dubbo的...
Mybatis-Plus是Mybatis的扩展插件,它在Mybatis的基础上做了增强,提供了更丰富的 CRUD 操作,如:分页查询、主键自增、乐观锁等。同时,它还简化了开发流程,降低了学习成本,提升了开发效率。 4. **Elastic...