本文为原创,转载请注明出处
Netty4源码分析- read
当selector检测到OP_READ事件时,触发read操作:
//NioEventLoop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } }
Read方法由NioByteUnsafe实现:
public void read() { assert eventLoop().inEventLoop(); final SelectionKey key = selectionKey(); final ChannelConfig config = config(); if (!config.isAutoRead()) { int interestOps = key.interestOps(); if ((interestOps & readInterestOp) != 0) { // only remove readInterestOp if needed key.interestOps(interestOps & ~readInterestOp); } } final ChannelPipeline pipeline = pipeline(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); boolean closed = false; Throwable exception = null; ByteBuf byteBuf = null; int messages = 0; try { for (;;) { byteBuf = allocHandle.allocate(allocator); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount == 0) { byteBuf.release(); byteBuf = null; break; } if (localReadAmount < 0) { closed = true; byteBuf.release(); byteBuf = null; break; } pipeline.fireChannelRead(byteBuf); allocHandle.record(localReadAmount); byteBuf = null; if (++ messages == maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } finally { if (byteBuf != null) { if (byteBuf.isReadable()) { pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } } pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { closed = true; } pipeline().fireExceptionCaught(exception); } if (closed) { setInputShutdown(); if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { key.interestOps(key.interestOps() & ~readInterestOp); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); } } } } } }
一、首先分析this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle()
config.getRecvByteBufAllocator()返回RecvByteBufAllocator,可以取名为接受缓存分配器。该分配器用于为channel分配receive buffers以存储随后读取的字节。默认返回的分配器类型是自适应缓存分配器AdaptiveRecvByteBufAllocator,它能根据前一次实际读取的字节数量,自适应调整当前缓存分配的大小,以防止缓存分配过多或过少。其结构如下:
其中:
- SIZE_TABLE:为固定的静态数组,按照从小到大的顺序预先存储可以分配的缓存大小。最小的为16,然后每次累加16,直到496。然后从512开始,每次向走位移1(即放大两倍),直到int发生溢出。本机的最大值为1073741824,所以数组的size为52。
- MinIndex和maxIndex为最小缓存(64)和最大缓存(65536)在SIZE_TABLE中对应的下标。分别为3和38
- 第一次分配缓存时,由于没有上一次的实际接收到的字节数做参考,因此需要给出初始值,由Initial指定,默认值为1024
- INDEX_INCREMENT:上次预估缓存偏小时,下次Index的递增值。默认为4;
- INDEX_DECREMENT:上次预估缓存偏大时,下次Index的递减值。默认为1
//AdaptiveRecvByteBufAllocator private static final class HandleImpl implements Handle { private final int minIndex; private final int maxIndex; private int index; private int nextReceiveBufferSize; private boolean decreaseNow; HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; } @Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(nextReceiveBufferSize); } @Override public int guess() { return nextReceiveBufferSize; } @Override public void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } } private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }
- nextReceiveBufferSize记录下次分配缓存时应该分配的大小,即index下标在数组SIZE_TABLE中的值。
- ByteBuf allocate(ByteBufAllocator alloc)方法则根据上次预估的字节大小nextReceiveBufferSize分配缓存。
- Index记录nextReceiveBufferSize在数组SIZE_TABLE中的索引值:
相关推荐
赠送jar包:transport-netty4-client-5.5.1.jar; 赠送原API文档:transport-netty4-client-5.5.1-javadoc.jar; 赠送源代码:transport-netty4-client-5.5.1-sources.jar; 赠送Maven依赖信息文件:transport-netty...
赠送jar包:transport-netty4-client-6.3.0.jar; 赠送原API文档:transport-netty4-client-6.3.0-javadoc.jar; 赠送源代码:transport-netty4-client-6.3.0-sources.jar; 赠送Maven依赖信息文件:transport-netty...
赠送jar包:transport-netty4-client-5.5.1.jar; 赠送原API文档:transport-netty4-client-5.5.1-javadoc.jar; 赠送源代码:transport-netty4-client-5.5.1-sources.jar; 赠送Maven依赖信息文件:transport-netty...
赠送jar包:reactor-netty-core-1.0.15.jar; 赠送原API文档:reactor-netty-core-1.0.15-javadoc.jar; 赠送源代码:reactor-netty-core-1.0.15-sources.jar; 赠送Maven依赖信息文件:reactor-netty-core-1.0.15....
netty-socketio-netty-socketio-2.0.6 ,Socket.IO 是一个库,可以在客户端和服务器之间实现低延迟, 双向和基于事件的通信:netty-socketio-netty-socketio-2.0.6.tar.gznetty-socketio-netty-socketio-2.0.6.zip
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:transport-netty4-client-6.3.0.jar; 赠送原API文档:transport-netty4-client-6.3.0-javadoc.jar; 赠送源代码:transport-netty4-client-6.3.0-sources.jar; 赠送Maven依赖信息文件:transport-netty...
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-transport-rxtx-4.1.74.Final.jar; 赠送原API文档:netty-transport-rxtx-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-rxtx-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:...
netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-codec-mqtt-4.1.73.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
赠送jar包:netty-transport-classes-epoll-4.1.74.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.74.Final-sources.jar;...
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...