
Kafka producer: the server side


 

Server-side flow for a produce request:

1. The NIO layer receives the request: http://blackproof.iteye.com/blog/2239949

 

2. A handler thread takes the request from the request queue and calls KafkaApis: http://blackproof.iteye.com/blog/2239953

 

3. KafkaApis.handle dispatches a produce request to the handleProducerOrOffsetCommitRequest method:

  def handle(request: RequestChannel.Request) {
    try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) // become leader or follower; a follower starts the corresponding replica fetcher thread
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }

 

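 3.1 Append the data to the local log. The local broker is assumed to be the leader, because the client sends each message to the broker that leads its topic-partition (tp).

         appendToLocalLog is the main method that does this work.

 3.2 Decide how to respond to the client according to the acks level set by the producer:

           0: send no response; just wake the processor so it can handle subsequent requests (if any partition failed, close the connection instead)

           1: return the leader's append result

           -1: check whether every replica in the ISR other than the leader has a log end offset greater than or equal to the offset just written, and collect any error

                   if that is not yet the case and there is no error, register a watcher

                   the delayed request is also tracked in a delay queue so that it can be expired on timeout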
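The acks branching described above can be modelled as a small decision function. This is a minimal sketch, not Kafka's code: `AcksPolicy`, `ResponseAction` and its cases are hypothetical names, and the offset-commit variant (which still sends a response when acks = 0) is left out.

```scala
// Hypothetical model of how the broker picks a response strategy from the producer's acks.
sealed trait ResponseAction
case object NoResponse extends ResponseAction      // acks = 0 and every partition succeeded
case object CloseConnection extends ResponseAction // acks = 0 but some partition failed
case object RespondNow extends ResponseAction      // acks = 1, or nothing left to wait for
case object WaitForIsr extends ResponseAction      // acks = -1 (or any other value): park the request

object AcksPolicy {
  def decide(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): ResponseAction =
    if (requiredAcks == 0) {
      // the producer reads no reply, so dropping the socket is the only way to signal failure
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions) {
      // the leader's local result is final: acks = 1, or every partition already failed
      RespondNow
    } else {
      // at least one partition was appended and followers must catch up first
      WaitForIsr
    }
}
```

Note that a request where every partition failed is answered immediately even with acks = -1, since there is nothing for the followers to replicate.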

 

  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ......
    val sTime = SystemTime.milliseconds
    // append the data to the local log (the local broker is assumed to be the leader)
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // true for an offset commit request, false for a plain produce request
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

    // count how many partitions failed to append (their results carry an error)
    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
    if(produceRequest.requiredAcks == 0) {
      // acks = 0: the producer expects no response, so none is sent on success
      // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
      // no response is expected by the producer the handler will send a close connection response to the socket server
      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request)
      } else {

        if (firstErrorCode == ErrorMapping.NoError)
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))

        // two request types: an offset commit still gets a response, a plain produce request does not
        if (offsetCommitRequestOpt.isDefined) {
          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
        } else
          requestChannel.noOperation(request.processor, request)
      }
    } else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      // acks = 1 (or nothing left to wait for): respond as soon as the leader has appended

      if (firstErrorCode == ErrorMapping.NoError) {
        // update the OffsetManager's in-memory offsets cache
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
      }

      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      // send the response; it carries a ProducerResponseStatus for each partition
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } else {
      // acks = -1: respond only after all in-sync replicas have received the data
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)

      // if the other replicas have not all caught up, watch the request in the purgatory until it is satisfied or times out
      // add the produce request for watch if it's not satisfied, otherwise send the response back
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }
  }
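For acks = -1 the request waits in the purgatory until every partition is satisfied. The sketch below models the check as described in step 3.2: every non-leader ISR replica's log end offset has reached `r.end + 1`, or the partition has already failed. `PartitionState` and `DelayedProduceCheck` are hypothetical names; the broker's actual check is done by its `Partition` class and, depending on the version, also takes the high watermark and `min.insync.replicas` into account, which this model ignores.

```scala
// Hypothetical stand-in for the leader's view of one partition.
final case class PartitionState(
  leaderId: Int,
  isrLogEndOffsets: Map[Int, Long], // brokerId -> log end offset, for every replica in the ISR
  appendError: Option[String]       // set if the leader-side append failed
)

object DelayedProduceCheck {
  // lastAppendedOffset plays the role of r.end, so requiredOffset = r.end + 1
  def partitionSatisfied(state: PartitionState, lastAppendedOffset: Long): Boolean = {
    val requiredOffset = lastAppendedOffset + 1
    val followersCaughtUp = state.isrLogEndOffsets.forall { case (brokerId, leo) =>
      brokerId == state.leaderId || leo >= requiredOffset
    }
    // a failed partition is also "done": its error is returned instead of waiting
    state.appendError.isDefined || followersCaughtUp
  }

  // the whole request can be answered only when every partition is satisfied
  def requestSatisfied(parts: Seq[(PartitionState, Long)]): Boolean =
    parts.forall { case (state, last) => partitionSatisfied(state, last) }
}
```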
appendToLocalLog looks up the local Partition object for each tp and calls its appendMessagesToLeader method:

 

        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        val info = partitionOpt match {
          case Some(partition) =>
            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], producerRequest.requiredAcks) // append the data on the leader
          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicAndPartition, brokerId))
        }
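The per-partition lookup and append can be sketched with in-memory stand-ins. This is a toy under stated assumptions, not the broker code: `ToyPartition`, `AppendResult` and `LocalAppend` are hypothetical, and `TopicAndPartition` and `UnknownTopicOrPartitionException` here are local classes that only mirror the names of Kafka's. It shows why a missing partition produces an error result for that partition only, which is what `numPartitionsInError` later counts.

```scala
// Local stand-ins that only mirror Kafka's names.
final case class TopicAndPartition(topic: String, partition: Int)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// A hypothetical "partition" that only tracks the next offset to assign.
final class ToyPartition {
  private var nextOffset = 0L
  // returns (firstOffset, lastOffset) assigned to the appended messages
  def appendMessagesToLeader(messageCount: Int): (Long, Long) = {
    val first = nextOffset
    nextOffset += messageCount
    (first, nextOffset - 1)
  }
}

final case class AppendResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable])

object LocalAppend {
  // request maps each tp to the number of messages to append
  def appendToLocalLog(partitions: Map[TopicAndPartition, ToyPartition],
                       request: Map[TopicAndPartition, Int]): Seq[AppendResult] =
    request.toSeq.map { case (tp, count) =>
      try {
        val partition = partitions.getOrElse(tp,
          throw new UnknownTopicOrPartitionException(s"Partition $tp doesn't exist on this broker"))
        val (first, last) = partition.appendMessagesToLeader(count)
        AppendResult(tp, first, last, None)
      } catch {
        // one bad partition must not fail the others, so the error is recorded per partition
        case e: UnknownTopicOrPartitionException => AppendResult(tp, -1L, -1L, Some(e))
      }
    }
}
```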

 3.1 appendMessagesToLeader

          ...... 
         // after the ISR check, this is the call that actually writes to the log
          val info = log.append(messages, assignOffsets = true)
          // probably unblock some follower fetch requests since log end offset has been updated
          // check whether any delayed fetch requests are watching this tp, and send responses for those that are now satisfied
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          // the log has grown, so try to advance the high watermark by comparing the leader's offset with the other replicas' offsets
          maybeIncrementLeaderHW(leaderReplica)
         ...... 

3.1.1 log.append:

Checks whether the active segment needs to roll over to a new file, appends the data to the segment, and updates the log end offset.

        // maybe roll the log if this segment is full; returns the active segment (the new one if a roll happened)
        val segment = maybeRoll(validMessages.sizeInBytes)

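        // now append to the log: write to the segment's file, adding an index entry if the index interval has been exceeded
        // the index is memory-mapped (FileChannel.map); the log data is written through the segment's FileChannel, a GatheringByteChannel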
        segment.append(appendInfo.firstOffset, validMessages)

        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1)
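A toy model of the three steps above (maybe roll, append, advance the log end offset). Assumptions: `ToyLog` and `ToySegment` are hypothetical; only the size-based roll is modelled (Kafka also rolls on segment age and when the offset index is full), and messages are represented by their byte sizes instead of a `ByteBufferMessageSet`.

```scala
import scala.collection.mutable.ArrayBuffer

// A hypothetical segment: its base offset is the first offset it will hold.
final class ToySegment(val baseOffset: Long) {
  var sizeInBytes = 0
}

final class ToyLog(segmentBytes: Int) {
  val segments = ArrayBuffer(new ToySegment(0L))
  var logEndOffset = 0L

  // size-only version of the roll check: start a new segment, based at the next offset,
  // when appending would push a non-empty active segment past segmentBytes
  private def maybeRoll(messagesSize: Int): ToySegment = {
    val active = segments.last
    if (active.sizeInBytes > 0 && active.sizeInBytes + messagesSize > segmentBytes)
      segments += new ToySegment(logEndOffset)
    segments.last
  }

  /** Appends messages with the given byte sizes; returns (firstOffset, lastOffset). */
  def append(messageSizes: Seq[Int]): (Long, Long) = {
    require(messageSizes.nonEmpty, "empty message set")
    val firstOffset = logEndOffset                      // offsets are assigned from the LEO
    val lastOffset = firstOffset + messageSizes.size - 1
    val segment = maybeRoll(messageSizes.sum)
    segment.sizeInBytes += messageSizes.sum
    logEndOffset = lastOffset + 1                       // same rule as updateLogEndOffset(lastOffset + 1)
    (firstOffset, lastOffset)
  }
}
```

The log end offset is always "last assigned offset + 1", which is also why the acks = -1 path waits for replicas to reach `r.end + 1`.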

 segment.append:

Writes the data to the segment's file channel and decides whether to add an entry to the index.

  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // more than indexIntervalBytes have been written since the last index entry (bytesSinceLastIndexEntry), so add a new entry
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }
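The index written here is sparse: an entry is added only once more than `indexIntervalBytes` have been appended since the previous one, so a lookup finds the nearest entry at or below the target offset and then scans the log forward from that file position. A minimal sketch with hypothetical names; it uses a linear scan for brevity, whereas Kafka's `OffsetIndex` performs a binary search over its memory-mapped entries.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical segment that mirrors the index logic of segment.append above.
final class ToyIndexedSegment(indexIntervalBytes: Int) {
  val index = ArrayBuffer.empty[(Long, Int)] // (offset, byte position in the log file)
  private var logSizeInBytes = 0
  private var bytesSinceLastIndexEntry = 0

  def append(offset: Long, messagesSizeInBytes: Int): Unit =
    if (messagesSizeInBytes > 0) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        index += ((offset, logSizeInBytes)) // entry points at where this batch starts
        bytesSinceLastIndexEntry = 0
      }
      logSizeInBytes += messagesSizeInBytes // stands in for log.append(messages)
      bytesSinceLastIndexEntry += messagesSizeInBytes
    }

  // last entry with offset <= target; (0, 0) means "scan from the start" (base offset 0 here)
  def lookup(targetOffset: Long): (Long, Int) =
    index.takeWhile(_._1 <= targetOffset).lastOption.getOrElse((0L, 0))
}
```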

 3.1.2 unblockDelayedFetchRequests

Checks whether any delayed fetch requests are watching this tp; for each one that is now satisfied, sends its response back through the request channel.

  def unblockDelayedFetchRequests(key: TopicAndPartition) {
    val satisfied = fetchRequestPurgatory.update(key)
    debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))

    // send any newly unblocked responses
    satisfied.foreach(fetchRequestPurgatory.respond(_))
  }
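`fetchRequestPurgatory.update(key)` returns the delayed requests watching a key that are now satisfied, and `respond` then sends them. The sketch below is a simplified stand-in, not Kafka's purgatory: `ToyPurgatory` and `ToyDelayedFetch` are hypothetical, and "satisfied" is reduced to having accumulated at least `minBytes` of new data.

```scala
import scala.collection.mutable

// Hypothetical delayed fetch: satisfied once enough new bytes have arrived.
final class ToyDelayedFetch(val id: Int, val minBytes: Int) {
  var accumulatedBytes = 0
  def isSatisfied: Boolean = accumulatedBytes >= minBytes
}

// Watchers are kept per key (e.g. a topic-partition); update() hands back and forgets
// the requests that became satisfied, leaving the rest watched.
final class ToyPurgatory[K] {
  private val watchers = mutable.Map.empty[K, mutable.ListBuffer[ToyDelayedFetch]]

  def watch(key: K, request: ToyDelayedFetch): Unit =
    watchers.getOrElseUpdate(key, mutable.ListBuffer.empty) += request

  def update(key: K, newBytes: Int): Seq[ToyDelayedFetch] = watchers.get(key) match {
    case None => Seq.empty
    case Some(list) =>
      list.foreach(r => r.accumulatedBytes += newBytes)
      val (satisfied, stillWaiting) = list.partition(_.isSatisfied)
      watchers(key) = stillWaiting
      satisfied.toSeq
  }
}
```

The caller would then send a response for each returned request, as `satisfied.foreach(fetchRequestPurgatory.respond(_))` does above.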

 3.1.3 maybeIncrementLeaderHW

  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    // the new HW is the minimum log end offset in the ISR, i.e. that of the slowest in-sync replica
    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val oldHighWatermark = leaderReplica.highWatermark
    if(oldHighWatermark.precedes(newHighWatermark)) { // advance the HW only if the new value is ahead of the old one
      leaderReplica.highWatermark = newHighWatermark
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
      // some delayed requests may be unblocked after HW changed
      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(requestKey)
      replicaManager.unblockDelayedProduceRequests(requestKey)
    } else {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    }
  }
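The rule implemented above is: new HW = minimum log end offset across the ISR, and the HW only ever moves forward. A minimal sketch using plain `Long` offsets in place of `LogOffsetMetadata`; `ToyLeader` is a hypothetical name.

```scala
// Hypothetical leader replica holding only its high watermark.
final class ToyLeader(var highWatermark: Long) {
  /** Returns true if the HW advanced (the point at which delayed requests get rechecked). */
  def maybeIncrementLeaderHW(isrLogEndOffsets: Seq[Long]): Boolean = {
    require(isrLogEndOffsets.nonEmpty, "the ISR always contains at least the leader")
    val newHighWatermark = isrLogEndOffsets.min // slowest in-sync replica
    if (highWatermark < newHighWatermark) {     // mirrors oldHighWatermark.precedes(newHighWatermark)
      highWatermark = newHighWatermark
      true
    } else false
  }
}
```

For example, with ISR log end offsets 10, 8 and 7 the HW becomes 7; a later call where the minimum is 6 leaves it at 7.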
