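Producer requests on the broker side:
1. NIO accepts the request http://blackproof.iteye.com/blog/2239949
2. A handler thread takes the request from the request queue and calls KafkaApis http://blackproof.iteye.com/blog/2239953
3. KafkaApis.handle dispatches on the request type; a produce request goes to handleProducerOrOffsetCommitRequest: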
    def handle(request: RequestChannel.Request) {
      try {
        trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
        request.requestId match {
          case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
          case RequestKeys.FetchKey => handleFetchRequest(request)
          case RequestKeys.OffsetsKey => handleOffsetRequest(request)
          case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
          // become leader or follower; a follower starts the corresponding replica fetcher thread
          case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
          case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
          case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
          case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
          case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
          case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
          case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
          case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
      } catch {
        case e: Throwable =>
          request.requestObj.handleError(e, requestChannel, request)
          error("error when handling request %s".format(request.requestObj), e)
      } finally
        request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    }
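3.1 Append the data to the local log. The local broker is assumed to be the leader, because the client sends to the broker that leads the topic-partition.
appendToLocalLog is the main method that does this work.
3.2 Decide how to answer the client according to the ack level the producer configured:
0: send no response; just wake the processor so it can handle the following requests
1: take the leader's append result and return it
-1: check whether the last offset of every ISR replica other than the leader is greater than or equal to the required offset, and collect any error codes
If the condition is not yet satisfied and there is no error, register a watcher
The request is also placed in the delayed-operation queue so that it can be expired on timeout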
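The three ack levels above can be condensed into a small decision function. This is only a sketch of the branching for a plain produce request: `AckSketch`, `Action` and its four cases are hypothetical names invented for illustration, and the offset-commit path is left out.

```scala
// Hypothetical names throughout; this mirrors the branching only, not Kafka's API.
object AckSketch {
  sealed trait Action
  case object NoResponse extends Action      // acks = 0 and every partition appended cleanly
  case object CloseConnection extends Action // acks = 0 but some partition failed
  case object RespondNow extends Action      // acks = 1, or nothing is left to wait for
  case object WaitForIsr extends Action      // acks = -1: park the request until the ISR catches up

  def decide(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): Action =
    if (requiredAcks == 0) {
      // The producer expects no reply, so closing the socket is the only error signal.
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions) {
      // Either the leader's result is enough, or every partition already failed.
      RespondNow
    } else {
      WaitForIsr
    }
}
```

For example, `AckSketch.decide(-1, 2, 2)` yields `RespondNow`: when every partition has already failed on the leader there is nothing to wait for, which is the same shortcut the `else if` condition takes in the handler.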
    def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
      ......
      val sTime = SystemTime.milliseconds
      // append the data to the local log (the local broker is assumed to be the leader);
      // second argument: true for an offset commit request, false for a produce request
      val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
      debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

      // inspect the results for errors
      val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
      val numPartitionsInError = localProduceResults.count(_.error.isDefined)

      if(produceRequest.requiredAcks == 0) {
        // acks = 0: the producer expects no response
        // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
        // no response is expected by the producer the handler will send a close connection response to the socket server
        // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
        if (numPartitionsInError != 0) {
          info(("Send the close connection response due to error handling produce request " +
            "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
            .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
          requestChannel.closeConnection(request.processor, request)
        } else {
          if (firstErrorCode == ErrorMapping.NoError)
            offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
          // two kinds of request arrive here: offset commit and produce
          if (offsetCommitRequestOpt.isDefined) {
            val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
            requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
          } else
            requestChannel.noOperation(request.processor, request)
        }
      } else if (produceRequest.requiredAcks == 1 ||
          produceRequest.numPartitions <= 0 ||
          numPartitionsInError == produceRequest.numPartitions) {
        // acks = 1: respond as soon as the leader has appended the data
        if (firstErrorCode == ErrorMapping.NoError) {
          // update the in-memory offsets cache of the offset manager
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
        }

        val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
        val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                             .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
        // send back the response, which carries the per-partition ProducerResponseStatus
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      } else {
        // acks = -1: every in-sync replica must have received the data before success is returned
        // create a list of (topic, partition) pairs to use as keys for this delayed request
        val producerRequestKeys = produceRequest.data.keys.toSeq
        val statuses = localProduceResults.map(r =>
          r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
        val delayedRequest = new DelayedProduce(
          producerRequestKeys,
          request,
          produceRequest.ackTimeoutMs.toLong,
          produceRequest,
          statuses,
          offsetCommitRequestOpt)

        // check whether the other replicas have caught up; if not, register a watcher,
        // and the request stays in the purgatory until it is satisfied or times out
        // add the produce request for watch if it's not satisfied, otherwise send the response back
        val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
        if (satisfiedByMe)
          producerRequestPurgatory.respond(delayedRequest)
      }
    }

appendToLocalLog looks up the local Partition object for the topic-partition and calls its appendMessagesToLeader method:
    val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
    val info = partitionOpt match {
      case Some(partition) =>
        // append the data to the leader replica
        partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], producerRequest.requiredAcks)
      case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
        .format(topicAndPartition, brokerId))
    }
3.1 appendMessagesToLeader
    ......
    // after the ISR check, this is the call that actually writes to the log
    val info = log.append(messages, assignOffsets = true)
    // probably unblock some follower fetch requests since log end offset has been updated
    // check whether delayed fetch requests are watching this topic-partition; any that are now
    // satisfied get their response sent back through the request channel
    replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
    // we may need to increment high watermark since ISR could be down to 1
    // new data has arrived, so try to advance the high watermark by comparing
    // the leader's offset with the offsets of the other ISR replicas
    maybeIncrementLeaderHW(leaderReplica)
    ......
3.1.1 log.append:
Check whether a new segment file has to be rolled, append the data to the segment, and update the log end offset.
    // maybe roll the log if this segment is full: get the active segment and check whether it must be rolled
    val segment = maybeRoll(validMessages.sizeInBytes)

    // now append to the log: the data goes into the segment's file, and once the index interval
    // has been exceeded an entry is written to the index file as well;
    // the index uses a memory-mapped file, the log is written through Java's own GatheringByteChannel
    segment.append(appendInfo.firstOffset, validMessages)

    // increment the log end offset
    updateLogEndOffset(appendInfo.lastOffset + 1)
segment.append:
Write the data to the file channel and decide whether an index entry is needed.
    def append(offset: Long, messages: ByteBufferMessageSet) {
      if (messages.sizeInBytes > 0) {
        trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
        // append an entry to the index (if needed)
        // if more bytes than indexIntervalBytes have been written since the last index entry,
        // record the current offset and file position in the index
        if(bytesSinceLastIndexEntry > indexIntervalBytes) {
          index.append(offset, log.sizeInBytes())
          this.bytesSinceLastIndexEntry = 0
        }
        // append the messages
        log.append(messages)
        this.bytesSinceLastIndexEntry += messages.sizeInBytes
      }
    }
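To see why the index ends up sparse, the same bookkeeping can be replayed on plain counters. The sketch below is an in-memory stand-in, not Kafka's LogSegment: `SegmentSketch` and its `index` buffer are hypothetical, and a batch is reduced to its first offset and its size in bytes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for a log segment; no file I/O is performed.
class SegmentSketch(indexIntervalBytes: Int) {
  private var sizeInBytes = 0              // bytes "written" to the log file so far
  private var bytesSinceLastIndexEntry = 0 // bytes appended since the last index entry
  val index = ArrayBuffer.empty[(Long, Int)] // (first offset of the batch, byte position in the log)

  def append(offset: Long, batchSizeInBytes: Int): Unit = {
    if (batchSizeInBytes > 0) {
      // Index the batch only if enough bytes have accumulated since the last entry.
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        index += ((offset, sizeInBytes))
        bytesSinceLastIndexEntry = 0
      }
      sizeInBytes += batchSizeInBytes
      bytesSinceLastIndexEntry += batchSizeInBytes
    }
  }
}
```

With an interval of 100 bytes and four 60-byte batches, only the third batch gets an index entry, at byte position 120. A reader looking for a later offset would start from that entry and scan forward in the log.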
3.1.2 unblockDelayedFetchRequests
Check whether any watcher is registered for this topic-partition. Delayed fetch requests that are now satisfied are taken out of the purgatory and their responses are sent back through the request channel.
    def unblockDelayedFetchRequests(key: TopicAndPartition) {
      val satisfied = fetchRequestPurgatory.update(key)
      debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))

      // send any newly unblocked responses
      satisfied.foreach(fetchRequestPurgatory.respond(_))
    }
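The watcher mechanism behind checkAndMaybeWatch and update can be pictured as a map from key to parked requests. The following is a deliberately simplified, single-threaded sketch: `PurgatorySketch` and the `isSatisfied` predicate are hypothetical, and the timeout handling and locking of a real purgatory are omitted.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded stand-in for a request purgatory (no expiry, no locking).
class PurgatorySketch[K, R](isSatisfied: R => Boolean) {
  private val watchers = mutable.Map.empty[K, mutable.ListBuffer[R]]

  // Returns true if the request is already satisfied; otherwise parks it under `key`.
  def checkAndMaybeWatch(key: K, request: R): Boolean =
    if (isSatisfied(request)) true
    else {
      watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[R]) += request
      false
    }

  // Re-checks everything parked under `key`; removes and returns what is now satisfied.
  def update(key: K): List[R] = {
    val parked = watchers.getOrElse(key, mutable.ListBuffer.empty[R])
    val satisfied = parked.filter(isSatisfied).toList
    parked --= satisfied
    satisfied
  }
}
```

A caller would park a request whose condition is "log end offset has reached N" and call `update` each time the log grows; the request is returned exactly once, on the first `update` after the condition holds.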
3.1.3 maybeIncrementLeaderHW
    private def maybeIncrementLeaderHW(leaderReplica: Replica) {
      val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
      // order by message offset and take the smallest log end offset in the ISR (the slowest replica)
      val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
      val oldHighWatermark = leaderReplica.highWatermark
      // if even the slowest ISR replica is ahead of the current high watermark, advance it
      if(oldHighWatermark.precedes(newHighWatermark)) {
        leaderReplica.highWatermark = newHighWatermark
        debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
        // some delayed requests may be unblocked after HW changed
        val requestKey = new TopicAndPartition(this.topic, this.partitionId)
        replicaManager.unblockDelayedFetchRequests(requestKey)
        replicaManager.unblockDelayedProduceRequests(requestKey)
      } else {
        debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
          .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
      }
    }
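Stripped of logging and metadata, the rule is "high watermark = smallest log end offset in the ISR, and it only moves forward". A minimal sketch with plain Long offsets follows; `HighWatermarkSketch` and `maybeIncrement` are hypothetical names, whereas the real code compares LogOffsetMetadata values.

```scala
// Hypothetical helper; offsets are plain Longs instead of LogOffsetMetadata.
object HighWatermarkSketch {
  // Returns the new high watermark, or None when the current one must stay where it is.
  def maybeIncrement(currentHw: Long, isrLogEndOffsets: Seq[Long]): Option[Long] = {
    require(isrLogEndOffsets.nonEmpty, "the ISR contains at least the leader")
    val candidate = isrLogEndOffsets.min // the slowest in-sync replica bounds the HW
    if (currentHw < candidate) Some(candidate) else None
  }
}
```

With a current high watermark of 5 and ISR log end offsets 9, 7 and 8, the result is `Some(7)`; once the high watermark is already 7 the same offsets give `None`, which corresponds to the "Skipping update high watermark" branch.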