  • 浏览: 988649 次

netty 通道Outbound缓冲区

netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
      抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
      关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
     写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加消息到OutBound Buf中。
     刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
在抽象通道的变量声明中,我们看到一个有Outbound buf,在抽象Unsafe的中的写消息,实际为将消息添加的Outbound buf中,刷新操作,即将Outbound buf中为未刷新的消息队列添加到刷新队列中,然后发送刷新队列消息。
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 抽象通道用通道Outbound  buf 存放写请求消息
 * All methods must be called by a transport implementation from an I/O thread, except the following ones:
 * [list]
 * [*]{@link #size()} and {@link #isEmpty()}

 * [*]{@link #isWritable()}

 * [*]{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}

 * [/list]

public final class ChannelOutboundBuffer {
    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    //Entry buf 头部数据size
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
    //通道Outbound buf 线程本地Buf,存放刷新链表中的写请求消息
    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
   //buf 关联通道
    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    // The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;刷新写请求链的链头
    // The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;//未刷新的写请求链的链头
    // The Entry which represents the tail of the buffer
    private Entry tailEntry;
    // The number of flushed entries that are not written yet
    private int flushed;//刷新Entry链上待发送的写请求数

    private int nioBufferCount;//当前待发送的消息buf数量
    private long nioBufferSize;//当前待发送的所有消息buf的字节数

    private boolean inFail;//是否刷新失败
    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

    private volatile long totalPendingSize;

    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    private volatile int unwritable;//通道写状态
    private volatile Runnable fireChannelWritabilityChangedTask;

    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;


我们来看一下buf Entry的定义
static final class Entry {
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);

    private final Handle<Entry> handle;//Entry 对象回收Handle
    Entry next;//后继
    Object msg;//消息对象
    ByteBuffer[] bufs;//存放消息的字节buf数组
    ByteBuffer buf;//存放消息的buf
    ChannelPromise promise;//写请求任务
    long progress;
    long total;//消息size
    int pendingSize;//Entry消息字节数
    int count = -1;//消息当前buf的数量
    boolean cancelled;

    private Entry(Handle<Entry> handle) {
        this.handle = handle;
    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//待发送的消息字节数
        entry.total = total;
        entry.promise = promise;
        return entry;
    int cancel() {
        if (!cancelled) {
            cancelled = true;
            int pSize = pendingSize;
            // release message and replace with an empty buffer
            msg = Unpooled.EMPTY_BUFFER;

            pendingSize = 0;
            total = 0;
            progress = 0;
            bufs = null;
            buf = null;
            return pSize;
        return 0;
    void recycle() {
        next = null;
        bufs = null;
        buf = null;
        msg = null;
        promise = null;
        progress = 0;
        total = 0;
        pendingSize = 0;
        count = -1;
        cancelled = false;
    Entry recycleAndGetNext() {
        Entry next = this.next;
        return next;


 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);

private static long total(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    if (msg instanceof FileRegion) {
        return ((FileRegion) msg).count();
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    return -1;

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {

 * Returns the high water mark of the write buffer.  If the number of bytes
 * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
 * will start to return {@code false}.
int getWriteBufferHighWaterMark();

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {

private void fireChannelWritabilityChanged(boolean invokeLater) {
    final ChannelPipeline pipeline = channel.pipeline();
    if (invokeLater) {
        Runnable task = fireChannelWritabilityChangedTask;
        if (task == null) {
            fireChannelWritabilityChangedTask = task = new Runnable() {
                public void run() {
    } else {


  * Increment the pending bytes which will be written at some point.
  * This method is thread-safe!
 void incrementPendingOutboundBytes(long size) {
     incrementPendingOutboundBytes(size, true);

 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;

int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    //待发送字节数消息,小于通道配置的写buf size
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {

 * Returns the low water mark of the write buffer.  Once the number of bytes
 * queued in the write buffer exceeded the
 * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
 * dropped down below this value, {@link Channel#isWritable()} will start to return
 * {@code true} again.
 一旦通道待发送字节数大于通道写buf的 high water mark,则丢弃如下值,Channel#isWritable将返回true
int getWriteBufferLowWaterMark();

private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {

 * Decrement the pending bytes which will be written at some point.
 * This method is thread-safe!
void decrementPendingOutboundBytes(long size) {
    decrementPendingOutboundBytes(size, true, true);

如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。

 * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
 * flushed message exists at the time this method is called it will return {@code false} to signal that no more
 * messages are ready to be handled.
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        return false;
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        decrementPendingOutboundBytes(size, false, true);
    // recycle the entry

    return true;

if (e == null) {
    return false;

// Clear all ByteBuffer from the array so these can be GC'ed.
// See https://github.com/netty/netty/issues/3837
private void clearNioBuffers() {
    int count = nioBufferCount;
    if (count > 0) {
        //重置nio buf计数器,填充线程本地nio buf数组为空。
        nioBufferCount = 0;
        Arrays.fill(NIO_BUFFERS.get(), 0, count, null);


private void removeEntry(Entry e) {
    if (-- flushed == 0) {//刷新链为空
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {//链尾
            tailEntry = null;
            unflushedEntry = null;
    } else {
        flushedEntry = e.next;


  * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
  * and return {@code true}. If no   flushed message exists at the time this method is called it will return
  * {@code false} to signal that no more messages are ready to be handled.
 public boolean remove(Throwable cause) {
     return remove0(cause, true);

 private boolean remove0(Throwable cause, boolean notifyWritability) {
     Entry e = flushedEntry;
     if (e == null) {
         return false;
     Object msg = e.msg;

     ChannelPromise promise = e.promise;
     int size = e.pendingSize;

     if (!e.cancelled) {
         // only release message, fail and decrement if it was not canceled before.
         safeFail(promise, cause);
         decrementPendingOutboundBytes(size, false, notifyWritability);
     // recycle the entry

     return true;

 * Removes the fully written entries and update the reader index of the partially written entry.
 * This operation assumes all messages in this buffer is {@link ByteBuf}.

public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();//获取当前写请求消息
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                writtenBytes -= readableBytes;
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
	       //如果可读字节数大于需要移除的字节数,则移动消息buf的读索引到readerIndex + (int) writtenBytes位置
                buf.readerIndex(readerIndex + (int) writtenBytes);
    //最后清除nio buffer

 * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;

    return entry.msg;

 * Notify the {@link ChannelPromise} of the current message about writing progress.
public void progress(long amount) {
    Entry e = flushedEntry;
    assert e != null;
    ChannelPromise p = e.promise;
    if (p instanceof ChannelProgressivePromise) {
        long progress = e.progress + amount;
        e.progress = progress;
        ((ChannelProgressivePromise) p).tryProgress(progress, e.total);


再来看将刷新链上的写请求消息,添加到nio buffer数组中:
 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
 * array and the total number of readable bytes of the NIO buffers respectively.
 将刷新链中的写请求对象消息放到nio buf数组中。#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度
 * Note that the returned array is reused and thus should not escape
 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
 * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
 返回的nio buf将会被 NioSocketChannel#doWrite方法重用

public ByteBuffer[] nioBuffers() {
    long nioBufferSize = 0;//nio buf数组中的字节数
    int nioBufferCount = 0;//nio buf数组长度
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes > 0) {
                if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
                    // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the
                    // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then
                    // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will
                    // raise an IOException. On Linux it may work depending on the
                    // architecture and kernel but to be safe we also enforce the limit here.
                    // This said writing more the Integer.MAX_VALUE is not a good idea anyway.
                    // See also:
                    // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                    // - http://linux.die.net/man/2/writev
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    //noinspection ConstantValueVariableUse
                    entry.count = count =  buf.nioBufferCount();
                int neededSpace = nioBufferCount + count;
		//如果buf需求数量大于当前nio buf数组
                if (neededSpace > nioBuffers.length) {
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
		    //更新nio buf数组
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                        // derived buffer
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
		    //将消息buf,添加到nio buf数组中
                    nioBuffers[nioBufferCount ++] = nioBuf;
                } else {
                    ByteBuffer[] nioBufs = entry.bufs;
                    if (nioBufs == null) {
                        // cached ByteBuffers as they may be expensive to create in terms
                        // of Object allocation
                        entry.bufs = nioBufs = buf.nioBuffers();
		    //添加写请求buf数组到通道Outbound缓存区的nio buf数组中
                    nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
        entry = entry.next;
    //更新当前nio buffer 计数器和字节数
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;

private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        // double capacity until it is big enough
        // See https://github.com/netty/netty/issues/1890
        newCapacity <<= 1;

        if (newCapacity < 0) {
            throw new IllegalStateException();

    } while (neededSpace > newCapacity);
    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    //拷贝原始中size buf到新的的buf数组中
    System.arraycopy(array, 0, newArray, 0, size);

    return newArray;

//添加写请求buf数组到通道Outbound缓存区的nio buf数组中
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
    //遍历添加的buf数组,添加到缓存区的nio buf数组中
    for (ByteBuffer nioBuf: nioBufs) {
        if (nioBuf == null) {
        nioBuffers[nioBufferCount ++] = nioBuf;
    return nioBufferCount;

从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

void failFlushed(Throwable cause, boolean notify) {
    // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
    // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
    // indirectly (usually by closing the channel.)
    // See https://github.com/netty/netty/issues/1501
    if (inFail) {
    try {
        inFail = true;
        for (;;) {
            if (!remove0(cause, notify)) {
    } finally {
        inFail = false;

void close(final ClosedChannelException cause) {
    if (inFail) {
        channel.eventLoop().execute(new Runnable() {
            public void run() {

    inFail = true;

    if (channel.isOpen()) {
        throw new IllegalStateException("close() must be invoked after the channel is closed.");

    if (!isEmpty()) {
        throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");

    // Release all unflushed messages.
    try {
        Entry e = unflushedEntry;
        while (e != null) {
            // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
            int size = e.pendingSize;
            TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);

            if (!e.cancelled) {
                safeFail(e.promise, cause);
            e = e.recycleAndGetNext();
    } finally {
        inFail = false;
    //清除Nio buf数组

private static void safeSuccess(ChannelPromise promise) {
    // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
    // false.
    PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);

 * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails.
public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) {
    if (!p.trySuccess(result) && logger != null) {
        Throwable err = p.cause();
        if (err == null) {
            logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p);
        } else {
                    "Failed to mark a promise as success because it has failed already: {}, unnotified cause:",
                    p, err);

private static void safeFail(ChannelPromise promise, Throwable cause) {
    // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
    // false.
    PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);

public void recycle() {
    // NOOP
public long totalPendingWriteBytes() {
    return totalPendingSize;

 * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
 * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()}
 * was called.
public int nioBufferCount() {
    return nioBufferCount;

 * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
 * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()}
 * was called.
public long nioBufferSize() {
    return nioBufferSize;

 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
 * not exceed the write watermark of the {@link Channel} and
 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 * {@code false}.
public boolean isWritable() {
    return unwritable == 0;

 * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
 * {@code true}.
public boolean getUserDefinedWritability(int index) {
    return (unwritable & writabilityMask(index)) == 0;

private static int writabilityMask(int index) {
    if (index < 1 || index > 31) {
        throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
    return 1 << index;

 * Sets a user-defined writability flag at the specified index.
public void setUserDefinedWritability(int index, boolean writable) {
    if (writable) {
    } else {
private void setUserDefinedWritability(int index) {
    final int mask = ~writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
private void clearUserDefinedWritability(int index) {
    final int mask = writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {

 * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
public int size() {
    return flushed;

 * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
 * otherwise.
public boolean isEmpty() {
    return flushed == 0;

 * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
public long bytesBeforeUnwritable() {
    long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
    // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    if (bytes > 0) {
        return isWritable() ? bytes : 0;
    return 0;

 * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
public long bytesBeforeWritable() {
    long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
    // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    if (bytes > 0) {
        return isWritable() ? 0 : bytes;
    return 0;

 * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
 * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
 * returns {@code false} or there are no more flushed messages to process.
public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
    if (processor == null) {
        throw new NullPointerException("processor");
    Entry entry = flushedEntry;
    if (entry == null) {
    do {
        if (!entry.cancelled) {
            if (!processor.processMessage(entry.msg)) {
        entry = entry.next;
    } while (isFlushedEntry(entry));
private boolean isFlushedEntry(Entry e) {
    return e != null && e != unflushedEntry;
public interface MessageProcessor {
     * Will be called for each flushed message until it either there are no more flushed messages or this
     * method returns {@code false}.
    boolean processMessage(Object msg) throws Exception;



添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。



将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

 * Light-weight object pool based on a thread-local stack.
 * @param <T> the type of the pooled object
public abstract class Recycler<T> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);

    private static final Handle NOOP_HANDLE = new Handle() {
        public void recycle(Object object) {
            // NOOP
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
    private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 32768; // Use 32k instances as default.
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    private static final int INITIAL_CAPACITY;
    private static final int MAX_SHARED_CAPACITY_FACTOR;
    private static final int MAX_DELAYED_QUEUES_PER_THREAD;
    private static final int LINK_CAPACITY;
    private static final int RATIO;

    static {
        // In the future, we might have different maxCapacity for different object types.
        // e.g. io.netty.recycler.maxCapacity.writeTask
        //      io.netty.recycler.maxCapacity.outboundBuffer
        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;


                        // We use the same value as default EventLoop number
                        NettyRuntime.availableProcessors() * 2));

        LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
                max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

        // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
        // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
        // bursts.
        RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

        if (logger.isDebugEnabled()) {
            if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
                logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
                logger.debug("-Dio.netty.recycler.ratio: disabled");
            } else {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
                logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
                logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);


    private final int maxCapacityPerThread;//每个线程的最大对象容量
    private final int maxSharedCapacityFactor;//容量共享因子
    private final int ratioMask;
    private final int maxDelayedQueuesPerThread;
    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);

    protected Recycler() {

    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread) {
        ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
        if (maxCapacityPerThread <= 0) {
            this.maxCapacityPerThread = 0;
            this.maxSharedCapacityFactor = 1;
            this.maxDelayedQueuesPerThread = 0;
        } else {
            this.maxCapacityPerThread = maxCapacityPerThread;
            this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
            this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        return (T) handle.value;

    protected abstract T newObject(Handle<T> handle);

    public interface Handle<T> {
        void recycle(T object);

    static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;

        private Stack<?> stack;
        private Object value;//Handle对象值

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;

        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
     * @deprecated use {@link Handle#recycle(Object)}.
    public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;

        return true;
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
            new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();

     static final class Stack<T> {

        // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
        // than the stack owner recycles: when we run out of items in our stack we iterate this collection
        // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
        // still recycling all items.
        final Recycler<T> parent;
        final Thread thread;
        final AtomicInteger availableSharedCapacity;
        final int maxDelayedQueues;

        private final int maxCapacity;
        private final int ratioMask;
        private DefaultHandle<?>[] elements;
        private int size;
        private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
    // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
    // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
    private static final class WeakOrderQueue {

        static final WeakOrderQueue DUMMY = new WeakOrderQueue();

        // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
        private static final class Link extends AtomicInteger {
            private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

            private int readIndex;
            private Link next;

        // chain of data items
        private Link head, tail;
        // pointer to another queue of delayed items for the same stack
        private WeakOrderQueue next;
        private final WeakReference<Thread> owner;
        private final int id = ID_GENERATOR.getAndIncrement();
        private final AtomicInteger availableSharedCapacity;

        private WeakOrderQueue() {
            owner = null;
            availableSharedCapacity = null;

        private WeakOrderQueue(Stack<?> stack, Thread thread) {
            head = tail = new Link();
            owner = new WeakReference<Thread>(thread);

            // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
            // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
            // Stack itself GCed.
            availableSharedCapacity = stack.availableSharedCapacity;



    3. **ByteBuf**:Netty自定义的缓冲区,优于Java的ByteBuffer,提供了一种高效且易于使用的内存管理机制,支持直接缓冲区和堆缓冲区,并且可以动态扩容。 4. **Pipeline(处理器链)**:Netty的处理器链设计允许在...


    8. **零拷贝**:Netty 通过直接缓冲区和内存映射文件实现了零拷贝技术,减少了数据在内核空间和用户空间之间的复制,提高了性能。 9. **多协议支持**:Netty 支持多种网络协议,如 HTTP、FTP、SMTP、TCP、UDP 等,...

    netty 实战-netty_in_action.zip

    1. **ByteBuf**: Netty提供了自己的字节缓冲区实现ByteBuf,相较于Java NIO中的ByteBuffer,ByteBuf更易用且高效。它支持两种模式:直接缓冲和堆缓冲,可以根据需求选择最适合的内存分配方式。 2. **Channel**: ...


    4. **ByteBuf**:ByteBuf是Netty中的缓冲区,提供了一种高效的数据操作接口。HeapByteBuf允许访问其底层的byte数组,而DirectByteBuf则直接在Java堆外内存中存储数据,通常用于减少内存拷贝,提高性能。调用array()...


    6. **ByteBuf**: Netty的字节缓冲区,比Java的ByteBuffer更强大,提供了更多的功能和优化。 在这个"netty-test-master"项目中,我们可以期待以下关键知识点: 1. **基本服务器和客户端搭建**: 项目可能包含一个...


    - Netty引入了ByteBuf作为数据缓冲区,优于Java的ByteBuffer,提供更方便的操作接口,避免了内存复制,提高了性能。 3. **Channel与Pipeline**: - Channel代表网络连接,负责读写数据。Pipeline则是一系列处理器...


    - Netty的数据传输基于`ByteBuf`,它是Netty提供的高效字节缓冲区,优于Java的`ByteBuffer`。 - 在ChannelHandlerContext中,我们可以调用`writeAndFlush()`方法来发送数据。`writeAndFlush()`会将数据写入缓冲区...


    - Netty是基于Java NIO(非阻塞I/O)构建的,理解Java NIO的基本原理,包括通道(Channels)、缓冲区(Buffers)和选择器(Selectors)至关重要。 - 非阻塞I/O的优势在于能够处理大量并发连接,提高系统资源利用率...


    3. **ByteBuf**:Netty 提供的高效字节缓冲区,优于传统的 Java `ByteBuffer`。ByteBuf 支持读写索引管理,更易于操作二进制数据。 4. **Pipeline**:Netty 使用 ChannelHandler 组成的链式处理结构,每个 ...


    - **ByteBuf**:Netty自定义的缓冲区,比Java的ByteBuffer更高效,支持读写分离,便于内存管理。 - **Decoder** 和 **Encoder**:解码器和编码器,用于数据的转换,例如将字节流转为对象或对象转为字节流。 - **...




    - **ByteBuf**:Netty提供的高效字节缓冲区,优于Java的ByteBuffer,支持读写分离,提供了更多的操作API。 3. **Netty的工作流程** - **连接建立**:客户端通过Bootstrap创建连接请求,服务器端通过...


    - Netty 4.0 的 JAR 文件被拆分为多个模块,每个模块专注于特定的功能,如缓冲区、传输、处理器等。这使得用户可以根据需求选择引入必要的模块,减少依赖。 2. **通用 API 的改变**: - `ChannelBuffer` 被替换为...

Global site tag (gtag.js) - Google Analytics