- 浏览: 996230 次
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
打算使用xmemcache作为memcache的客户端,由于x ...
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
netty 默认通道配置后续:http://donald-draper.iteye.com/blog/2393510
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:http://donald-draper.iteye.com/blog/2394619
netty Pooled字节buf分配器:http://donald-draper.iteye.com/blog/2394814
Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。
今天我们回到Socket通道,由于socket通道我们讲了好久,先来把Nio socket通道的父类抽象字节通道回顾一下:
从上面来看地址绑定,如果jdk >= 1.7 则使用socket通道绑定地址,否则委托通道内关联的socket。
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。
nio socket通道初始化,主要是创建socket通道,初始化socket通道配置NioSocketChannelConfig。地址绑定操作,如果jdk大于1.7 则socket通道直接绑定地址,否则委托通道内关联的socket。连接操作,直接委托给内部的socket通道连接操作。socket通道读取操作,实际委托给socket通道的read操作,从Socket通道读取数据,写到当前buf。写字节buf,实际委托给socket通道的写操作,从当前buf读取数据,写socket通道中。socket通道写文件region,委托给文件Region的转移数据操作transferTo,从文件Region读取数据,写到通道中。
写通道Outbound缓存区,首先从Outbound缓存区获取刷新链上的写请求对应的字节buf,然后委托给socket通道的写操作,发送数据,发送成功后,从刷新链上移除已经发送的写请求。关闭数据流,就是从事件循环反注册,即事件循环取消选择key,然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。关闭输出流与关闭输入流思路一致。关闭通道实际为关闭通道输入流和输出流。断开连接实际为close通道。
再来看将刷新链上的写请求消息,添加到nio buffer数组中:
从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,
主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,
这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
netty 默认通道配置后续:http://donald-draper.iteye.com/blog/2393510
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:http://donald-draper.iteye.com/blog/2394619
netty Pooled字节buf分配器:http://donald-draper.iteye.com/blog/2394814
Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。
今天我们回到Socket通道,由于socket通道我们讲了好久,先来把Nio socket通道的父类抽象字节通道回顾一下:
/** * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. */ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); //选择器提供者 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private final SocketChannelConfig config;//socket通道配置 /** * Create a new instance */ public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); } /** * Create a new instance using the given {@link SelectorProvider}. */ public NioSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } /** * Create a new instance using the given {@link SocketChannel}. */ public NioSocketChannel(SocketChannel socket) { this(null, socket); } /** * Create a new instance * * @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user * @param socket the {@link SocketChannel} which will be used */ public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } }
private static SocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise. *委托给选择提供者,打开一个socket通道 * See [url=https://github.com/netty/netty/issues/2308]#2308[/url]. */ return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } }
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } @Override protected void autoReadCleared() { clearReadPending(); } }
@Override protected void doBind(SocketAddress localAddress) throws Exception { doBind0(localAddress); } private void doBind0(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { //jdk >= 1.7 则使用socket通道绑定地址 SocketUtils.bind(javaChannel(), localAddress); } else { //否则委托通道内关联的socket SocketUtils.bind(javaChannel().socket(), localAddress); } }
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException { try { //在当前访问控制权限下,委托给socket通道绑定socket地址 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws IOException { socketChannel.bind(address); return null; } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } } public static void bind(final Socket socket, final SocketAddress bindpoint) throws IOException { try { AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws IOException { socket.bind(bindpoint); return null; } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
从上面来看地址绑定,如果jdk >= 1.7 则使用socket通道绑定地址,否则委托通道内关联的socket。
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { //本地地址不为空,则绑定本地socket地址 doBind0(localAddress); } boolean success = false; try { //否则委托SocketUtils boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { //如果连接操作没完成,则添加连接事件到选择key兴趣事件集 selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { //直接委托给socket通道 return socketChannel.connect(remoteAddress); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
@Override protected void doFinishConnect() throws Exception { //直接调用内部socket通道的完成连接方法 if (!javaChannel().finishConnect()) { throw new Error(); } }
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { //从unsafe获取字节buf分配器Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置需要尝试读字节数 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); //委托给字节buf,从Socket通道读取数据,写到当前buf return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
@Override public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; }
@Override public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); try { return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); } catch (ClosedChannelException ignored) { return -1; } }
@Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); //从当前buf读取数据,写socket通道中 return buf.readBytes(javaChannel(), expectedWrittenBytes); }
@Override public int readBytes(GatheringByteChannel out, int length) throws IOException { checkReadableBytes(length); int readBytes = getBytes(readerIndex, out, length, true); readerIndex += readBytes; return readBytes; } private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { ensureAccessible(); ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = ByteBuffer.wrap(array); } return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length)); }
@Override protected long doWriteFileRegion(FileRegion region) throws Exception { final long position = region.transferred(); return region.transferTo(javaChannel(), position); }
public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class); private final File f;//关联文件 private final long position; private final long count; private long transferred; private FileChannel file;//关联文件通道 /** * Explicitly open the underlying file-descriptor if not done yet. 打开一个文件通道 */ public void open() throws IOException { if (!isOpen() && refCnt() > 0) { // Only open if this DefaultFileRegion was not released yet. file = new RandomAccessFile(f, "r").getChannel(); } } //转移文件region的内容到可写字节通道 @Override public long transferTo(WritableByteChannel target, long position) throws IOException { long count = this.count - position; if (count < 0 || position < 0) { throw new IllegalArgumentException( "position out of range: " + position + " (expected: 0 - " + (this.count - 1) + ')'); } if (count == 0) { return 0L; } if (refCnt() == 0) { throw new IllegalReferenceCountException(0); } // Call open to make sure fc is initialized. This is a no-oop if we called it before. //确保通道打开 open(); //从文件Region读取数据,写到通道中 long written = file.transferTo(this.position + position, count, target); if (written > 0) { transferred += written; } return written; } ... }
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size();//获取刷新写请求size if (size == 0) { // All written so clear OP_WRITE //如果所有写请求都已经刷新,则清除选择key兴趣事件集,移除写事件 clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. //获取Outbound缓冲中的刷新队列写请求对应的nio字节buf ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount();//nio字节buf数量 long expectedWrittenBytes = in.nioBufferSize();//需要些的字节数 SocketChannel ch = javaChannel();//获取关联通道 // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //委托给内部通道的写操作 final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //委托给内部通道的写操作 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. //根据已写字节数,从Outbound刷新队列中,移除已经刷新成功的写请求 in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } }
@Override public ChannelFuture shutdownInput() { return shutdownInput(newPromise()); } @Override public ChannelFuture shutdownInput(final ChannelPromise promise) { //从事件循环反注册 Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); if (closeExecutor != null) {//如果关闭执行器不为空 closeExecutor.execute(new Runnable() { @Override public void run() { //执行实际关闭输入流工作 shutdownInput0(promise); } }); } else { //在当前事件循环中在,则直接执行,否则创建一个线程,完成实际关闭输入流工作 EventLoop loop = eventLoop(); if (loop.inEventLoop()) { shutdownInput0(promise); } else { loop.execute(new Runnable() { @Override public void run() { shutdownInput0(promise); } }); } } return promise; }
@Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }
private final class NioSocketChannelUnsafe extends NioByteUnsafe { @Override protected Executor prepareToClose() { try { if (javaChannel().isOpen() && config().getSoLinger() > 0) { // We need to cancel this key of the channel so we may not end up in a eventloop spin // because we try to read or write until the actual close happens which may be later due // SO_LINGER handling. // See https://github.com/netty/netty/issues/4449 //从事件循环反注册 doDeregister(); return GlobalEventExecutor.INSTANCE; } } catch (Throwable ignore) { // Ignore the error as the underlying channel may be closed in the meantime and so // getSoLinger() may produce an exception. In this case we just return null. // See https://github.com/netty/netty/issues/4449 } return null; } }
@Override protected void doDeregister() throws Exception { //事件循环,取消选择key eventLoop().cancel(selectionKey()); }
private void shutdownInput0(final ChannelPromise promise) { try { shutdownInput0(); promise.setSuccess(); } catch (Throwable t) { promise.setFailure(t); } } private void shutdownInput0() throws Exception { if (PlatformDependent.javaVersion() >= 7) { //委托给socket通道 javaChannel().shutdownInput(); } else {//否则,委托给通道关联的socket javaChannel().socket().shutdownInput(); } }
然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。
@Override public ChannelFuture shutdownOutput() { return shutdownOutput(newPromise()); } @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); } }); } else { EventLoop loop = eventLoop(); if (loop.inEventLoop()) { shutdownOutput0(promise); } else { loop.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); } }); } } return promise; } private void shutdownOutput0(final ChannelPromise promise) { try { shutdownOutput0(); promise.setSuccess(); } catch (Throwable t) { promise.setFailure(t); } } private void shutdownOutput0() throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().shutdownOutput(); } else { javaChannel().socket().shutdownOutput(); } }
@Override public ChannelFuture shutdown() { return shutdown(newPromise()); } @Override public ChannelFuture shutdown(final ChannelPromise promise) { Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { shutdown0(promise); } }); } else { EventLoop loop = eventLoop(); if (loop.inEventLoop()) { shutdown0(promise); } else { loop.execute(new Runnable() { @Override public void run() { shutdown0(promise); } }); } } return promise; } private void shutdown0(final ChannelPromise promise) { Throwable cause = null; try { shutdownOutput0(); } catch (Throwable t) { cause = t; } try { shutdownInput0(); } catch (Throwable t) { if (cause == null) { promise.setFailure(t); } else { logger.debug("Exception suppressed because a previous exception occurred.", t); promise.setFailure(cause); } return; } if (cause == null) { promise.setSuccess(); } else { promise.setFailure(cause); } }
@Override protected void doDisconnect() throws Exception { doClose(); } @Override protected void doClose() throws Exception { super.doClose(); javaChannel().close(); }
@Override public ServerSocketChannel parent() { return (ServerSocketChannel) super.parent(); } @Override public SocketChannelConfig config() { return config; } @Override protected SocketChannel javaChannel() { return (SocketChannel) super.javaChannel(); } @Override public boolean isActive() { SocketChannel ch = javaChannel(); return ch.isOpen() && ch.isConnected(); } @Override public boolean isOutputShutdown() { return javaChannel().socket().isOutputShutdown() || !isActive(); } @Override public boolean isInputShutdown() { return javaChannel().socket().isInputShutdown() || !isActive(); } @Override public boolean isShutdown() { Socket socket = javaChannel().socket(); return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive(); } @Override public InetSocketAddress localAddress() { return (InetSocketAddress) super.localAddress(); } @Override public InetSocketAddress remoteAddress() { return (InetSocketAddress) super.remoteAddress(); }
nio socket通道初始化,主要是创建socket通道,初始化socket通道配置NioSocketChannelConfig。地址绑定操作,如果jdk大于1.7 则socket通道直接绑定地址,否则委托通道内关联的socket。连接操作,直接委托给内部的socket通道连接操作。socket通道读取操作,实际委托给socket通道的read操作,从Socket通道读取数据,写到当前buf。写字节buf,实际委托给socket通道的写操作,从当前buf读取数据,写socket通道中。socket通道写文件region,委托给文件Region的转移数据操作transferTo,从文件Region读取数据,写到通道中。
写通道Outbound缓存区,首先从Outbound缓存区获取刷新链上的写请求对应的字节buf,然后委托给socket通道的写操作,发送数据,发送成功后,从刷新链上移除已经发送的写请求。关闭数据流,就是从事件循环反注册,即事件循环取消选择key,然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。关闭输出流与关闭输入流思路一致。关闭通道实际为关闭通道输入流和输出流。断开连接实际为close通道。
public final class ChannelOutboundBuffer { // Assuming a 64-bit JVM: // - 16 bytes object header // - 8 reference fields // - 2 long fields // - 2 int fields // - 1 boolean field // - padding //Entry buf 头部数据size static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); //通道Outbound buf 线程本地Buf,存放刷新链表中的写请求消息 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() throws Exception { return new ByteBuffer[1024]; } }; //buf 关联通道 private final Channel channel; // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) // // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry;刷新写请求链的链头 // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry;//未刷新的写请求链的链头 // The Entry which represents the tail of the buffer private Entry tailEntry; // The number of flushed entries that are not written yet private int flushed;//刷新Entry链上待发送的写请求数 private int nioBufferCount;//当前待发送的消息buf数量 private long nioBufferSize;//当前待发送的所有消息buf的字节数 private boolean inFail;//是否刷新失败 //通道待发送的字节数 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); @SuppressWarnings("UnusedDeclaration") private volatile long totalPendingSize; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); @SuppressWarnings("UnusedDeclaration") private volatile int unwritable;//通道写状态 //触发通道ChannelWritabilityChanged事件任务线程 private volatile Runnable fireChannelWritabilityChangedTask; ChannelOutboundBuffer(AbstractChannel channel) { this.channel = channel; } }
再来看将刷新链上的写请求消息,添加到nio buffer数组中:
/** * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned * array and the total number of readable bytes of the NIO buffers respectively. 将刷新链中的写请求对象消息放到nio buf数组中。#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度 和可读字节数 * * Note that the returned array is reused and thus should not escape * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. 返回的nio buf将会被 NioSocketChannel#doWrite方法重用 * */ public ByteBuffer[] nioBuffers() { long nioBufferSize = 0;//nio buf数组中的字节数 int nioBufferCount = 0;//nio buf数组长度 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //获取通道Outbound缓存区线程本地的niobuf数组 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; //遍历刷新链,链上的写请求Entry的消息必须为ByteBuf while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { //在写请求没有取消的情况下,获取写请求消息buf,及buf的读索引,和可读字节数 ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { if (Integer.MAX_VALUE - readableBytes < nioBufferSize) { //如果消息buf可读字节数+nioBufferSize大于整数的最大值,则跳出循环 // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will // raise an IOException. On Linux it may work depending on the // architecture and kernel but to be safe we also enforce the limit here. // This said writing more the Integer.MAX_VALUE is not a good idea anyway. // // See also: // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2 // - http://linux.die.net/man/2/writev break; } //更新buf的size nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } //需要buf的数量 int neededSpace = nioBufferCount + count; //如果buf需求数量大于当前nio buf数组 if (neededSpace > nioBuffers.length) { //则扩容nio数组为原来的两倍, nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); //更新nio buf数组 NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { //如果需要的buf数量为1,则获取写请求的buf ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer //如果buf为空,则创建一个buf实例 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } //将消息buf,添加到nio buf数组中 nioBuffers[nioBufferCount ++] = nioBuf; } else { //否则获取写请求的buf数组 ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms // of Object allocation //分配buf数组 entry.bufs = nioBufs = buf.nioBuffers(); } //添加写请求buf数组到通道Outbound缓存区的nio buf数组中 nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } } entry = entry.next; } //更新当前nio buffer 计数器和字节数 this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
//则扩容nio数组 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { int newCapacity = array.length; do { // double capacity until it is big enough // See https://github.com/netty/netty/issues/1890 //则扩容nio数组为原来的两倍 newCapacity <<= 1; if (newCapacity < 0) { throw new IllegalStateException(); } } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; //拷贝原始中size buf到新的的buf数组中 System.arraycopy(array, 0, newArray, 0, size); return newArray; }
//添加写请求buf数组到通道Outbound缓存区的nio buf数组中 private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { //遍历添加的buf数组,添加到缓存区的nio buf数组中 for (ByteBuffer nioBuf: nioBufs) { if (nioBuf == null) { break; } nioBuffers[nioBufferCount ++] = nioBuf; } return nioBufferCount; }
从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,
主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,
这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
/** * Removes the fully written entries and update the reader index of the partially written entry. * This operation assumes all messages in this buffer is {@link ByteBuf}. 从刷新写请求链表,移除writtenBytes个字节数 */ public void removeBytes(long writtenBytes) { for (;;) { Object msg = current();//获取当前写请求消息 if (!(msg instanceof ByteBuf)) { //写请求非ByteBuf实例,且writtenBytes为0 assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //获取消息buf的读指针 final int readerIndex = buf.readerIndex(); //获取buf中可读的字节数 final int readableBytes = buf.writerIndex() - readerIndex; //如果可读字节数小于,需要移除的字节数 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { //则更新写请求任务进度 progress(readableBytes); //更新移除字节数 writtenBytes -= readableBytes; } //移除链头写请求消息 remove(); } else { // readableBytes > writtenBytes if (writtenBytes != 0) { //如果可读字节数大于需要移除的字节数,则移动消息buf的读索引到readerIndex + (int) writtenBytes位置 buf.readerIndex(readerIndex + (int) writtenBytes); //则更新写请求任务进度 progress(writtenBytes); } break; } } //最后清除nio buffer clearNioBuffers(); } /** * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written. 返回当前需要发送的消息 */ public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; } /** * Notify the {@link ChannelPromise} of the current message about writing progress. 获取通道刷新任务的进度 */ public void progress(long amount) { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { long progress = e.progress + amount; e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } } /** * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no * flushed message exists at the time this method is called it will return {@code false} to signal that no more * messages are ready to be handled. 移除当前消息,并标记通道异步任务为成功,并返回true。如果没有刷新消息存在,则返回false,表示没有消息需要处理 */ public boolean remove() { Entry e = flushedEntry; if (e == null) { //刷新消息链为空,则清除NioBuffer clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; //移除写请求Entry removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. //写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件 ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } //取消,则回收 // recycle the entry e.recycle(); return true; }
if (e == null) { //刷新消息链为空,则清除NioBuffer clearNioBuffers(); return false; }
// Clear all ByteBuffer from the array so these can be GC'ed. // See https://github.com/netty/netty/issues/3837 private void clearNioBuffers() { int count = nioBufferCount; if (count > 0) { //重置nio buf计数器,填充线程本地nio buf数组为空。 nioBufferCount = 0; Arrays.fill(NIO_BUFFERS.get(), 0, count, null); } }
//移除写请求Entry removeEntry(e);
private void removeEntry(Entry e) { if (-- flushed == 0) {//刷新链为空 // processed everything flushedEntry = null; if (e == tailEntry) {//链尾 tailEntry = null; unflushedEntry = null; } } else { //否则,刷新链头往后移一位 flushedEntry = e.next; } }
/** * A region of a file that is sent via a {@link Channel} which supports * [url=http://en.wikipedia.org/wiki/Zero-copy]zero-copy file transfer[/url]. * * <h3>Upgrade your JDK / JRE</h3> * * {@link FileChannel#transferTo(long, long, WritableByteChannel)} has at least * four known bugs in the old versions of Sun JDK and perhaps its derived ones. * Please upgrade your JDK to 1.6.0_18 or later version if you are going to use * zero-copy file transfer. * [list] * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988]5103988[/url] * - FileChannel.transferTo() should return -1 for EAGAIN instead throws IOException</li> * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6253145]6253145[/url] * - FileChannel.transferTo() on Linux fails when going beyond 2GB boundary</li> * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6427312]6427312[/url] * - FileChannel.transferTo() throws IOException "system call interrupted"</li> * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6524172]6470086[/url] * - FileChannel.transferTo(2147483647, 1, channel) causes "Value too large" exception</li> * [/list] * * <h3>Check your operating system and JDK / JRE</h3> * * If your operating system (or JDK / JRE) does not support zero-copy file * transfer, sending a file with {@link FileRegion} might fail or yield worse * performance. For example, sending a large file doesn't work well in Windows. * * <h3>Not all transports support it</h3> */ public interface FileRegion extends ReferenceCounted { /** * Returns the offset in the file where the transfer began. */ long position(); /** * Returns the bytes which was transfered already. * * @deprecated Use {@link #transferred()} instead. */ @Deprecated long transfered(); /** * Returns the bytes which was transfered already. */ long transferred(); /** * Returns the number of bytes to transfer. */ long count(); /** * Transfers the content of this file region to the specified channel. *转移当前文件Region内容到通道 * @param target the destination of the transfer * @param position the relative offset of the file where the transfer * begins from. For example, <tt>0</tt> will make the * transfer start from {@link #position()}th byte and * <tt>{@link #count()} - 1</tt> will make the last * byte of the region transferred. */ long transferTo(WritableByteChannel target, long position) throws IOException; @Override FileRegion retain(); @Override FileRegion retain(int increment); @Override FileRegion touch(); @Override FileRegion touch(Object hint); }
netty Pooled字节buf分配器
2017-09-28 13:00 2092netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2490netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1341netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1341netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1620netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1868netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1432netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2869netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2217netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2060netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1106netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1905netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1251netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1234netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 984netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1330netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2215netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1124netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1888netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1917netty Inboudn/Outbound通道Invoker ...
例如,`NioSocketChannel`代表一个客户端到服务器的TCP连接。 2. **EventLoopGroup**: 负责事件循环的线程池,处理来自通道的事件。通常有两个组,一个用于接受新连接(`BossGroup`),另一个处理已建立连接的数据...
- `Channel`有多种实现,如`NioServerSocketChannel`、`NioSocketChannel`等,选择合适的`Channel`类型对于性能至关重要。 3. **EventLoop**与**EventLoopGroup**: - `EventLoop`:负责处理一系列`Channel`的...
本文将深入探讨Netty中的几个关键组件,包括`NioServerSocketChannel`、`NioSocketChannel`、`NioEventLoop`、`NioEventLoopGroup`以及`Unsafe`系列类,并通过类关系图来解析它们的作用和相互关系。 1. **Nio...
Netty的核心在于其使用了非阻塞I/O(Non-blocking I/O,也称为NIO)模型,这使得它在处理大量并发连接时表现出色。NIO是一种I/O模型,与传统的阻塞I/O相比,它允许多个通道(channel)在单个线程中同时处理多个请求...
客户端代码结构与服务器端类似,同样使用 `NioEventLoopGroup` 和 `NioSocketChannel`,并通过 `ChannelInitializer` 初始化管道处理程序。值得注意的是,客户端的 `ChannelInitializer` 中也添加了 `StringDecoder`...
1. 创建 Bootstrap:与服务器类似,客户端也需要一个 Bootstrap,但 Channel 类型通常为 NioSocketChannel。 2. 连接服务器:通过调用 connect() 方法,指定服务器的 IP 和端口来建立连接。 3. 配置 Pipeline:...
.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler...
### Netty源码解析——服务启动过程 #### 一、Netty概述 Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐...
在Netty中,BossGroup负责接收新的连接请求,WorkerGroup则处理已连接的SocketChannel上的读写事件。BossGroup通常只有一个线程,而WorkerGroup可以有多个线程,这样的设计能够高效地处理并发连接。 Netty的...
.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p...
2. **客户端连接**:客户端使用Bootstrap实例,配置EventLoopGroup和Channel(如NioSocketChannel)。客户端的Pipeline中通常会包含解码器和编码器,以便将应用程序对象与网络传输的数据进行转换。 3. **数据交换**...
1. **配置Bootstrap**:为服务器和客户端设置Bootstrap,指定EventLoopGroup(负责事件循环)和Channel(如NioServerSocketChannel或NioSocketChannel)。 2. **添加处理器**:根据需求添加HttpServerCodec、...
.channel(NioSocketChannel.class) // 使用NIO的SocketChannel .handler(new ChannelInitializer<SocketChannel>() { // 配置通道处理器 @Override protected void initChannel(SocketChannel ch) throws ...
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...