SOLR Cloud(7)Client Error Handler


Achieved Replication Factor
Consider a collection with one shard and a replication factor of three: you have a shard leader and two additional replicas. If an update is acknowledged only by the leader, the request is still considered successful from the perspective of the client. To surface this situation, Solr supports the optional min_rf parameter on update requests.

If min_rf >= 1, Solr returns the achieved replication factor (for example rf=1) in the response when the update only succeeded on the leader, so the client can decide whether to retry.

It is not supported by the driver I use: https://github.com/inoio/solrs

It should work with SolrJ:
http://lucene.472066.n3.nabble.com/SolrCloud-use-of-quot-min-rf-quot-through-SolrJ-td4164966.html
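To illustrate the decision min_rf enables, here is a minimal sketch of the client-side check. The response map and helper names below are my own assumptions, standing in for the NamedList that SolrJ returns with an update response; they are not part of the SolrJ API.

```java
import java.util.Map;

// Sketch of the client-side decision the min_rf parameter enables.
// The Map stands in for the update response header; "rf" is the key
// Solr uses to report the achieved replication factor.
public class ReplicationCheck {

    /** Achieved rf reported by Solr, defaulting to 0 when absent. */
    static int achievedRf(Map<String, Object> responseHeader) {
        Object rf = responseHeader.get("rf");
        return (rf instanceof Integer) ? (Integer) rf : 0;
    }

    /** Retry when fewer replicas acknowledged the update than we asked for. */
    static boolean shouldRetry(Map<String, Object> responseHeader, int requestedMinRf) {
        return achievedRf(responseHeader) < requestedMinRf;
    }

    public static void main(String[] args) {
        // Only the leader acknowledged: rf=1 although we asked for min_rf=2.
        System.out.println(shouldRetry(Map.of("rf", 1), 2)); // prints "true"
        // Leader plus one replica acknowledged.
        System.out.println(shouldRetry(Map.of("rf", 2), 2)); // prints "false"
    }
}
```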

Since my driver does not support min_rf, I may just catch exceptions and make the current thread sleep for some time before retrying.
The core parts of the code are as follows:

package com.sillycat.jobssolrconsumer.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.sillycat.jobssolrconsumer.model.SqsFutureReceiver;
import com.sillycat.jobssolrconsumer.model.SqsMessagesSolrResponse;
import com.sillycat.jobssolrconsumer.util.MessageUtil;

import io.ino.solrs.JavaAsyncSolrClient;

public class ArchiveJobsToSolrTask implements Runnable {

    private static final int HALF_HOUR = 180; // 180 x 10-second units = 30 minutes

    private static final long STARTTIME = System.currentTimeMillis();

    private static final Logger logger = LoggerFactory.getLogger(ArchiveJobsToSolrTask.class);

    private final AmazonSQSAsync sqsClient;
    private final JavaAsyncSolrClient solrClient;
    …snip...
    private final int TEN_SECOND_DURATION = 10 * 1000; // milliseconds
    private int repeatedFailures = 0;
   
    …snip...
    public void run() {
        try {
            while (true) {
                // https://en.wikipedia.org/wiki/Reactor_pattern
                this.inflightMsg.acquire();
                // this.rateLimiter.acquire();

                logger.trace("Worker {} - Request for more SQS messages", workerId);
                SqsFutureReceiver<ReceiveMessageRequest, ReceiveMessageResult> processSqsMessageFuture = new SqsFutureReceiver<>();
                sqsClient.receiveMessageAsync(receiveMessageRequest, processSqsMessageFuture);

                logger.trace("Worker {} - Block thread for response", workerId);
                ReceiveMessageResult response = processSqsMessageFuture.get();

                if (response.getMessages().size() > 0) {
                    logger.trace("Worker {} - Process response in a non-blocking fashion", workerId);
                    processSqsMessages(sqsClient, solrClient, response).thenAccept(batchMessageProcessedCount -> {
                        logger.trace("Update message count");
                        long totalMessageProcessed = messageCount.addAndGet(batchMessageProcessedCount);

                        // log rates
                        if (totalMessageProcessed % 100 == 0) {
                            long currentTime = System.currentTimeMillis();
                            long duration = (currentTime - STARTTIME) / 1000;
                            logger.info("Worker {} - Jobs count {}; {} jobs/second", workerId, totalMessageProcessed,
                                    totalMessageProcessed * 10 / duration);
                        }
                    }).whenComplete((value, ex) -> {
                        this.inflightMsg.release();
                    });
                } else {
                    // release the ticket, wait for another 10 seconds
                    this.inflightMsg.release();
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            logger.warn("Worker {} - Thread was interrupted; terminating archiving SQS jobs to Solr.", workerId, e);
        } catch (Throwable t) {
            logger.warn("Worker {} - Worker threw an uncaught exception; terminating archiving SQS jobs to Solr.",
                    workerId, t);
            throw t;
        }
    }

    private CompletableFuture<Integer> processSqsMessages(AmazonSQSAsync sqsClient, JavaAsyncSolrClient solrClient,
            ReceiveMessageResult response) {
        return CompletableFuture.supplyAsync(() -> response)
                .thenCompose((receiveMessageResult) -> addSqsJobsToSolr(solrClient, receiveMessageResult))
                .thenCompose((messagesAndSolrResponse) -> deleteSuccessfullyProcessedMessagesFromSqs(sqsClient,
                        messagesAndSolrResponse))
                .thenApply((deleteResults) -> processSqsDeleteResults(deleteResults)).exceptionally((ex) -> {
                    logger.error("Unable to process SQS job messages", ex);
                    this.handleRetryPolicy();
                    return 0;
                });
    }

    private void handleRetryPolicy() {
        this.repeatedFailures = this.repeatedFailures + 1;
        try {
            // back off quadratically: 1x1 = 1, 2x2 = 4, 3x3 = 9, 4x4 = 16, 5x5 = 25, 6x6 = 36, ...
            // capped at HALF_HOUR (180 x 10 seconds = 30 minutes)
            int backoffMultiplier = Math.min(this.repeatedFailures * this.repeatedFailures, HALF_HOUR);
            Thread.sleep(this.TEN_SECOND_DURATION * backoffMultiplier);
        } catch (InterruptedException e) {
            logger.error("Thread sleep failed:", e);
        }
    }

    private CompletionStage<SqsMessagesSolrResponse> addSqsJobsToSolr(JavaAsyncSolrClient solrClient,
            ReceiveMessageResult receiveMessageResult) {
        logger.trace("Process the SQS message contents");
        final List<Message> messages = receiveMessageResult.getMessages();

        List<SolrInputDocument> solrJobs = null;
        try {
            solrJobs = convertSqsMessageToSolrJob(messages);
        } catch (Throwable t) {
            throw new RuntimeException("Unable to convert SQS message to Solr Jobs", t);
        }

        logger.trace("Send to Solr");
        CompletionStage<UpdateResponse> solrResponse = solrClient.addDocs(solrJobs);
        return solrResponse.thenApply((updateResponse) -> new SqsMessagesSolrResponse(messages, updateResponse));
    }

    private Integer processSqsDeleteResults(DeleteMessageBatchResult deleteResults) {
        if (deleteResults.getFailed().size() != 0) {
            logger.error("Unable to process {} messages", deleteResults.getFailed().size());
            throw new RuntimeException("Unable to process " + deleteResults.getFailed().size() + " messages");
        }
        // successful execute, reset the failure to 0
        this.repeatedFailures = 0;
        return deleteResults.getSuccessful().size();
    }

    private CompletionStage<DeleteMessageBatchResult> deleteSuccessfullyProcessedMessagesFromSqs(
            AmazonSQSAsync sqsClient, SqsMessagesSolrResponse messagesAndSolrResponse) {
        if (messagesAndSolrResponse.getUpdateResponse().getStatus() != 0) {
            throw new RuntimeException("Error response status of "
                    + messagesAndSolrResponse.getUpdateResponse().getStatus() + " from SOLR server");
        }

        List<Message> messages = messagesAndSolrResponse.getMessages();

        logger.trace("Return future of Solr success with original SQS Messages");

        // TODO consider always deleting messages that fail to process
        // TODO delete all messages that were processed
        final AtomicLong counter = new AtomicLong(0L);
        List<DeleteMessageBatchRequestEntry> deleteEntries = messages.stream()
                .map((m) -> new DeleteMessageBatchRequestEntry().withId("" + counter.getAndIncrement())
                        .withReceiptHandle(m.getReceiptHandle()))
                .collect(Collectors.toList());

        SqsFutureReceiver<DeleteMessageBatchRequest, DeleteMessageBatchResult> deleteFuture = new SqsFutureReceiver<>();
        sqsClient.deleteMessageBatchAsync(
                new DeleteMessageBatchRequest().withQueueUrl(queueURL).withEntries(deleteEntries), deleteFuture);
        return deleteFuture;
    }

    private List<SolrInputDocument> convertSqsMessageToSolrJob(final List<Message> messages) {
        logger.trace("Create Solr request");
        List<SolrInputDocument> solrJobs = messages.stream().flatMap(sqsMessage -> {
            // uncompress the messages
            String snsMsg = sqsMessage.getBody();
            List<Map<String, Object>> maps = MessageUtil.decodeMessages(snsMsg);

            return maps.stream().map(map -> {
                return messageService.convertHashMap2SolrInputDocument(map);
            });
        }).collect(Collectors.toList());
        return solrJobs;
    }

}

That is the code.
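To see what the quadratic backoff in handleRetryPolicy actually yields, here is a small standalone sketch. The constants mirror the ones in the class above; the class and method names are mine, for illustration only.

```java
// Prints the sleep schedule produced by a quadratic backoff of
// repeatedFailures^2 ten-second units, capped at 180 units (30 minutes).
public class BackoffSchedule {

    private static final int HALF_HOUR = 180;                  // cap, in 10-second units
    private static final int TEN_SECOND_DURATION = 10 * 1000;  // milliseconds

    /** Sleep duration in milliseconds after the given number of repeated failures. */
    static long delayMillis(int repeatedFailures) {
        int multiplier = Math.min(repeatedFailures * repeatedFailures, HALF_HOUR);
        return (long) TEN_SECOND_DURATION * multiplier;
    }

    public static void main(String[] args) {
        for (int failures : new int[] { 1, 2, 3, 5, 13, 14, 20 }) {
            System.out.println(failures + " failure(s) -> " + delayMillis(failures) / 1000 + "s");
        }
        // 13 failures: 13*13 = 169 units = 1690s; from 14 failures on,
        // 14*14 = 196 is capped at 180 units = 1800s, i.e. half an hour.
    }
}
```

The cap kicks in at the 14th consecutive failure, after which every retry waits the full half hour.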

References:
https://lucene.apache.org/solr/guide/7_1/solrcloud-recoveries-and-write-tolerance.html
