`
fish_no7
  • 浏览: 27968 次
文章分类
社区版块
存档分类
最新评论

Kafka-producer端-network层消息发送的源码探究

阅读更多
前面我们对应用层的一些原理进行了特别的源码跟踪分析,今天我们就深入到网络层,看下producer端的sender线程是如何把一个消息发送到server端的。

    Sender实现了Runnable接口,最后被一个IOThread启动,核心的逻辑是在一个void run(long now)方法中去实现的,具体的代码解释之前的博客中有,这里就不一一介绍了,这里种点介绍发送的最后两个流程,即send和poll,具体的代码行为
for (ClientRequest request : requests)
     client.send(request, now);
this.client.poll(pollTimeout, now);

此处的client对象为NetworkClient,实现的是KafkaClient,我们先来看下KafkaClient定义的几个核心的接口
public void send(ClientRequest request, long now);
public List<ClientResponse> poll(long timeout, long now);

send主要是进行ClientRequest的发送的,poll是做具体的IO的
接下来我们看NetworkClient的实现
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
     selector.send(request.request());
}

send方法主要做了三件事情:1。检查node是否ready;2。把当前请求添加到队列中;3。调用Selector接口的send方法进行发送;此处可以发现,KafkaClient中的发送并不是具体的send实现,具体的send还是依赖的是network层的Selector实现的。
    那我们再来看下Selector的结构,Selector实现了Selectable,Selectable定义的核心方法摘要如下
/**
     * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
     * @param send The request to send
     */
    public void send(Send send);

    /**
     * Do I/O. Reads, writes, connection establishment, etc.
     * @param timeout The amount of time to block if there is nothing to do
     * @throws IOException
     */
    public void poll(long timeout) throws IOException;

可以发现定义的方法和NetworkClient的方法是一一对应的,也就是说他们是不同分层中对同一个方法的实现,而且很明显的是KafkaClient是依赖于Selectable的实现的;
那我们再来看下Selectable的实现类Selector,看下对send方法的实现:
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
     } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
     }
}

其实就是获取了当前请求的Node的通道-Channel,然后把Channel中的Send设置一个值,并没有实际的IO操作,如建联,读写,释放等,这让我们想到了接口中定义的另外一个方法,没错,就是poll,poll方法才是做实际的IO操作的,和接口中申明的方法一致。
再回到NetworkClient的poll方法中
@Override
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

主要也是做了三件事。1.更新metadata信息;2.调用Selector的send方法进行IO;3.对已经完成的请求进行处理;4. 调用回调函数
看到这里终于看到了metadata相关的实际更新操作,之前在看的时候一只没看到和metadata相关的具体的读写是在哪儿进行的,终于在这里发现了  
接下来我们还是关注Selector的send方法,先上代码:
@Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }


首先获取准备好的事件
int readyKeys = select(timeout);

这里主要调用的NIO的Selector的select方法
如果有准备好的事件
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
}

在看下具体的pollSelectionKeys方法
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }


这里边做的操作就是传统的NIO对SocketChannel的操作模型,获取key类型,然后获取通道,针对不同的key类型做不同的逻辑处理。
但是请注意:此时,我们还没有将数据推到网络中,看代码
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
   Send send = channel.write();
   if (send != null) {
       this.completedSends.add(send);
       this.sensors.recordBytesSent(channel.id(), send.size());
   }
}

此时我们只是将数据写入到KafkaChannel中,在看KafkaChannel中的处理
public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }


关键看这句
        send.writeTo(transportLayer);

调用send方法把数据写入到TransportLayer层(传输层)
@Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
        // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
        // GatheringByteChannel or ScatteringByteChannel.
        if (channel instanceof TransportLayer)
            pending = ((TransportLayer) channel).hasPendingWrites();

        return written;
    }

底层还是调用了Channel的write方法,Kafka实现了自己的GatheringByteChannel,最后调用的还是PlaintextTransportLayer的write方法,看下PlaintextTransportLayer的数据结构,先看下构造方法
public PlaintextTransportLayer(SelectionKey key) throws IOException {
        this.key = key;
        this.socketChannel = (SocketChannel) key.channel();
    }
...
@Override
    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }

可以看到实际调用的就是NIO的SocketChannel的write方法,实现数据从内存到网卡的写入。
至此一个写的分析就基本完成的。
    建连和读取基本类似,这里就不扒代码了。
分享到:
评论

相关推荐

    pentaho-kafka-producer.zip

    2. **解压插件**:下载`pentaho-kafka-producer.zip`文件后,使用解压缩工具将其解压。这将包含插件的各类文件,如Java类、配置文件等。 3. **复制到steps文件夹**:将解压后的所有文件和文件夹复制到`安装目录&gt;/...

    kafka-clients源码.zip

    《深入理解Kafka-clients源码》 Kafka-clients是Apache Kafka的重要组成部分,它提供了与Kafka集群交互的API,使得开发者能够构建基于Kafka的应用程序。在2.*版本中,Kafka-clients进行了多方面的优化和改进,提升...

    java开发kafka-clients所需要的所有jar包以及源码

    拥有Kafka-clients的源码意味着你可以深入理解内部工作原理,例如消息序列化、分区逻辑、重试策略等。这对于优化性能、排查问题和定制功能非常有用。 6. **依赖管理**: 在Java项目中,通常使用Maven或Gradle来...

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    kafka-clients-2.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.1.jar; 赠送原API文档:kafka-clients-2.0.1-javadoc.jar; 赠送源代码:kafka-clients-2.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.1.pom; 包含翻译后的API文档...

    kafka-clients-2.0.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    pentaho-kafka-consumer.zip

    在标题"pentaho-kafka-consumer.zip"中,我们看到的是一个专门为Pentaho Kettle定制的Kafka消费者插件的压缩包。 这个压缩包的描述提到了如何在Pentaho环境中安装和使用这个插件。首先,你需要在你的Pentaho Kettle...

    kafka-python-2.0.2.tar.gz

    在`kafka-python`中,可以通过创建一个`KafkaProducer`实例并调用其`send()`方法来发送消息: ```python from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') ...

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka-manager-1.3.3.21

    【Kafka-Manager 1.3.3.21 知识点详解】 Kafka-Manager 是一个基于 Apache Kafka 的开源管理工具,由 XebiaLabs 开发,旨在简化 Kafka 集群的管理和监控。它提供了直观的 Web UI,帮助用户轻松地管理 Kafka 集群,...

    flink-connector-kafka-0.10-2.11-1.10.0-API文档-中文版.zip

    赠送jar包:flink-connector-kafka-0.10_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-0.10_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-0.10_2.11-1.10.0-sources.jar; 赠送Maven...

    kafka-clients-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...

    kafka-manager 最新版本 已经编译好,可直接使用

    Kafka-Manager是一款开源的Apache Kafka管理工具,由Xiaomi公司开发并维护。它提供了一个用户友好的界面,使得Kafka集群的管理和监控变得更加直观和便捷。在本压缩包中,你获得的是kafka-manager的最新版本——1.3....

    flink-connector-kafka-base-2.11-1.10.0-API文档-中文版.zip

    赠送jar包:flink-connector-kafka-base_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-base_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-base_2.11-1.10.0-sources.jar; 赠送Maven...

    spring-kafka-producer.xml

    spring-kafka-producer.xml

    nodejs kafka-node 消费消息,生产消息(csdn)————程序.pdf

    在Node.js环境中,`kafka-node`库是一个用于与Apache Kafka进行交互的客户端库,它提供了生产者和消费者的功能,使我们能够轻松地在Node.js应用程序中发送和接收消息。以下将详细介绍如何使用`kafka-node`库进行消息...

    kafka-manager-1.3.3.7.zip

    说明:kafka-manager 自己下载编译速度巨慢,此资源是编译好的 kafka-manager,版本是:kafka-manager-1.3.3.7(适用于较新的版本,kafka版本是kafka_2.11-2.0.1)。 安装配置说明: 1. 里头有个自己写的启动脚本,...

Global site tag (gtag.js) - Google Analytics