A write in Netty consists of two steps:
- Write: store the msg in the ChannelOutboundBuffer.
- Flush: flush the msg from the ChannelOutboundBuffer to the socket's send buffer.
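From the user's perspective these two steps map onto ctx.write() and ctx.flush() (or the combined writeAndFlush()). A minimal sketch, assuming a simple echo-style inbound handler (the handler name is hypothetical):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical handler illustrating the two steps of a Netty write.
public class EchoBackHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Step 1 (write): msg is only appended to the ChannelOutboundBuffer;
        // nothing reaches the socket yet.
        ctx.write(msg);
        // Step 2 (flush): everything buffered so far is written to the
        // socket's send buffer. writeAndFlush(msg) combines both steps.
        ctx.flush();
    }
}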
The previous article analyzed write; this article continues with the second step, flush:
//DefaultChannelHandlerContext
public ChannelHandlerContext flush() {
    final DefaultChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}

private void invokeFlush() {
    try {
        ((ChannelOutboundHandler) handler).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
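Because ChannelHandlerContext.flush() walks the pipeline via findContextOutbound(), any ChannelOutboundHandler sitting between the tail and the head can observe the event before it reaches HeadHandler. A minimal sketch of such an interceptor (the handler name is hypothetical):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;

// Hypothetical handler that counts flushes as they travel toward the head.
public class FlushCountingHandler extends ChannelOutboundHandlerAdapter {
    private int flushCount;

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        flushCount++;   // observe the outbound flush event
        ctx.flush();    // propagate it further toward HeadHandler
    }
}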
Since flush is an outbound event, it ultimately reaches the HeadHandler's flush method:
//HeadHandler
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
HeadHandler delegates to AbstractUnsafe's flush method:
//AbstractUnsafe
public void flush() {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}
outboundBuffer is where the msgs were stored during step one (write). Calling outboundBuffer.addFlush() sets the buffer's unflushed index to tail, so the msgs waiting to be flushed in this round occupy the interval [flushed, unflushed) of the buffer array.
//ChannelOutboundBuffer
void addFlush() {
    unflushed = tail;
}
The subsequent flush0 method writes the msgs in this interval to the socket's send buffer.
//AbstractUnsafe
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
            } else {
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        outboundBuffer.failFlushed(t);
        if (t instanceof IOException) {
            close(voidPromise());
        }
    } finally {
        inFlush0 = false;
    }
}
The main logic is in the doWrite method:
//NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        // Do non-gathering write for a single buffer case.
        final int msgCount = in.size();
        if (msgCount <= 1) {
            super.doWrite(in);
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        if (nioBuffers == null) {
            super.doWrite(in);
            return;
        }

        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();

        final SocketChannel ch = javaChannel();
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;
        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
            if (localWrittenBytes == 0) {
                setOpWrite = true;
                break;
            }
            expectedWrittenBytes -= localWrittenBytes;
            writtenBytes += localWrittenBytes;
            if (expectedWrittenBytes == 0) {
                done = true;
                break;
            }
        }

        if (done) {
            // Release all buffers
            for (int i = msgCount; i > 0; i --) {
                in.remove();
            }

            // Finish the write loop if no new messages were flushed by in.remove().
            if (in.isEmpty()) {
                clearOpWrite();
                break;
            }
        } else {
            // Did not write all buffers completely.
            // Release the fully written buffers and update the indexes of the partially written buffer.
            for (int i = msgCount; i > 0; i --) {
                final ByteBuf buf = (ByteBuf) in.current();
                final int readerIndex = buf.readerIndex();
                final int readableBytes = buf.writerIndex() - readerIndex;

                if (readableBytes < writtenBytes) {
                    in.progress(readableBytes);
                    in.remove();
                    writtenBytes -= readableBytes;
                } else if (readableBytes > writtenBytes) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    in.progress(writtenBytes);
                    break;
                } else { // readableBytes == writtenBytes
                    in.progress(readableBytes);
                    in.remove();
                    break;
                }
            }

            incompleteWrite(setOpWrite);
            break;
        }
    }
}
The logic is as follows (a JDK-level gathering-write sketch follows this list):
- If the ChannelOutboundBuffer's size is <= 1, i.e. the pending msgs occupy at most one entry of the buffer array (buffer is an Entry[] array; see the previous article), a gathering write is unnecessary and the parent class AbstractNioByteChannel's doWrite method is called directly.
- If size > 1, i.e. the pending msgs occupy at least two entries of the buffer array, in.nioBuffers() converts the ChannelOutboundBuffer's Entry[] array: the msg stored in each Entry is converted from io.netty.buffer.ByteBuf to java.nio.ByteBuffer, yielding a ByteBuffer[] array assigned to nioBuffers.
- If the conversion yields null, i.e. some pending msg is not of type ByteBuf, a gathering write is likewise not possible and AbstractNioByteChannel's doWrite method is called directly.
- Otherwise, the gathering write path is taken.
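For reference, the gathering write that NioSocketChannel performs corresponds to the JDK's GatheringByteChannel API: several ByteBuffers are handed to the kernel in a single system call. A minimal standalone sketch (class name and buffer contents are hypothetical):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public final class GatheringWriteDemo {
    // Writes a header and a body with a single system call on a connected,
    // non-blocking SocketChannel; returns the bytes actually accepted.
    static long writeGathering(SocketChannel ch) throws IOException {
        ByteBuffer[] buffers = {
                ByteBuffer.wrap("HEADER".getBytes()),
                ByteBuffer.wrap("BODY".getBytes())
        };
        // May write fewer bytes than remain in the buffers: the send buffer
        // can fill up, which is exactly the partial-write case doWrite handles.
        return ch.write(buffers, 0, buffers.length);
    }
}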
a) First, consider super.doWrite(in), i.e. the parent class AbstractNioByteChannel's doWrite method:
//AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            break;
        }

        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                in.remove();
                continue;
            }

            if (!buf.isDirect()) {
                ByteBufAllocator alloc = alloc();
                if (alloc.isDirectBufferPooled()) {
                    // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
                    // We can do a better job by using our pooled allocator. If the current allocator does not
                    // pool a direct buffer, we rely on JDK's direct buffer pool.
                    buf = alloc.directBuffer(readableBytes).writeBytes(buf);
                    in.current(buf);
                }
            }

            boolean setOpWrite = false;
            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            if (done) {
                in.remove();
            } else {
                incompleteWrite(setOpWrite);
                break;
            }
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            boolean setOpWrite = false;
            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                long localFlushedAmount = doWriteFileRegion(region);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (region.transfered() >= region.count()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            if (done) {
                in.remove();
            } else {
                incompleteWrite(setOpWrite);
                break;
            }
        } else {
            throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
        }
    }
}
//AbstractNioByteChannel
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
Notes:
- If the current msg is null, i.e. buffer[flushed] holds no msg, all msgs have been written, so the OP_WRITE bit of the SelectionKey is cleared.
- Netty currently supports only two message types for writes, ByteBuf and FileRegion; this article analyzes only the ByteBuf case.
- buf.readableBytes() is called first to check whether buf has readable data, i.e. writerIndex - readerIndex > 0. If the result is 0, in.remove() is executed; otherwise the buf is written using spin-lock-like logic.
//NioSocketChannel
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
    return writtenBytes;
}

//UnpooledHeapByteBuf
public int readBytes(GatheringByteChannel out, int length) throws IOException {
    checkReadableBytes(length);
    int readBytes = getBytes(readerIndex, out, length, true);
    readerIndex += readBytes;
    return readBytes;
}

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf;
    if (internal) {
        tmpBuf = internalNioBuffer();
    } else {
        tmpBuf = ByteBuffer.wrap(array);
    }
    return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}
Because the I/O is non-blocking, the number of bytes actually written to the send buffer, writtenBytes, may be smaller than the number the buf expected to write, expectedWrittenBytes. If we stopped writing here and relied on the selector's asynchronous notification, the remaining data in the buf could not be written out promptly: the channel's write event could only be handled in the selector's next loop iteration, i.e. after all remaining events notified in the current iteration and all pending tasks had been processed and another select had run, and a thread context switch might occur along the way. A msg would therefore suffer noticeable latency between landing in the ChannelOutboundBuffer and being flushed to the socket's send buffer.
Netty instead uses spin-lock-like logic, calling write multiple times within one loop, so that all the data in the buf may be written out in a single flush call. The number of iterations is writeSpinCount, which defaults to 16.
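The spin count can be tuned per bootstrap. A minimal sketch, assuming a client Bootstrap (the value 32 is an arbitrary example):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;

public final class SpinCountConfig {
    // Raises the per-flush write() attempt limit from the default 16 to 32.
    static void apply(Bootstrap b) {
        b.option(ChannelOption.WRITE_SPIN_COUNT, 32);
    }
}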
However, if a single write call returns 0, the send buffer has no space left at all; continuing to call write would only waste relatively expensive system calls. In this case the loop exits and the OP_WRITE bit of the SelectionKey is set, relying on the selector's asynchronous notification.
If after several write calls in the spin loop the data still is not fully written, yet no call returned 0, then every write did push out some bytes. This case should not simply exit flush and rely on the selector either: the spin count may just be set too small while the send buffer actually still has room. The remaining write is therefore submitted as an asynchronous task to the current thread's task queue. Once all the events selected in the current loop iteration have been handled, this task runs, without waiting to be woken by the next select.
- If the msg has been fully written, in.remove() performs the cleanup:
//ChannelOutboundBuffer
public boolean remove() {
    if (isEmpty()) {
        return false;
    }

    Entry e = buffer[flushed];
    Object msg = e.msg;
    if (msg == null) {
        return false;
    }

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    e.clear();

    flushed = flushed + 1 & buffer.length - 1;

    safeRelease(msg);

    promise.trySuccess();
    decrementPendingOutboundBytes(size);

    return true;
}
First, the Entry at buffer[flushed] is cleared:
//Entry
public void clear() {
    buffers = null;
    buf = null;
    msg = null;
    promise = null;
    progress = 0;
    total = 0;
    pendingSize = 0;
    count = -1;
}
Then flushed is incremented by one, the msg is released via its reference count, and finally decrementPendingOutboundBytes updates the pending-byte accounting:
//ChannelOutboundBuffer
void decrementPendingOutboundBytes(int size) {
    // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
    // recycled while process this method.
    Channel channel = this.channel;
    if (size == 0 || channel == null) {
        return;
    }

    long oldValue = totalPendingSize;
    long newWriteBufferSize = oldValue - size;
    while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
        oldValue = totalPendingSize;
        newWriteBufferSize = oldValue - size;
    }

    int lowWaterMark = channel.config().getWriteBufferLowWaterMark();

    if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
        if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
            channel.pipeline().fireChannelWritabilityChanged();
        }
    }
}
This updates totalPendingSize, the total size of msgs still pending in the ChannelOutboundBuffer. If the new value falls below the channel's write-buffer low water mark, the channel is marked writable again and a ChannelWritabilityChanged event is fired.
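User code typically reacts to this writability signal for backpressure. A minimal sketch, assuming a hypothetical producer that is paused and resumed from channelWritabilityChanged:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            resumeProducing();  // pending bytes fell below the low water mark
        } else {
            pauseProducing();   // pending bytes exceeded the high water mark
        }
        ctx.fireChannelWritabilityChanged();
    }

    private void resumeProducing() { /* application-specific */ }
    private void pauseProducing()  { /* application-specific */ }
}

The thresholds themselves can be tuned via ChannelOption.WRITE_BUFFER_LOW_WATER_MARK and ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK.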
b) Finally, the gathering write logic:
//NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        // The non-gathering write logic was covered above, so the related code is omitted here.
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();

        final SocketChannel ch = javaChannel();
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;
        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
            if (localWrittenBytes == 0) {
                setOpWrite = true;
                break;
            }
            expectedWrittenBytes -= localWrittenBytes;
            writtenBytes += localWrittenBytes;
            if (expectedWrittenBytes == 0) {
                done = true;
                break;
            }
        }

        if (done) {
            // Release all buffers
            for (int i = msgCount; i > 0; i --) {
                in.remove();
            }

            // Finish the write loop if no new messages were flushed by in.remove().
            if (in.isEmpty()) {
                clearOpWrite();
                break;
            }
        } else {
            // Did not write all buffers completely.
            // Release the fully written buffers and update the indexes of the partially written buffer.
            for (int i = msgCount; i > 0; i --) {
                final ByteBuf buf = (ByteBuf) in.current();
                final int readerIndex = buf.readerIndex();
                final int readableBytes = buf.writerIndex() - readerIndex;

                if (readableBytes < writtenBytes) {
                    in.progress(readableBytes);
                    in.remove();
                    writtenBytes -= readableBytes;
                } else if (readableBytes > writtenBytes) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    in.progress(writtenBytes);
                    break;
                } else { // readableBytes == writtenBytes
                    in.progress(readableBytes);
                    in.remove();
                    break;
                }
            }

            incompleteWrite(setOpWrite);
            break;
        }
    }
}
Notes:
- The gathering write uses the same spin-lock-like loop.
- If all msgs were flushed to the send buffer, they are all released; if ChannelOutboundBuffer.isEmpty() then returns true (i.e. no concurrent write added new pending msgs during the flush, so unflushed stayed unchanged and unflushed == flushed), the OP_WRITE bit of the SelectionKey is cleared.
- If some msgs were not flushed to the send buffer, each ByteBuf is processed in order from flushed toward unflushed: a fully flushed ByteBuf is released, while a partially flushed one merely has its readerIndex advanced.
- incompleteWrite(setOpWrite) works as described earlier: if the last write call returned 0, the send buffer is completely full, so the loop exits and the OP_WRITE bit of the SelectionKey is set to rely on the selector's asynchronous notification; if the data is still unwritten after multiple successful write calls, the remaining work is submitted as an asynchronous task to the current thread's task queue, awaiting scheduled execution.
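The promise.trySuccess() call seen in remove() is what ultimately completes the ChannelFuture that user code obtains from writeAndFlush. A minimal sketch of observing flush completion (class name and message content are hypothetical):

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.CharsetUtil;

public final class WriteCompletionDemo {
    static void send(Channel ch) {
        ChannelFuture f = ch.writeAndFlush(
                Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // remove() invoked promise.trySuccess(): the msg has been
                    // flushed to the socket's send buffer.
                } else {
                    // failFlushed() or an I/O error completed it with a cause.
                    future.cause().printStackTrace();
                }
            }
        });
    }
}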