提示:个人理解并且只针对NIO模式
总的来说,包括1个“核心接口”和2个“核心接口帮助类”:
(1)ChannelFactory(NioClientSocketChannelFactory和 NioServerSocketChannelFactory)
作用:初始化boss,work线程
(2)Channels,DefaultChannelPipeline
作用:统一发送底层IO thread的 ChannelEvent事件给handler(上行+下行)或ChannelSink(下行),通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果
(3)ChannelSink(NioClientSocketChannelSlink和NioServerSocketChannelSlink)负责初始化客户端ClientBootstrap 的connect操作和服务端ServertBootstrap 的bind操作,以及下行通道消息的write。
细节:
1. 服务端读数据:采用轮询channel的方式读取数据。
关键代码:
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
@Override
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
bb.flip();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
}
if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
2.服务端写数据:先保存数据到缓存队列,然后同步写入数据到计算机内核:
关键代码:
(1)synchronized (channel.writeLock)
(2)localWrittenBytes = buf.transferTo(ch);
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel);
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
List<Throwable> causes = null;
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (writtenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
if (buf != null) {
buf.release();
}
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
buf = null;
//noinspection UnusedAssignment
evt = null;
if (future != null) {
future.setFailure(t);
}
if (iothread) {
if (causes == null) {
causes = new ArrayList<Throwable>(1);
}
causes.add(t);
} else {
fireExceptionCaughtLater(channel, t);
}
if (t instanceof IOException) {
open = false;
}
}
}
channel.inWriteNowLoop = false;
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}
if (causes != null) {
for (Throwable cause: causes) {
// notify about cause now as it was triggered in the write loop
fireExceptionCaught(channel, cause);
}
}
if (!open) {
// close the channel now
close(channel, succeededFuture(channel));
}
if (iothread) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
3. 服务端Handlers都是共享,读的时候没有并发问题,因为都是在IO线程中轮询读取数据,直到写入业务线程的队列中才返回,但是写的时候存在并发问题,因为写操作由业务线程执行,通常业务会使用线程池并行处理业务消息,这就意味着在某一个时刻会有多个业务线程同时操作ChannelHandler,我们需要对ChannelHandler进行并发保护,通常需要加锁。如果同步块的范围不当,可能会导致严重的性能瓶颈,这对开发者的技能要求非常高,降低了开发效率;
4. netty 3.10 写操作的并发问题指的是ChannelHandler的成员变量,因为:
{局部变量的数据存在于栈内存中。栈内存中的局部变量随着方法的消失而消失。
成员变量存储在堆中的对象里面,由垃圾回收器负责回收},我发现58同城那个RPC框架中httpHandler就存在这样的问题,估计他们都没有用到上传附件的功能所以一直没有修改这个bug。
5.引用李林峰的一句话:在Netty 3的时候,upstream是在I/O线程里执行的,而downstream是在业务线程里执行。当Netty从网络读取一个数据报投递给业务handler的时候,handler是在I/O线程里执行;而当我们在业务线程中调用write和writeAndFlush向网络发送消息的时候,handler是在业务线程里执行,直到最后一个Header handler将消息写入到发送队列中,业务线程才返回。
6. 业务线程只需要持有channel对象既可写入响应数据到channel的writeBufferQueue,这样在上层应用几乎只需要关注业务即可。
分享到:
相关推荐
`netty-3.2.5.Final-sources.jar` 提供了Netty框架的源代码,这对于开发者来说是非常有价值的,因为它允许他们深入理解框架的工作原理,方便调试、学习和定制。通过查看源码,开发者可以更好地掌握如何利用Netty的...
netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...
这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...
Netty 3.6.2.Final 稳定版本 含源码
总之,Netty-4.1.97.Final源码提供了丰富的学习资源,涵盖了网络编程的各个方面,对于提升Java程序员的专业技能具有重要作用。通过深入研究源码,你将能够更好地掌握Netty的工作原理,为你的项目带来更高效、更稳定...
赠送jar包:netty-3.7.0.Final.jar; 赠送原API文档:netty-3.7.0.Final-javadoc.jar; 赠送源代码:netty-3.7.0.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.7.0.Final.pom; 包含翻译后的API文档:netty-...
赠送jar包:netty-3.10.5.Final.jar; 赠送原API文档:netty-3.10.5.Final-javadoc.jar; 赠送源代码:netty-3.10.5.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.10.5.Final.pom; 包含翻译后的API文档:...
netty-3.2.7.final JAR包,netty-3.2.7.final JAR包,netty-3.2.7.final JAR包
Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
这个“netty-netty-4.1.69.Final.tar.gz”文件是Netty的最新稳定版本,版本号为4.1.69.Final,它是一个压缩包文件,通常包含源码、编译后的类库、文档和其他相关资源。 Netty的核心特点包括: 1. **异步事件驱动**...
赠送jar包:netty-3.9.9.Final.jar; 赠送原API文档:netty-3.9.9.Final-javadoc.jar; 赠送源代码:netty-3.9.9.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.9.9.Final.pom; 包含翻译后的API文档:netty-...
netty4.x开发所需jar包,版本号为:4.1.6-Final,亲测可用。
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"netty-4.1.16.Final.jar"是Netty框架的一个...总的来说,这个压缩包为使用和学习Netty提供了完整的资源。
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty的知识点...通过学习,开发者可以深入理解 Netty 的工作原理,提高网络应用的开发效率和性能。
Netty.4.1.29.Final 包含Jar包支持、Javadoc、Source。。
Netty (netty-netty-4.1.74.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...