- 浏览: 985044 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
引言:
前面篇文章我们看了抽象通道的内部类抽象Unsafe,先来回顾一下:
抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。地址绑定方法委托给doBind,待子类实现。
关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加消息到OutBound Buf中。
刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
在抽象通道的变量声明中,我们看到一个有Outbound buf,在抽象Unsafe的中的写消息,实际为将消息添加的Outbound buf中,刷新操作,即将Outbound buf中为未刷新的消息队列添加到刷新队列中,然后发送刷新队列消息。
今天我们就来看一下ChannelOutboundBuffer
从上面来看,通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,
一个未刷新的buf链表和一个刷新buf链表。
我们来看一下buf Entry的定义
通道写消息时,消息将会被包装成写请求Entry。
来看添加消息到通道Outbound缓冲区
//计算消息size
//在添加消息到未刷新写请求链表后,更新待发送的字节数
//ChannelConfig
//更新通道写状态
从上面可以看出,添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,
将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,
如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发通道ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
//更新通道待发送字节数的另一个版本,与上面不同道是,如果
通道待发送的字节数大于通道写bufsize,延时触发通道ChannelWritabilityChanged事件。
再来看刷新操作:
我们来看:
//ChannelConfig
//更新通道待发送字节数,另一个版本
从上面可以出,添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,
如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
再来看移除操作:
移除操作有几点要看:
1.
2.
从上面可以看出移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,
更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
再来看发送消息异常时,移除写请求
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
再来看将刷新链上的写请求消息,添加到nio buffer数组中:
1.
2.
从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
再来看其他方法:
//PromiseNotificationUtil
总结:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
附:
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
引言:
前面篇文章我们看了抽象通道的内部类抽象Unsafe,先来回顾一下:
抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。地址绑定方法委托给doBind,待子类实现。
关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加消息到OutBound Buf中。
刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
在抽象通道的变量声明中,我们看到一个有Outbound buf,在抽象Unsafe的中的写消息,实际为将消息添加的Outbound buf中,刷新操作,即将Outbound buf中为未刷新的消息队列添加到刷新队列中,然后发送刷新队列消息。
今天我们就来看一下ChannelOutboundBuffer
package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. 抽象通道用通道Outbound buf 存放写请求消息 * * All methods must be called by a transport implementation from an I/O thread, except the following ones: * [list] * [*]{@link #size()} and {@link #isEmpty()} * [*]{@link #isWritable()} * [*]{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)} * [/list] * */ public final class ChannelOutboundBuffer { // Assuming a 64-bit JVM: // - 16 bytes object header // - 8 reference fields // - 2 long fields // - 2 int fields // - 1 boolean field // - padding //Entry buf 头部数据size static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); //通道Outbound buf 线程本地Buf,存放刷新链表中的写请求消息 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() throws Exception { return new ByteBuffer[1024]; } }; //buf 关联通道 private final Channel channel; // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) // // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry;刷新写请求链的链头 // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry;//未刷新的写请求链的链头 // The Entry which represents the tail of the buffer private Entry tailEntry; // The number of flushed entries that are not written yet private int flushed;//刷新Entry链上待发送的写请求数 private int nioBufferCount;//当前待发送的消息buf数量 private long nioBufferSize;//当前待发送的所有消息buf的字节数 private boolean inFail;//是否刷新失败 //通道待发送的字节数 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); @SuppressWarnings("UnusedDeclaration") private volatile long totalPendingSize; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); @SuppressWarnings("UnusedDeclaration") private volatile int unwritable;//通道写状态 //触发通道ChannelWritabilityChanged事件任务线程 private volatile Runnable fireChannelWritabilityChangedTask; ChannelOutboundBuffer(AbstractChannel channel) { this.channel = channel; } }
从上面来看,通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,
一个未刷新的buf链表和一个刷新buf链表。
我们来看一下buf Entry的定义
static final class Entry { //Entry回收器 private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @Override protected Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }; private final Handle<Entry> handle;//Entry 对象回收Handle Entry next;//后继 Object msg;//消息对象 ByteBuffer[] bufs;//存放消息的字节buf数组 ByteBuffer buf;//存放消息的buf ChannelPromise promise;//写请求任务 long progress; long total;//消息size int pendingSize;//Entry消息字节数 int count = -1;//消息当前buf的数量 boolean cancelled; private Entry(Handle<Entry> handle) { this.handle = handle; } //创建Entry实例 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//待发送的消息字节数 entry.total = total; entry.promise = promise; return entry; } //取消写请求 int cancel() { if (!cancelled) { cancelled = true; int pSize = pendingSize; // release message and replace with an empty buffer //释放消息对象 ReferenceCountUtil.safeRelease(msg); msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; } //回收写请求,委托给 void recycle() { next = null; bufs = null; buf = null; msg = null; promise = null; progress = 0; total = 0; pendingSize = 0; count = -1; cancelled = false; handle.recycle(this); } //回收Entry,返回后继写请求 Entry recycleAndGetNext() { Entry next = this.next; recycle(); return next; } }
通道写消息时,消息将会被包装成写请求Entry。
来看添加消息到通道Outbound缓冲区
/** * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once * the message was written. */ public void addMessage(Object msg, int size, ChannelPromise promise) { //包装消息,为写请求Entry Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 //在添加消息到未刷新写请求链表后,更新待发送的字节数 incrementPendingOutboundBytes(entry.pendingSize, false); }
//计算消息size
private static long total(Object msg) { if (msg instanceof ByteBuf) { return ((ByteBuf) msg).readableBytes(); } if (msg instanceof FileRegion) { return ((FileRegion) msg).count(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes(); } return -1; }
//在添加消息到未刷新写请求链表后,更新待发送的字节数
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //如果待发送的字节数,大于通道写buf大小,则更新通道可状态 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { //更新通道写状态 setUnwritable(invokeLater); } }
//ChannelConfig
/** * Returns the high water mark of the write buffer. If the number of bytes * queued in the write buffer exceeds this value, {@link Channel#isWritable()} * will start to return {@code false}. */ int getWriteBufferHighWaterMark();
//更新通道写状态
private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { //写状态改变,则触发通道ChannelWritabilityChanged事件 fireChannelWritabilityChanged(invokeLater); } break; } } }
private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { //如果需要延时通知,则创建ChannelWritabilityChanged事件触发任务线程 Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { //通知通道,通道写状态已改变 pipeline.fireChannelWritabilityChanged(); } }; } //将ChannelWritabilityChanged事件触发线程,交由通道事件循环执行 channel.eventLoop().execute(task); } else { //否则直接触发ChannelWritabilityChanged事件 pipeline.fireChannelWritabilityChanged(); } }
从上面可以看出,添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,
将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,
如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发通道ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
//更新通道待发送字节数的另一个版本,与上面不同道是,如果
通道待发送的字节数大于通道写bufsize,延时触发通道ChannelWritabilityChanged事件。
/** * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ void incrementPendingOutboundBytes(long size) { incrementPendingOutboundBytes(size, true); }
再来看刷新操作:
/** * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed * and so you will be able to handle them. 刷新通道Outbound缓存区,即将先前添加的消息,标记为刷新 */ public void addFlush() { // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } //遍历未刷新写请求链表,将写请求添加到刷新链表中 do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes //如果写请求取消,则更新通道待发送字节数 int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry //置空未刷新写请求链表 unflushedEntry = null; } }
我们来看:
//如果写请求取消,则更新通道待发送字节数 int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true);
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } //更新通道待发送字节数 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //待发送字节数消息,小于通道配置的写buf size if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //更新通道可写状态 setWritable(invokeLater); } }
//ChannelConfig
/** * Returns the low water mark of the write buffer. Once the number of bytes * queued in the write buffer exceeded the * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then * dropped down below this value, {@link Channel#isWritable()} will start to return * {@code true} again. 一旦通道待发送字节数大于通道写buf的 high water mark,则丢弃如下值,Channel#isWritable将返回true */ int getWriteBufferLowWaterMark();
//更新通道可写状态 private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }
//更新通道待发送字节数,另一个版本
/** * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ void decrementPendingOutboundBytes(long size) { decrementPendingOutboundBytes(size, true, true); }
从上面可以出,添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,
如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
再来看移除操作:
/** * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no * flushed message exists at the time this method is called it will return {@code false} to signal that no more * messages are ready to be handled. 移除当前消息,并标记通道异步任务为成功,并返回true。如果没有刷新消息存在,则返回false,表示没有消息需要处理 */ public boolean remove() { Entry e = flushedEntry; if (e == null) { //刷新消息链为空,则清除NioBuffer clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; //移除写请求Entry removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. //写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件 ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } //取消,则回收 // recycle the entry e.recycle(); return true; }
移除操作有几点要看:
1.
if (e == null) { //刷新消息链为空,则清除NioBuffer clearNioBuffers(); return false; }
// Clear all ByteBuffer from the array so these can be GC'ed. // See https://github.com/netty/netty/issues/3837 private void clearNioBuffers() { int count = nioBufferCount; if (count > 0) { //重置nio buf计数器,填充线程本地nio buf数组为空。 nioBufferCount = 0; Arrays.fill(NIO_BUFFERS.get(), 0, count, null); } }
2.
//移除写请求Entry removeEntry(e);
private void removeEntry(Entry e) { if (-- flushed == 0) {//刷新链为空 // processed everything flushedEntry = null; if (e == tailEntry) {//链尾 tailEntry = null; unflushedEntry = null; } } else { //否则,刷新链头往后移一位 flushedEntry = e.next; } }
从上面可以看出移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,
更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
再来看发送消息异常时,移除写请求
/** * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable} * and return {@code true}. If no flushed message exists at the time this method is called it will return * {@code false} to signal that no more messages are ready to be handled. */ public boolean remove(Throwable cause) { return remove0(cause, true); } private boolean remove0(Throwable cause, boolean notifyWritability) { Entry e = flushedEntry; if (e == null) { //刷新消息链为空,则清除NioBuffer clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; //移除写请求Entry removeEntry(e); if (!e.cancelled) { // only release message, fail and decrement if it was not canceled before. //写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件 ReferenceCountUtil.safeRelease(msg); safeFail(promise, cause); decrementPendingOutboundBytes(size, false, notifyWritability); } //取消,则回收 // recycle the entry e.recycle(); return true; }
/** * Removes the fully written entries and update the reader index of the partially written entry. * This operation assumes all messages in this buffer is {@link ByteBuf}. 从刷新写请求链表,移除writtenBytes个字节数 */ public void removeBytes(long writtenBytes) { for (;;) { Object msg = current();//获取当前写请求消息 if (!(msg instanceof ByteBuf)) { //写请求非ByteBuf实例,且writtenBytes为0 assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //获取消息buf的读指针 final int readerIndex = buf.readerIndex(); //获取buf中可读的字节数 final int readableBytes = buf.writerIndex() - readerIndex; //如果可读字节数小于,需要移除的字节数 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { //则更新写请求任务进度 progress(readableBytes); //更新移除字节数 writtenBytes -= readableBytes; } //移除链头写请求消息 remove(); } else { // readableBytes > writtenBytes if (writtenBytes != 0) { //如果可读字节数大于需要移除的字节数,则移动消息buf的读索引到readerIndex + (int) writtenBytes位置 buf.readerIndex(readerIndex + (int) writtenBytes); //则更新写请求任务进度 progress(writtenBytes); } break; } } //最后清除nio buffer clearNioBuffers(); }
/** * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written. 返回当前需要发送的消息 */ public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
/** * Notify the {@link ChannelPromise} of the current message about writing progress. 获取通道刷新任务的进度 */ public void progress(long amount) { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { long progress = e.progress + amount; e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } }
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
再来看将刷新链上的写请求消息,添加到nio buffer数组中:
/** * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned * array and the total number of readable bytes of the NIO buffers respectively. 将刷新链中的写请求对象消息放到nio buf数组中。#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度 和可读字节数 * * Note that the returned array is reused and thus should not escape * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. 返回的nio buf将会被 NioSocketChannel#doWrite方法重用 * */ public ByteBuffer[] nioBuffers() { long nioBufferSize = 0;//nio buf数组中的字节数 int nioBufferCount = 0;//nio buf数组长度 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //获取通道Outbound缓存区线程本地的niobuf数组 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; //遍历刷新链,链上的写请求Entry的消息必须为ByteBuf while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { //在写请求没有取消的情况下,获取写请求消息buf,及buf的读索引,和可读字节数 ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { if (Integer.MAX_VALUE - readableBytes < nioBufferSize) { //如果消息buf可读字节数+nioBufferSize大于整数的最大值,则跳出循环 // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will // raise an IOException. On Linux it may work depending on the // architecture and kernel but to be safe we also enforce the limit here. // This said writing more the Integer.MAX_VALUE is not a good idea anyway. // // See also: // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2 // - http://linux.die.net/man/2/writev break; } //更新buf的size nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } //需要buf的数量 int neededSpace = nioBufferCount + count; //如果buf需求数量大于当前nio buf数组 if (neededSpace > nioBuffers.length) { //则扩容nio数组为原来的两倍, nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); //更新nio buf数组 NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { //如果需要的buf数量为1,则获取写请求的buf ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer //如果buf为空,则创建一个buf实例 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } //将消息buf,添加到nio buf数组中 nioBuffers[nioBufferCount ++] = nioBuf; } else { //否则获取写请求的buf数组 ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms // of Object allocation //分配buf数组 entry.bufs = nioBufs = buf.nioBuffers(); } //添加写请求buf数组到通道Outbound缓存区的nio buf数组中 nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } } entry = entry.next; } //更新当前nio buffer 计数器和字节数 this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
1.
//则扩容nio数组 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { int newCapacity = array.length; do { // double capacity until it is big enough // See https://github.com/netty/netty/issues/1890 //则扩容nio数组为原来的两倍 newCapacity <<= 1; if (newCapacity < 0) { throw new IllegalStateException(); } } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; //拷贝原始中size buf到新的的buf数组中 System.arraycopy(array, 0, newArray, 0, size); return newArray; }
2.
//添加写请求buf数组到通道Outbound缓存区的nio buf数组中 private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { //遍历添加的buf数组,添加到缓存区的nio buf数组中 for (ByteBuffer nioBuf: nioBufs) { if (nioBuf == null) { break; } nioBuffers[nioBufferCount ++] = nioBuf; } return nioBufferCount; }
从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
再来看其他方法:
//刷新失败 void failFlushed(Throwable cause, boolean notify) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) // // See https://github.com/netty/netty/issues/1501 if (inFail) { //已失败,直接返回 return; } try { inFail = true; for (;;) { //移除刷新链头部 if (!remove0(cause, notify)) { break; } } } finally { inFail = false; } }
//关闭Outbound缓存区 void close(final ClosedChannelException cause) { if (inFail) { //已将刷新失败,则创建关闭任务线程,委托给事件循环执行 channel.eventLoop().execute(new Runnable() { @Override public void run() { close(cause); } }); return; } inFail = true; if (channel.isOpen()) { throw new IllegalStateException("close() must be invoked after the channel is closed."); } if (!isEmpty()) { throw new IllegalStateException("close() must be invoked after all flushed writes are handled."); } // Release all unflushed messages. try { Entry e = unflushedEntry; //遍历未刷新的写请求,更新写任务失败 while (e != null) { // Just decrease; do not trigger any events via decrementPendingOutboundBytes() int size = e.pendingSize; //更新通道待发送的字节数 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (!e.cancelled) { //如果写任务没有取消,则释放消息,更新任务状态为失败 ReferenceCountUtil.safeRelease(e.msg); safeFail(e.promise, cause); } //回收写请求 e = e.recycleAndGetNext(); } } finally { inFail = false; } //清除Nio buf数组 clearNioBuffers(); }
//更新任务成功 private static void safeSuccess(ChannelPromise promise) { // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return // false. PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger); }
//PromiseNotificationUtil
/** * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails. */ public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p); } else { logger.warn( "Failed to mark a promise as success because it has failed already: {}, unnotified cause:", p, err); } } }
//更新任务失败 private static void safeFail(ChannelPromise promise, Throwable cause) { // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return // false. PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger); }
//回收缓冲区 @Deprecated public void recycle() { // NOOP } //通道Outbound缓冲区中可写的字节数 public long totalPendingWriteBytes() { return totalPendingSize; } /** * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()} * was called. 此方法必须在#nioBuffers方法后调用,用于获取当前缓冲区可写的字节buf数组的长度 */ public int nioBufferCount() { return nioBufferCount; } /** * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()} * was called. 此方法必须在#nioBuffers方法后调用,用于获取当前缓冲区可写的字节buf数组中的字节数 */ */ public long nioBufferSize() { return nioBufferSize; } /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. 缓冲区是否可写,当且仅当#totalPendingWriteBytes方法的通道可写字节数,不超过通道的写watermark,同时 用户没有使用#setUserDefinedWritability,设置可写标志为false */ public boolean isWritable() { return unwritable == 0; } /** * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to * {@code true}. 获取用于定义的可写标志 */ public boolean getUserDefinedWritability(int index) { return (unwritable & writabilityMask(index)) == 0; } //可写掩码 private static int writabilityMask(int index) { if (index < 1 || index > 31) { throw new IllegalArgumentException("index: " + index + " (expected: 1~31)"); } return 1 << index; } /** * Sets a user-defined writability flag at the specified index. 根据可写状态,设置用户定义的可写标志 */ public void setUserDefinedWritability(int index, boolean writable) { if (writable) { setUserDefinedWritability(index); } else { clearUserDefinedWritability(index); } } //设置用户定义的可写标志 private void setUserDefinedWritability(int index) { final int mask = ~writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue & mask; //更新可写状态 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { //通知通道可写状态已改变 fireChannelWritabilityChanged(true); } break; } } } //清除可写状态 private void clearUserDefinedWritability(int index) { final int mask = writabilityMask(index); for (;;) { final int oldValue = unwritable; final int newValue = oldValue | mask; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(true); } break; } } } /** * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}. 返回通道Outbound缓冲区中需要刷新的消息数 */ public int size() { return flushed; } /** * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false} * otherwise. 如果没有需要刷新的消息,则返回true */ public boolean isEmpty() { return flushed == 0; } /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. 返回直到通道不可写,通道Outbound缓冲区还可以写多少字节的数。如果通道不可写,则返回0 */ public long bytesBeforeUnwritable() { //通道写缓存区大小-当前缓冲区字节数 long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize; // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? bytes : 0; } return 0; } /** * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. 获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0 */ public long bytesBeforeWritable() { //当前缓冲区字节数-通道写缓存的rLowWaterMark long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark(); // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? 0 : bytes; } return 0; }
/** * Call {@link MessageProcessor#processMessage(Object)} for each flushed message * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)} * returns {@code false} or there are no more flushed messages to process. 当刷新消息时,调用消息处理器的处理消息方法#processMessage,处理每个消息,直到#processMessage方法 返回false,或没有消息需要刷新 */ public void forEachFlushedMessage(MessageProcessor processor) throws Exception { if (processor == null) { throw new NullPointerException("processor"); } Entry entry = flushedEntry; if (entry == null) { return; } //遍历刷新链表 do { if (!entry.cancelled) { //如果写任务没有取消,则处理消息 if (!processor.processMessage(entry.msg)) { return; } } entry = entry.next; } while (isFlushedEntry(entry)); } //判断写请求是否在刷新链表上 private boolean isFlushedEntry(Entry e) { return e != null && e != unflushedEntry; } //消息处理器 public interface MessageProcessor { /** * Will be called for each flushed message until it either there are no more flushed messages or this * method returns {@code false}. 在刷新消息时,将会调用 */ boolean processMessage(Object msg) throws Exception; }
总结:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
附:
package io.netty.util; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.lang.ref.WeakReference; import java.util.Arrays; import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo; import static java.lang.Math.max; import static java.lang.Math.min; /** * Light-weight object pool based on a thread-local stack. *基于线程本地栈的轻量级对象池 * @param <T> the type of the pooled object */ public abstract class Recycler<T> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class); @SuppressWarnings("rawtypes") private static final Handle NOOP_HANDLE = new Handle() { @Override public void recycle(Object object) { // NOOP } }; private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE); private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement(); private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 32768; // Use 32k instances as default. private static final int DEFAULT_MAX_CAPACITY_PER_THREAD; private static final int INITIAL_CAPACITY; private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int MAX_DELAYED_QUEUES_PER_THREAD; private static final int LINK_CAPACITY; private static final int RATIO; static { // In the future, we might have different maxCapacity for different object types. // e.g. io.netty.recycler.maxCapacity.writeTask // io.netty.recycler.maxCapacity.outboundBuffer int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD)); if (maxCapacityPerThread < 0) { maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD; } DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread; MAX_SHARED_CAPACITY_FACTOR = max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); MAX_DELAYED_QUEUES_PER_THREAD = max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", // We use the same value as default EventLoop number NettyRuntime.availableProcessors() * 2)); LINK_CAPACITY = safeFindNextPositivePowerOfTwo( max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before. // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation // bursts. RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8)); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled"); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled"); logger.debug("-Dio.netty.recycler.linkCapacity: disabled"); logger.debug("-Dio.netty.recycler.ratio: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR); logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY); logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); } } INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256); } private final int maxCapacityPerThread;//每个线程的最大对象容量 private final int maxSharedCapacityFactor;//容量共享因子 private final int ratioMask; private final int maxDelayedQueuesPerThread; //回收器线程本地对象栈 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } }; protected Recycler() { this(DEFAULT_MAX_CAPACITY_PER_THREAD); } protected Recycler(int maxCapacityPerThread) { this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) { this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1; if (maxCapacityPerThread <= 0) { this.maxCapacityPerThread = 0; this.maxSharedCapacityFactor = 1; this.maxDelayedQueuesPerThread = 0; } else { this.maxCapacityPerThread = maxCapacityPerThread; this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread); } } //获取本线程的对象 @SuppressWarnings("unchecked") public final T get() { if (maxCapacityPerThread == 0) { //创建对象 return newObject((Handle<T>) NOOP_HANDLE); } //否则从栈pop一个handle对象, Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { //handle对象为空,则创建对象 handle = stack.newHandle(); handle.value = newObject(handle); } //否则返回handle对象的值 return (T) handle.value; } //创建对象,在子列扩展 protected abstract T newObject(Handle<T> handle); public interface Handle<T> { void recycle(T object); } static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; private Stack<?> stack; private Object value;//Handle对象值 DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } //回收对象,将对象放入线程本地栈 stack.push(this); } } /** * @deprecated use {@link Handle#recycle(Object)}. 回收对象 */ @Deprecated public final boolean recycle(T o, Handle<T> handle) { if (handle == NOOP_HANDLE) { return false; } DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; } h.recycle(o); return true; } //延时回收队列 private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); } }; static final class Stack<T> { // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other // than the stack owner recycles: when we run out of items in our stack we iterate this collection // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst // still recycling all items. final Recycler<T> parent; final Thread thread; final AtomicInteger availableSharedCapacity; final int maxDelayedQueues; private final int maxCapacity; private final int ratioMask; private DefaultHandle<?>[] elements; private int size; private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; ... } // a queue that makes only moderate guarantees about visibility: items are seen in the correct order, // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain private static final class WeakOrderQueue { static final WeakOrderQueue DUMMY = new WeakOrderQueue(); // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex. @SuppressWarnings("serial") private static final class Link extends AtomicInteger { private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY]; private int readIndex; private Link next; } // chain of data items private Link head, tail; // pointer to another queue of delayed items for the same stack private WeakOrderQueue next; private final WeakReference<Thread> owner; private final int id = ID_GENERATOR.getAndIncrement(); private final AtomicInteger availableSharedCapacity; private WeakOrderQueue() { owner = null; availableSharedCapacity = null; } private WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the // Stack itself GCed. availableSharedCapacity = stack.availableSharedCapacity; } ... } ... }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1325netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2065netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2456netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1321netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1321netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1602netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1849netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1401netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2845netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2188netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2043netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1084netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1881netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1223netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1206netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 961netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1314netty Inboudn/Outbound通道Inv ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1086netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1861netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1886netty Inboudn/Outbound通道Invoker ...
相关推荐
3. **ByteBuf**:Netty自定义的缓冲区,优于Java的ByteBuffer,提供了一种高效且易于使用的内存管理机制,支持直接缓冲区和堆缓冲区,并且可以动态扩容。 4. **Pipeline(处理器链)**:Netty的处理器链设计允许在...
8. **零拷贝**:Netty 通过直接缓冲区和内存映射文件实现了零拷贝技术,减少了数据在内核空间和用户空间之间的复制,提高了性能。 9. **多协议支持**:Netty 支持多种网络协议,如 HTTP、FTP、SMTP、TCP、UDP 等,...
1. **ByteBuf**: Netty提供了自己的字节缓冲区实现ByteBuf,相较于Java NIO中的ByteBuffer,ByteBuf更易用且高效。它支持两种模式:直接缓冲和堆缓冲,可以根据需求选择最适合的内存分配方式。 2. **Channel**: ...
4. **ByteBuf**:ByteBuf是Netty中的缓冲区,提供了一种高效的数据操作接口。HeapByteBuf允许访问其底层的byte数组,而DirectByteBuf则直接在Java堆外内存中存储数据,通常用于减少内存拷贝,提高性能。调用array()...
6. **ByteBuf**: Netty的字节缓冲区,比Java的ByteBuffer更强大,提供了更多的功能和优化。 在这个"netty-test-master"项目中,我们可以期待以下关键知识点: 1. **基本服务器和客户端搭建**: 项目可能包含一个...
- Netty引入了ByteBuf作为数据缓冲区,优于Java的ByteBuffer,提供更方便的操作接口,避免了内存复制,提高了性能。 3. **Channel与Pipeline**: - Channel代表网络连接,负责读写数据。Pipeline则是一系列处理器...
- Netty的数据传输基于`ByteBuf`,它是Netty提供的高效字节缓冲区,优于Java的`ByteBuffer`。 - 在ChannelHandlerContext中,我们可以调用`writeAndFlush()`方法来发送数据。`writeAndFlush()`会将数据写入缓冲区...
- Netty是基于Java NIO(非阻塞I/O)构建的,理解Java NIO的基本原理,包括通道(Channels)、缓冲区(Buffers)和选择器(Selectors)至关重要。 - 非阻塞I/O的优势在于能够处理大量并发连接,提高系统资源利用率...
3. **ByteBuf**:Netty 提供的高效字节缓冲区,优于传统的 Java `ByteBuffer`。ByteBuf 支持读写索引管理,更易于操作二进制数据。 4. **Pipeline**:Netty 使用 ChannelHandler 组成的链式处理结构,每个 ...
- **ByteBuf**:Netty自定义的缓冲区,比Java的ByteBuffer更高效,支持读写分离,便于内存管理。 - **Decoder** 和 **Encoder**:解码器和编码器,用于数据的转换,例如将字节流转为对象或对象转为字节流。 - **...
此外,《Netty权威指南》还会详细讲解Netty的缓冲区(ByteBuf)机制,它比Java的ByteBuffer更高效,提供了丰富的操作方法。同时,书中也会涉及Netty的编解码器(Codec)设计,如何使用它们来处理各种复杂的协议,如...
- **ByteBuf**:Netty提供的高效字节缓冲区,优于Java的ByteBuffer,支持读写分离,提供了更多的操作API。 3. **Netty的工作流程** - **连接建立**:客户端通过Bootstrap创建连接请求,服务器端通过...
- Netty 4.0 的 JAR 文件被拆分为多个模块,每个模块专注于特定的功能,如缓冲区、传输、处理器等。这使得用户可以根据需求选择引入必要的模块,减少依赖。 2. **通用 API 的改变**: - `ChannelBuffer` 被替换为...