In earlier posts we traced through the application-layer logic of the producer in some detail. Today we descend into the network layer to see how the producer's Sender thread actually pushes a message out to the server.
Sender implements the Runnable interface and is ultimately started on a dedicated I/O thread; its core logic lives in the void run(long now) method. A previous post walked through that code in detail, so we won't repeat it here. Instead we focus on the last two steps of a send, namely send and poll. The code in question:
for (ClientRequest request : requests)
    client.send(request, now);

this.client.poll(pollTimeout, now);
The client object here is a NetworkClient, which implements KafkaClient. Let's first look at the two core methods KafkaClient declares:
public void send(ClientRequest request, long now);
public List<ClientResponse> poll(long timeout, long now);
send is responsible for queueing a ClientRequest for transmission; poll is where the actual I/O happens.
Next, let's look at the NetworkClient implementation:
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
The send method does three things: 1. check whether the target node is ready; 2. add the current request to the in-flight requests queue; 3. call the Selector interface's send method. Notice that the send in KafkaClient is not the concrete implementation; the actual sending is delegated to the network layer's Selector.
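As a side note, the readiness check in step 1 is worth a quick look. In the version under discussion, canSendRequest boils down to three conditions; the following is quoted from the source of roughly this era, so treat the details as approximate:

private boolean canSendRequest(String node) {
    // connected at the TCP level, channel fully prepared (e.g. any handshake done),
    // and room left under max.in.flight.requests.per.connection
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}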
Now let's look at the structure of Selector. Selector implements Selectable, whose core method declarations are excerpted below:
/**
 * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
 * @param send The request to send
 */
public void send(Send send);

/**
 * Do I/O. Reads, writes, connection establishment, etc.
 * @param timeout The amount of time to block if there is nothing to do
 * @throws IOException
 */
public void poll(long timeout) throws IOException;
Notice that these declarations correspond one to one with NetworkClient's methods; the two layers expose the same pair of operations, and KafkaClient plainly depends on the Selectable implementation beneath it.
Now let's turn to Selectable's implementation class, Selector, and its send method:
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}
All it does is fetch the Channel for the request's target node and set the Send on that channel. There is no actual I/O here, no connecting, reading, writing, or releasing. That brings to mind the other method declared on the interface: poll. poll is where the real I/O happens, exactly as the interface documentation states.
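It helps to see what setSend does inside KafkaChannel, because it explains how a later poll knows this channel has data to write. In versions of roughly this era it looks like the following (quoted from memory, so treat it as a sketch):

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    // turn on OP_WRITE interest so the next poll() reports this key as writable
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

Setting the Send therefore also flips on OP_WRITE interest for the key, which is exactly what the write branch in pollSelectionKeys (shown further below) reacts to.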
Now back to NetworkClient's poll method:
@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
It does four things: 1. update metadata if due; 2. call the Selector's poll method to perform the actual I/O; 3. process the completed requests; 4. invoke the callbacks.
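Step 3 fans out into several handlers. As one example, handleCompletedSends converts sends that expect no response (the acks=0 case) directly into responses; in the source of roughly this era it reads approximately:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}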
It is also here that we finally see metadata actually being updated. When reading the earlier code I could never find where the concrete metadata reads and writes happened; this is the place.
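The metadata link becomes concrete in handleCompletedReceives: every completed receive is first offered to the metadata updater, and only if it is not a metadata response does it become an ordinary ClientResponse. A sketch of the method from roughly this era (details approximate):

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        // metadata responses are consumed here to refresh the cluster view;
        // everything else is handed back to the caller
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}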
Next we turn to Selector's poll method. Code first:
@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    maybeCloseOldestConnection();
}
First it finds out how many keys are ready:

int readyKeys = select(timeout);

This delegates to the select method of the underlying java.nio Selector.
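The private select helper is a thin wrapper over the NIO selector; in roughly this era it looks like the following (a sketch from memory):

private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();   // non-blocking check
    else
        return this.nioSelector.select(ms);    // block for up to ms milliseconds
}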
If any keys are ready, or some connections completed immediately:

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    pollSelectionKeys(this.nioSelector.selectedKeys(), false);
    pollSelectionKeys(immediatelyConnectedKeys, true);
}
Let's look at the pollSelectionKeys method in detail:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
What happens here is the classic NIO readiness model applied to a SocketChannel: iterate the selected keys, fetch the channel behind each key, and branch on the key's ready operations (connect, read, write) to run the corresponding logic.
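If that pattern is unfamiliar, here is a minimal standalone version of the same loop, stripped of all Kafka machinery. The host, port, and buffer size are placeholders; this is an illustrative sketch of the select / iterate / dispatch pattern, not anyone's production code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioLoopSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        ch.connect(new InetSocketAddress("localhost", 9092)); // placeholder endpoint
        ch.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer readBuf = ByteBuffer.allocate(4096);
        while (true) {
            if (selector.select(1000) == 0)
                continue;
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // same as pollSelectionKeys: remove before handling

                if (key.isConnectable() && ch.finishConnect())
                    key.interestOps(SelectionKey.OP_READ); // connection done, watch for reads

                if (key.isValid() && key.isReadable()) {
                    readBuf.clear();
                    if (ch.read(readBuf) < 0) { // peer closed: the "defunct socket" case
                        key.cancel();
                        ch.close();
                        return;
                    }
                }
            }
        }
    }
}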
But note: at this point the data has still not been pushed onto the network. Look at the code again:
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}
At this point we have only handed the data to the KafkaChannel. Now look at how KafkaChannel handles it:
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
The key line is:

send.writeTo(transportLayer);

which writes the data down into the TransportLayer (the transport layer):
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    // This is temporary workaround. As Send, Receive interfaces are being used by BlockingChannel.
    // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
    // GatheringByteChannel or ScatteringByteChannel.
    if (channel instanceof TransportLayer)
        pending = ((TransportLayer) channel).hasPendingWrites();
    return written;
}
Underneath, this is still a Channel write. Kafka supplies its own GatheringByteChannel implementations (the TransportLayer family), and in the plaintext case the call ultimately lands in PlaintextTransportLayer's write method. Let's look at PlaintextTransportLayer's structure, starting with the constructor:
public PlaintextTransportLayer(SelectionKey key) throws IOException {
    this.key = key;
    this.socketChannel = (SocketChannel) key.channel();
}

...

@Override
public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}
So the actual call is plain NIO SocketChannel.write: the bytes are handed from user space to the kernel's socket send buffer, from which the OS pushes them out over the network.
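To make the "gathering" part concrete, here is a tiny standalone example of the idea ByteBufferSend is built on: a 4-byte size prefix and a payload written through one gathering write. The endpoint and payload are placeholders; this is an illustrative sketch of the NIO API, not Kafka code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class GatheringWriteSketch {
    public static void main(String[] args) throws IOException {
        SocketChannel ch = SocketChannel.open(new InetSocketAddress("localhost", 9092)); // placeholder

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer sizeBuf = ByteBuffer.allocate(4).putInt(payload.length);
        sizeBuf.flip();

        // one gathering write pushes both buffers; a short write is possible,
        // which is why Kafka keeps calling writeTo until Send.completed()
        ByteBuffer[] frame = { sizeBuf, ByteBuffer.wrap(payload) };
        long written = ch.write(frame);
        System.out.println("wrote " + written + " bytes");
        ch.close();
    }
}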
With that, the analysis of one write is essentially complete.
Connection establishment and reading follow much the same pattern, so we won't dig through that code here; the short sketch below just gives a flavor of the read side.
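Responses are size-prefixed just like requests, and NetworkReceive reassembles them: read a 4-byte length, then that many payload bytes. The following is a simplified blocking-mode sketch of that framing (in the real non-blocking Selector path, partial reads are resumed across poll calls instead of looping; readFrame is a hypothetical helper, not Kafka code):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SizePrefixedReadSketch {
    // read one size-prefixed frame from a channel, assuming blocking mode
    static ByteBuffer readFrame(SocketChannel ch) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        while (sizeBuf.hasRemaining())
            if (ch.read(sizeBuf) < 0)
                throw new EOFException("connection closed while reading size");
        sizeBuf.flip();

        ByteBuffer payload = ByteBuffer.allocate(sizeBuf.getInt());
        while (payload.hasRemaining())
            if (ch.read(payload) < 0)
                throw new EOFException("connection closed while reading payload");
        payload.flip();
        return payload;
    }
}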