`
woodding2008
  • 浏览: 289623 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kafka high-level consumer 多线程访问异常

 
阅读更多

        在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

 

 

 def hasNext(): Boolean = {
    if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常
      throw new IllegalStateException("Iterator is in failed state")
    state match {
      case DONE => false
      case READY => true
      case _ => maybeComputeNext()
    }
  }


  def maybeComputeNext(): Boolean = {
    state = FAILED              //重置了状态
    nextItem = Some(makeNext())        
    if(state == DONE) {
      false
    } else {
      state = READY
      true
    }
  }


protected def makeNext(): MessageAndMetadata[K, V] = {
    var currentDataChunk: FetchedDataChunk = null
    // if we don't have an iterator, get one
    var localCurrent = current.get()
    if(localCurrent == null || !localCurrent.hasNext) {
      if (consumerTimeoutMs < 0)
        currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞

      else {
        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
        if (currentDataChunk == null) {
          // reset state to make the iterator re-iterable
          resetState()
          throw new ConsumerTimeoutException
        }
      }
//省略部分代码
}

 

 

 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

分享到:
评论

相关推荐

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    pentaho-kafka-consumer.zip

    在标题"pentaho-kafka-consumer.zip"中,我们看到的是一个专门为Pentaho Kettle定制的Kafka消费者插件的压缩包。 这个压缩包的描述提到了如何在Pentaho环境中安装和使用这个插件。首先,你需要在你的Pentaho Kettle...

    kafka-clients-2.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.1.jar; 赠送原API文档:kafka-clients-2.0.1-javadoc.jar; 赠送源代码:kafka-clients-2.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.1.pom; 包含翻译后的API文档...

    kafka-2.12-3.6.1.tgz

    /usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...

    kafka-clients-2.0.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka2种工具 kafkatool-64bit.exe kafka-eagle-bin-1.4.6.tar.gz

    本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6.tar.gz进行详细介绍,帮助读者理解它们的功能和用途。 首先,我们来看kafkatool-64bit.exe,这是一个开箱即用的Kafka...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    kafka-python-2.0.2.tar.gz

    《Python中的Kafka库:kafka-python-2.0.2深入解析》 在Python编程环境中,Kafka作为分布式消息系统的接口,对于实时数据处理和流处理应用至关重要。`kafka-python`是Python社区开发的一个非常流行的Kafka客户端库...

    指定时间段消费Kafka工具

    使用场景:生产环境海量数据,用kafka-console-consumer 消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢...

    kafka-clients-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...

    kafka-eagle-bin-2.1.0.tar.gz

    4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...

    kafka-manager-1.3.3.21

    - **集群管理**:Kafka-Manager 可以添加、删除和查看多个 Kafka 集群,方便多集群环境下的运维。 - **主题操作**:支持创建、修改、删除、查看主题配置,以及对分区进行增加或减少等操作。 - **消费者管理**:...

    kafka-2.13-3.4.0.tgz

    Kafka采用分布式架构,可以在多台服务器上部署,通过分区(Partitions)和副本(Replicas)实现水平扩展和高可用性。每个分区都有一个主副本,负责处理写操作,其他副本作为备份,当主副本故障时,可以自动切换。 ...

    kafka2.12-3.8.0

    kafka2.12-3.8.0

    kafka_2.11-2.2.2.tgz

    Kafka提供了丰富的命令行工具,如`kafka-topics.sh`、`kafka-consumer-groups.sh`等,用于查看和管理Topics、Partitions、Consumer Groups等。 十一、注意事项 1. Kafka默认使用9092端口,确保没有其他服务占用。 ...

    java开发kafka-clients所需要的所有jar包以及源码

    - 异常处理:了解Kafka-clients可能抛出的异常类型,如`KafkaException`、`RetriableException`等,编写合适的错误处理逻辑。 - 性能调优:包括批处理、缓冲、重试策略、线程池设置等,以提高消息处理速度和系统...

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

Global site tag (gtag.js) - Google Analytics