`

kafka SocketServer类

阅读更多

 

 

SocketServer是kafka nio,包含一个accept线程,接受socket连接,并把连接(平均)放入processors中,多个processor线程接受nio的处理请求和相应

processor请求只是将request放入requestchannel queue中(由KafkaRequestHandlerPool中handler完成)

processor响应是在requestchannel上注册对应的processor,processor将response发送给client

 

/**
   * Start the socket server
   */
  def startup() {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    for(i <- 0 until numProcessorThreads) {
      processors(i) = new Processor(i, 
                                    time, 
                                    maxRequestSize, 
                                    aggregateIdleMeter,
                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                    numProcessorThreads, 
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      //processor负责接受网络请求和响应,请求read:放入RequestChannel里的queue中,响应write
      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
    }

    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
    })

    // register the processor threads for notification of responses
    // 将processor的wakeup作为方法,加入到requestchannel的listener,对应processor的id
    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
   
    // start accepting connections
    //接受网络accept请求,将nio的connection放入processor的connnectlist中,让processor处理之后的请求相应
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }

 

获得请求,放入queue中,等待handler处理

  /*
   * Process reads from ready sockets
   */
  def read(key: SelectionKey) {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if(key.attachment == null) {
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val read = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress();
    trace(read + " bytes read from " + address)
    if(read < 0) {
      close(key)
    } else if(receive.complete) {
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)//放入queue中,等待handler处理
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

 

 响应请求

  /*
   * Process writes to ready sockets
   */
  def write(key: SelectionKey) {
    val socketChannel = channelFor(key)
    val response = key.attachment().asInstanceOf[RequestChannel.Response]
    val responseSend = response.responseSend
    if(responseSend == null)
      throw new IllegalStateException("Registered for write interest but no response attached to key.")
    val written = responseSend.writeTo(socketChannel)//将response发送相应给client
    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
    if(responseSend.complete) {
      response.request.updateRequestMetrics()
      key.attach(null)
      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
    } else {
      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_WRITE)
      wakeup()
    }
  }

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    socketserver-kafka:使用Netty和Kafka的TCPIP套接字服务器程序代码

    socketserver-kafka用Java netty 实现简单的socket 通讯,消费kafka消息队列appserver 目录是netty的socket监听启动。ServiceOrderConsumerAPI.java 是kafka的主要消费程序。程序写的很简单。只是做个小演示希望各位...

    kafka-2.6.0-src.tgz

    4. **协议与网络层**:Kafka使用自定义的TCP协议进行通信,客户端通过SocketSender和SocketServer进行数据传输。协议设计高效且易于扩展,支持批量发送和零拷贝。 5. **日志管理**:Kafka将消息存储在硬盘上的Log...

    深入理解Apache Kafka-初稿.pdf

    第八章Kafka网络层剖析对SocketServer源码进行了分析,以及Buffer包的封装。 第九章详细分析了Kafka中server的运作机制,包括健康检查、broker状态管理、offset管理以及checkpoint机制。 第十章对Kafka中log文件...

    很全面的kafka技术文档

    `Kafka.Network` 包包含了 Kafka 的网络通信组件,主要包括以下核心类: - **SocketServer**:负责监听来自客户端的连接请求。 - **RequestHandler**:处理每个客户端请求的具体逻辑。 - **NetworkProcessor**:...

    java8源码-kafka-1.1.1-sourcecode:1.1.1版本kafka的源代码注释

    SocketServer //kafka网络层的封装 |-- Acceptor //Acceptor线程的封装 |-- Processor //Processor线程的封装 Selector //对java selector的封装,封装了核心的poll,selectionkeys的遍历,事件的注册等操作 Kafka...

    kafkaI源码

    6. **网络通信**:Kafka使用Netty作为网络库,`kafka.network`包包含了请求处理器和SocketServer,它们负责接收和响应客户端请求。 7. **配置与集群协调**:`kafka.utils.ZkUtils`类提供了对ZooKeeper的操作,用于...

    KafkaHighAvailability(中)

    在这个过程中,Broker通过SocketServer接收和响应请求,使用Java NIO的Reactor模式实现高效网络通信。 总的来说,Kafka的高可用性机制是通过Controller的智能监控和决策,以及与Zookeeper和Brokers的有效协作来实现...

Global site tag (gtag.js) - Google Analytics