SocketServer是kafka nio,包含一个accept线程,接受socket连接,并把连接(平均)放入processors中,多个processor线程接受nio的处理请求和相应
processor请求只是将request放入requestchannel queue中(由KafkaRequestHandlerPool中handler完成)
processor响应是在requestchannel上注册对应的processor,processor将response发送给client
/** * Start the socket server */ def startup() { val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) for(i <- 0 until numProcessorThreads) { processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter, newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), numProcessorThreads, requestChannel, quotas, connectionsMaxIdleMs) //processor负责接受网络请求和响应,请求read:放入RequestChannel里的queue中,响应write Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } newGauge("ResponsesBeingSent", new Gauge[Int] { def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) } }) // register the processor threads for notification of responses // 将processor的wakeup作为方法,加入到requestchannel的listener,对应processor的id requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections //接受网络accept请求,将nio的connection放入processor的connnectlist中,让processor处理之后的请求相应 this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) Utils.newThread("kafka-socket-acceptor", acceptor, false).start() acceptor.awaitStartup info("Started") }
获得请求,放入queue中,等待handler处理
/* * Process reads from ready sockets */ def read(key: SelectionKey) { lruConnections.put(key, currentTimeNanos) val socketChannel = channelFor(key) var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { receive = new BoundedByteBufferReceive(maxRequestSize) key.attach(receive) } val read = receive.readFrom(socketChannel) val address = socketChannel.socket.getRemoteSocketAddress(); trace(read + " bytes read from " + address) if(read < 0) { close(key) } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req)//放入queue中,等待handler处理 key.attach(null) // explicitly reset interest ops to not READ, no need to wake up the selector just yet key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) } else { // more reading to be done trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } }
响应请求
/* * Process writes to ready sockets */ def write(key: SelectionKey) { val socketChannel = channelFor(key) val response = key.attachment().asInstanceOf[RequestChannel.Response] val responseSend = response.responseSend if(responseSend == null) throw new IllegalStateException("Registered for write interest but no response attached to key.") val written = responseSend.writeTo(socketChannel)//将response发送相应给client trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) if(responseSend.complete) { response.request.updateRequestMetrics() key.attach(null) trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_WRITE) wakeup() } }
相关推荐
本资源"java_kafka交互工具类.rar"提供了一套Java集成Kafka的工具类,帮助开发者便捷地进行单机或集群环境下的Kafka操作。以下将详细介绍其中可能包含的关键知识点: 1. **Kafka基本概念** - **主题(Topic)**:...
SpringBoot整合Kafka配置类方式
kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala
socketserver-kafka用Java netty 实现简单的socket 通讯,消费kafka消息队列appserver 目录是netty的socket监听启动。ServiceOrderConsumerAPI.java 是kafka的主要消费程序。程序写的很简单。只是做个小演示希望各位...
**Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...
**Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...
Kafka Manager是一款强大的开源工具,专门用于管理Apache Kafka集群。这款工具由Yahoo!开发,旨在提供一个用户友好的界面,使Kafka集群的管理和监控变得更加简单。在本压缩包中,你得到了预编译的Kafka Manager版本...
3. **创建消费者配置类**:创建一个配置类,使用`@Configuration`和`@EnableKafka`注解启用Kafka消费者。这里可以定义消费者的属性,如key和value的序列化方式。 ```java @Configuration @EnableKafka public class...
**Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...
在Java中,我们可以创建一个Producer实例,配置相关的生产者属性(如bootstrap servers、key-value序列化类等),然后使用`send()`方法将消息发送到指定的主题。 【Kafka Consumer】 Kafka消费者则用于订阅和消费...
2. **创建消费者**:使用`KafkaConsumer`类,可以选择group_id(消费组),设置auto_offset_reset(自动偏移重置策略)等参数。消费者可以通过`assign()`方法手动指定要消费的分区,或通过`subscribe()`方法订阅主题...
在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....
本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...
**Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...
Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...
**Kafka Tool for Linux: 管理与使用Apache Kafka集群的高效工具** Apache Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用。Kafka Tool是针对Kafka集群进行管理和操作的一款图形用户界面(GUI)工具...
**Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
在这个“kafka封装 订阅和发布类”的主题中,我们将深入探讨如何对Kafka的订阅和发布功能进行简单的封装,以便更方便地处理数据。 首先,我们需要理解Kafka的核心概念:生产者(Producer)和消费者(Consumer)。...
2. **Kafka可视化工具**:压缩包中的“kafka工具”可能包含了像Kafka Manager、Confluent Control Center或者Kafka Tool之类的可视化工具。例如,Kafka Manager提供了一个web界面,可以清晰地展示Kafka集群的状态,...