之前一篇博客中写道solrCloud对查询的请求是在服务端进行的组装,是对所有的shard的所有的replica进行的轮训的。这两天看了下在服务端solr是如何进行操作的,这里涉及到的代码超级多,我就只贴一部分,用来说明大意即可。
在将查询请求发往到某个replica之后,先根据path找到某个requestHandler(我们这里用select举例),然后再用这个requestHandler中所有的searchComponent进行查询操作,他的分布式的操作就是体现在多个searchComponent中,每一个searchComponent不只是要完成它所存在的shard中的工作,还有其他shard中的工作,想当然这里不会使用同步,也就是不会在当前的shard的任务完成之后才会将请求转发到其他的shard,最好是采取异步执行的方式,将某个任务交给线程池,然后继续执行自己的任务,在执行完成后再处理其他的shard返回的数据。solr正是采取了后者——使用一个shardHandler用来转发请求到其他的shard,然后异步的等待其他的shard的执行结果。在solrCloud中使用的shardHandler的实现类是HttpShardHandler,顾名思义,他是采用http的协议与其他的shard进行交互的,其他shard的操作结果返回到当前的shard中,然后再组装最后的结果。在solrhome下我们可以发现有个solr.xml,里面就有关于HttpShardHandler的配置,
<shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory"> <!--用于产生一个HttpShardHandler--> <int name="socketTimeout">${socketTimeout:600000}</int> <!--httpClient的socketTimeout--> <int name="connTimeout">${connTimeout:60000}</int> <!--httpClient的connectionTimeout--> </shardHandlerFactory>
HttpShardHandler发起http请求使用的是apache的httpClient,上面的两个配置就是配置的httpClient的两个超时时间(httpCLient有三个超时时间,详情参看我的另一个博客)。想到这就会有很多的疑问,如果访问的时候某个shard死掉了呢(zk中的session还没有过期的情况),又或者他没有死掉但是他的操作非常慢一直到超过上面配置的socketTimeout呢,这种情况下怎么操作?但凡遇到这种情况,看源码是最好的办法,在httpShardHandler中,有个submit方法,他就是某个searchComponent添加任务到httpShardHandler的线程池中,我们看一下这个方法:
/** * 第一个参数表示要发起的请求 * 第二个参数表示要发送到的shard的所有的replica的url,用|分隔 * 第三个参数表示请求的参数 */ @Override public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { // do this outside of the callable for thread safety reasons final List<String> urls = getURLs(sreq, shard);//获得本次访问的所有的url,一个shard有多个replica Callable<ShardResponse> task = new Callable<ShardResponse>() {//将请求封装为一个可以异步执行的callable,最后返回的是一个ShardResponse @Override public ShardResponse call() throws Exception { ShardResponse srsp = new ShardResponse(); if (sreq.nodeName != null) { srsp.setNodeName(sreq.nodeName); } srsp.setShardRequest(sreq); srsp.setShard(shard); SimpleSolrResponse ssr = new SimpleSolrResponse(); srsp.setSolrResponse(ssr); long startTime = System.nanoTime(); try { params.remove(CommonParams.WT); // use default (currently javabin) params.remove(CommonParams.VERSION); QueryRequest req = makeQueryRequest(sreq, params, shard); req.setMethod(SolrRequest.METHOD.POST); // no need to set the response parser as binary is the default // req.setResponseParser(new BinaryResponseParser()); // if there are no shards available for a slice, urls.size()==0 if (urls.size() == 0) { // TODO: what's the right error code here? We should use the same thing when // all of the servers for a shard are down. throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); } if (urls.size() <= 1) { String url = urls.get(0); srsp.setShardAddress(url); try (SolrClient client = new HttpSolrClient(url, httpClient)) {//如果只有一个url,也就是只有一个replica,则直接用这个url发起http请求 ssr.nl = client.request(req); } } else { LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);//如果有多个replica,则会进行负载均衡。 ssr.nl = rsp.getResponse(); srsp.setShardAddress(rsp.getServer()); } } catch (ConnectException cex) { srsp.setException(cex); } catch (Exception th) { // 从这两个catch可以发现,如果在执行的时候发生了任何的异常都会将异常封装到srsp也就是最后的结果中,而不会抛出异常。 srsp.setException(th); if (th instanceof SolrException) { srsp.setResponseCode(((SolrException) th).code()); } else { srsp.setResponseCode(-1); } } ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); return transfomResponse(sreq, srsp, shard); } }; try { if (shard != null) { MDC.put("ShardRequest.shards", shard); } if (urls != null && !urls.isEmpty()) { MDC.put("ShardRequest.urlList", urls.toString()); } pending.add(completionService.submit(task));//将封装的任务添提交到completionService,由其他线程执行这个任务,等待执行结果,然后将最后的结果放到一个集合中(pending就是一个泛型是Future的集合) } finally { MDC.remove("ShardRequest.shards"); MDC.remove("ShardRequest.urlList"); } }
看完上面的代码就知道了原来果然是采用的异步执行,并且在执行过程中不会抛出任何的错误,如果有错误的也会封装在结果中。然后我们再看一下取结果的时候的操作,下面的代码摘抄于org.apache.solr.handler.component.SearchHandler.handleRequestBody(SolrQueryRequest, SolrQueryResponse)这个方法,
boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false); //从请求中得到shards.tolerant参数,默认是false ShardResponse srsp = tolerant ? shardHandler1.takeCompletedIncludingErrors():shardHandler1.takeCompletedOrError();//得到线程池执行的结果 if (srsp == null) break; // no more requests to wait for // Was there an exception? if (srsp.getException() != null) { //如果有异常 // If things are not tolerant, abort everything and rethrow if(!tolerant) {//如果没有在参数中写shards.tolerant=true,则报错 shardHandler1.cancelAll();//取消所有的操作, if (srsp.getException() instanceof SolrException) { throw (SolrException)srsp.getException(); } else { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException()); } } else { if(rsp.getResponseHeader().get("partialResults") == null) {//如果是容错的,也就是shards.tolerant=true,则不报错,允许部分成功,然后再响应头中添加一个值partialResults=true,表示这词的请求是部分成功。 rsp.getResponseHeader().add("partialResults", Boolean.TRUE); } } }
现在知道了如果在请求的时候害怕因为某个shard响应太慢而耽误太多的时间,则可以将httpShardHandler的两个timeout配置的小一点,然后再请求中设置shards.tolerant=true,这样就可以了。我测试的java代码(我这次使用的是solr5.5.3):
static CloudSolrClient getServer(){ CloudSolrClient server = new CloudSolrClient("10.90.26.115:2181/solr5"); server.setZkConnectTimeout(10000*3); return server; } static void queryTest() throws SolrServerException, IOException{ CloudSolrClient server = getServer(); SolrQuery query = new SolrQuery("id:?6");//我搜一下id是两位数,并且是以6结尾的。 query.set("shards.tolerant", true);//设置允许出错 QueryResponse response = server.query("你的集合的名字", query);// System.out.println(response.getResults().getNumFound()); System.out.println(response.getResponseHeader().get("partialResults")); }
执行上面的代码,分三个阶段,
第一个阶段是将所有的shard都存活,可以发现打印的partialResults是null,
第二个是将某一个shard停掉,设置不容错,即shards.tolerant=false,结果是报异常,提示某个shard没有节点处理。
第三个是维持某一个shard停掉,设置shards.tolerant=true 可以发现不报错了,但是numFound变少了,而且打印的是true。
至此,已经掌握容错请求的实现。在实际生产中可以根据响应头的partialResults来记录日志,而不影响前台的展示。
相关推荐
基于SolrCloud的分布式相似性检测系统是一项创新技术,它在文档相似性检测领域提供了一种全新的解决方案。随着数字化进程的加快,文档、论文、网页等数字文档的数量呈现出爆炸性增长。这种增长不仅带来了信息的极大...
《基于SolrCloud的分布式科技项目查重系统》这篇文章主要探讨了如何利用分布式技术构建一个实时响应的科技项目查重系统。该系统旨在从海量科技项目文件库中检测出与待检测项目相似的文本,确保科技项目的原创性和...
它在SolrCloud中负责管理集群状态和处理节点故障。 系统设计还涵盖了数据挖掘技术与应用,以及文本挖掘的研究方向。这些技术应用在数据预处理、索引构建和检索优化中。数据挖掘是通过算法和统计学等方法,在大量...
分布式驱动一体化解决方案在汽车制造业中扮演着至关重要的角色,尤其在总装车间的物料输送环节。伦茨公司的EMS单轨悬挂输送系统是这类解决方案的典型代表,它以其高度自动化和可靠性,显著提升了物料输送的效率。 ...
- **Collection**:SolrCloud中的最高级别单位,可以包含多个shard。 - **Shard**:一个逻辑上的数据切片,可以有多个replica。 - **Replica**:shard的一个副本,用于容错和负载均衡。 - **Document**:索引中...
### 分布式事务原理及解决方案 #### 1. 引言 随着信息技术的发展,特别是近年来微服务架构的兴起,分布式事务成为了企业级应用中的一个重要挑战。分布式事务涉及到多个不同的服务或资源之间的协同工作,需要保证在...
本文来自于csdn,本文主要从分布式的原因,事务特性,和解决方案中深入理解了分布式事务,希望对您的学习有所帮助。 分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的...
分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的...
最全分布式事务视频课程详细...3.分布式事务解决方案之2PC(xa、seata) 4.分布式事务解决方案之TCC(hmily) 5.分布式事务解决方案之可靠消息最终一致性 6.分布式事务解决方案之最大努力通知 7.分布式事务综合案例分析
分布式事务的解决方案主要针对的是大型分布式系统中保证数据一致性的问题。在电商架构中,随着业务量的增加,数据库压力增大,需要通过分库分表来提高性能和响应速度。然而,这种优化策略会引入新的挑战,即如何在...
分布式故障响应机制是一种基于分布式理念的线上故障处理方法,能够有效解决线上故障处理过程中的时效性和用户体验问题。具体来说,分布式故障响应机制强调在网络环境下多个节点之间的协调与合作。在多媒体教室管理中...
分布式系统运维交付解决方案研究与应用 1. 引言: 随着网络技术的快速进步,软件运行平台从单机环境转向网络环境,计算机系统逐渐由集中式向分布式系统发展。分布式系统因规模庞大、服务对象多、关系复杂等特点,给...
从标题《对分布式数据挖掘解决方案的思考.pdf》和描述《对分布式数据挖掘解决方案的思考.pdf》中可以提炼出以下知识点: 1. 数据挖掘的挑战:传统数据挖掘解决方案存在计算能力有限的问题,尤其在数据挖掘算法迭代...
在分布式事务中,XA、Saga、TCC和MQ补偿等解决方案都可以用于实现分布式事务的一致性和可靠性。每种解决方案都有其特点和优势,选择哪种解决方案取决于具体的业务需求和系统架构。 在 MySQL 中,XA 事务可以通过...
陕鼓集团在分布式能源系统中采用了这种高效的离心压缩机技术,使得其分布式能源系统解决方案能够更贴合用户需求,有效实现节能减排。 此外,陕鼓集团的分布式能源系统解决方案不仅包含硬件设备,还包括销售、生产、...
常用的分布式事务解决方案介绍.ppt常用的分布式事务解决方案介绍.ppt常用的分布式事务解决方案介绍.ppt常用的分布式事务解决方案介绍.ppt常用的分布式事务解决方案介绍.ppt