Donald_Draper

Netty Unpooled ByteBuf allocator

 
Netty ByteBuf definition: http://donald-draper.iteye.com/blog/2393813
Netty resource leak detector: http://donald-draper.iteye.com/blog/2393940
Netty abstract ByteBuf analysis: http://donald-draper.iteye.com/blog/2394078
Netty abstract ByteBuf reference counter: http://donald-draper.iteye.com/blog/2394109
Netty composite buffer concepts: http://donald-draper.iteye.com/blog/2394408
Netty abstract ByteBuf allocator: http://donald-draper.iteye.com/blog/2394419
Introduction
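In the previous article we looked at the abstract ByteBuf allocator. To recap:
Whether a newly created ByteBuf is direct or heap is decided mainly by the allocator's directByDefault property. The actual creation of direct and heap buffers goes through newDirectBuffer and newHeapBuffer, which are left for subclasses to implement. Buffers created by ioBuffer are direct by preference and fall back to heap only when the platform does not support Unsafe. Composite buffers likewise follow directByDefault, and when resource leak detection is enabled, a newly created composite buffer is tracked for leaks.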
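The selection rules recapped above can be modeled in a few lines. This is only a sketch, not Netty code: `AllocatorChoiceSketch`, its fields and its string results are hypothetical stand-ins, and the `hasUnsafe` flag is passed in by hand instead of being read from `PlatformDependent`.

```java
// Simplified model of the direct/heap decision; every name here is hypothetical.
final class AllocatorChoiceSketch {
    private final boolean directByDefault; // mirrors the allocator property described in the recap
    private final boolean hasUnsafe;       // stands in for PlatformDependent.hasUnsafe()

    AllocatorChoiceSketch(boolean directByDefault, boolean hasUnsafe) {
        this.directByDefault = directByDefault;
        this.hasUnsafe = hasUnsafe;
    }

    // buffer(): follows directByDefault.
    String buffer() {
        return directByDefault ? "direct" : "heap";
    }

    // ioBuffer(): prefers direct and falls back to heap only without Unsafe.
    String ioBuffer() {
        return hasUnsafe ? "direct" : "heap";
    }
}
```

For instance, an allocator modeled with `directByDefault = false` on a platform with Unsafe would hand out heap buffers from `buffer()` but direct buffers from `ioBuffer()`.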

Today we look at one concrete implementation of the abstract ByteBuf allocator: the non-pooling allocator UnpooledByteBufAllocator.
/**
 * Simplistic {@link ByteBufAllocator} implementation that does not pool anything.
 */
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    // metric of this ByteBuf allocator
    private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();
    private final boolean disableLeakDetector; // whether resource leak detection is disabled

    /**
     * Default instance which uses leak-detection for direct buffers.
     * Note: this is the default unpooled ByteBuf allocator.
     */
    public static final UnpooledByteBufAllocator DEFAULT =
            new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    /**
     * Create a new instance which uses leak-detection for direct buffers.
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     */
    public UnpooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, false);
    }

    /**
     * Create a new instance
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     * @param disableLeakDetector {@code true} if the leak-detection should be disabled completely for this
     *                            allocator. This can be useful if the user just want to depend on the GC to handle
     *                            direct buffers when not explicit released.
     */
    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
        super(preferDirect);
        this.disableLeakDetector = disableLeakDetector;
    }
}


Now let's look at the metric:
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
    final LongCounter directCounter = PlatformDependent.newLongCounter(); // counter of direct memory in use
    final LongCounter heapCounter = PlatformDependent.newLongCounter(); // counter of heap memory in use
    @Override
    public long usedHeapMemory() {
        return heapCounter.value();
    }
    @Override
    public long usedDirectMemory() {
        return directCounter.value();
    }
    @Override
    public String toString() {
        return StringUtil.simpleClassName(this) +
                "(usedHeapMemory: " + usedHeapMemory() + "; usedDirectMemory: " + usedDirectMemory() + ')';
    }
}


//LongCounter: a counter for long values
/**
 * Counter for long.
 */
public interface LongCounter {
    void add(long delta);
    void increment();
    void decrement();
    long value();
}
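The article does not show what `PlatformDependent.newLongCounter()` returns. As a rough illustration of how such a counter could be backed, the sketch below implements the same four methods on top of the JDK's `LongAdder`. The interface is redeclared locally so the example is self-contained, and `AdderBackedCounter` is a hypothetical name, not a Netty class.

```java
import java.util.concurrent.atomic.LongAdder;

// Local copy of the four-method contract shown in the listing, so the sketch compiles on its own.
interface LongCounter {
    void add(long delta);
    void increment();
    void decrement();
    long value();
}

// Hypothetical implementation: a thread-safe counter backed by LongAdder.
final class AdderBackedCounter implements LongCounter {
    private final LongAdder adder = new LongAdder();

    @Override
    public void add(long delta) {
        adder.add(delta); // negative deltas are allowed and subtract from the total
    }

    @Override
    public void increment() {
        adder.increment();
    }

    @Override
    public void decrement() {
        adder.decrement();
    }

    @Override
    public long value() {
        return adder.sum(); // current sum; not an atomic snapshot while updates are in flight
    }
}
```

A `LongAdder` suits this kind of metric because updates from many threads are cheap, while reads of the total are comparatively rare.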

Now let's look at creating a heap buffer:
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

There are two kinds of heap buffer; we look at each in turn.
First, InstrumentedUnpooledUnsafeHeapByteBuf:
1.
//InstrumentedUnpooledUnsafeHeapByteBuf
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
    InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    byte[] allocateArray(int initialCapacity) {
        byte[] bytes = super.allocateArray(initialCapacity);
        // add to the allocator's heap memory usage
        ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
        return bytes;
    }

    @Override
    void freeArray(byte[] array) {
        int length = array.length;
        super.freeArray(array);
        // subtract from the allocator's heap memory usage
        ((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
    }
}


//UnpooledUnsafeHeapByteBuf
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     *
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }
    ...
}

2.
Now the other kind of heap buffer, InstrumentedUnpooledHeapByteBuf:
//InstrumentedUnpooledHeapByteBuf
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
    InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    byte[] allocateArray(int initialCapacity) {
        byte[] bytes = super.allocateArray(initialCapacity);
        // add to the allocator's heap memory usage
        ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
        return bytes;
    }

    @Override
    void freeArray(byte[] array) {
        int length = array.length;
        super.freeArray(array);
        // subtract from the allocator's heap memory usage
        ((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
    }
}

Let's look at the unpooled heap ByteBuf itself:
/**
 * Big endian Java heap buffer implementation.
 */
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    byte[] array; // byte array that stores the data
    private ByteBuffer tmpNioBuf; // cached temporary NIO ByteBuffer over the array

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     *
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");

        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setArray(allocateArray(initialCapacity));
        setIndex(0, 0);
    }

    /**
     * Creates a new heap buffer with an existing byte array.
     *
     * @param initialArray the initial underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");
        checkNotNull(initialArray, "initialArray");

        if (initialArray.length > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
        }

        this.alloc = alloc;
        setArray(initialArray);
        setIndex(0, initialArray.length);
    }
    // allocates a byte array of initialCapacity bytes
    byte[] allocateArray(int initialCapacity) {
        return new byte[initialCapacity];
    }
    ...
    @Override
    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.length);
        System.arraycopy(array, index, dst, dstIndex, length);
        return this;
    }
    ....
    @Override
    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.capacity());
        if (src.hasMemoryAddress()) {
            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
        } else  if (src.hasArray()) {
            setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
        } else {
            src.getBytes(srcIndex, array, index, length);
        }
        return this;
    }

    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.length);
        System.arraycopy(src, srcIndex, array, index, length);
        return this;
    }
    ...

    @Override
    public ByteBuffer nioBuffer(int index, int length) {
        ensureAccessible();
        return ByteBuffer.wrap(array, index, length).slice();
    }

    @Override
    public ByteBuffer[] nioBuffers(int index, int length) {
        return new ByteBuffer[] { nioBuffer(index, length) };
    }

    @Override
    public ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
    }
    ...
    private ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) {
            this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
        }
        return tmpNioBuf;
    }
    ...
    @Override
    protected void deallocate() {
        freeArray(array);
        array = null;
    }
    // hook for subclasses to override
    void freeArray(byte[] array) {
        // NOOP
    }
}

As the code above shows, an unpooled heap ByteBuf is really just a byte array.
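Because the heap buffer is only a byte array, its `nioBuffer(index, length)` can hand out an NIO view without copying. The sketch below repeats the `ByteBuffer.wrap(array, index, length).slice()` expression from the listing using JDK classes only; `HeapViewSketch` is a hypothetical helper, not part of Netty.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that mirrors the expression used by nioBuffer(index, length).
final class HeapViewSketch {
    // Returns a view of array[index .. index + length); no bytes are copied.
    static ByteBuffer view(byte[] array, int index, int length) {
        // wrap() sets position = index and limit = index + length;
        // slice() then rebases the view so that its position 0 maps to array[index].
        return ByteBuffer.wrap(array, index, length).slice();
    }
}
```

Writing through the view changes the backing array: after `HeapViewSketch.view(backing, 2, 4).put(0, (byte) 7)`, `backing[2]` holds 7, and the view's capacity is 4.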

// Heap usage is added and subtracted by delegating to the internal metric
void incrementHeap(int amount) {
    metric.heapCounter.add(amount);
}
void decrementHeap(int amount) {
    metric.heapCounter.add(-amount);
}
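The instrumentation pattern used by both heap buffer variants can be reduced to a small model: the base class exposes allocate and free hooks, and the instrumented subclass adjusts a shared counter around them. The sketch below uses JDK classes only. `SketchHeapBuf` and `InstrumentedSketchHeapBuf` are hypothetical names, and a `LongAdder` stands in for the allocator and its metric.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical base class: owns a byte array and exposes allocate/free hooks.
class SketchHeapBuf {
    final LongAdder heapCounter; // stands in for the allocator reference
    byte[] array;

    SketchHeapBuf(LongAdder heapCounter, int initialCapacity) {
        // The counter is assigned before the hook runs, just as the listing
        // sets alloc before calling allocateArray.
        this.heapCounter = heapCounter;
        this.array = allocateArray(initialCapacity);
    }

    byte[] allocateArray(int capacity) {
        return new byte[capacity];
    }

    void freeArray(byte[] array) {
        // no-op: the garbage collector reclaims the array
    }

    void deallocate() {
        freeArray(array);
        array = null;
    }
}

// Hypothetical instrumented subclass: same behavior plus usage accounting.
final class InstrumentedSketchHeapBuf extends SketchHeapBuf {
    InstrumentedSketchHeapBuf(LongAdder heapCounter, int initialCapacity) {
        super(heapCounter, initialCapacity);
    }

    @Override
    byte[] allocateArray(int capacity) {
        byte[] bytes = super.allocateArray(capacity);
        heapCounter.add(bytes.length); // count the bytes actually allocated
        return bytes;
    }

    @Override
    void freeArray(byte[] array) {
        int length = array.length;
        super.freeArray(array);
        heapCounter.add(-length); // give the bytes back to the metric
    }
}
```

Keeping the accounting in a subclass leaves the plain buffer class free of any metric logic, which appears to be the motivation for the separate Instrumented classes.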


Next, allocating a direct buffer:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = PlatformDependent.useDirectBufferNoCleaner() ?
                new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

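As shown above, there are three kinds of direct buffer: InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf, InstrumentedUnpooledUnsafeDirectByteBuf and InstrumentedUnpooledDirectByteBuf. The first two require Unsafe, with the NoCleaner variant chosen when PlatformDependent.useDirectBufferNoCleaner() returns true; the third is the fallback when Unsafe is unavailable. Unless leak detection is disabled, the result is wrapped by toLeakAwareBuffer.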

We look at the three in turn:
1.
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
        extends UnpooledUnsafeNoCleanerDirectByteBuf {
    InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        ByteBuffer buffer = super.allocateDirect(initialCapacity);
        // add to the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
        return buffer;
    }

    @Override
    ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
        int capacity = oldBuffer.capacity();
        ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
        // adjust the allocator's direct memory usage by the change in capacity
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity);
        return buffer;
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        super.freeDirect(buffer);
        // subtract from the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
    }
}

//UnpooledUnsafeNoCleanerDirectByteBuf
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

    UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }
    ...
}

Now the second kind:
2.
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
    InstrumentedUnpooledUnsafeDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        ByteBuffer buffer = super.allocateDirect(initialCapacity);
        // add to the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
        return buffer;
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        super.freeDirect(buffer);
        // subtract from the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
    }
}

//UnpooledUnsafeDirectByteBuf
/**
 * A NIO {@link ByteBuffer} based buffer.  It is recommended to use {@link Unpooled#directBuffer(int)}
 * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
 * constructor explicitly.
 */
public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    private ByteBuffer tmpNioBuf;
    private int capacity; // capacity
    private boolean doNotFree; // if true, the underlying memory is not freed on deallocation
    ByteBuffer buffer; // the underlying NIO ByteBuffer
    long memoryAddress;
    /**
     * Creates a new direct buffer.
     *
     * @param initialCapacity the initial capacity of the underlying direct buffer
     * @param maxCapacity     the maximum capacity of the underlying direct buffer
     */
    protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity), false);
    }

    /**
     * Creates a new direct buffer by wrapping the specified initial buffer.
     *
     * @param maxCapacity the maximum capacity of the underlying direct buffer
     */
    protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
        // We never try to free the buffer if it was provided by the end-user as we not know if this is an duplicate or
        // an slice. This is done to prevent an IllegalArgumentException when using Java9 as Unsafe.invokeCleaner(...)
        // will check if the given buffer is either an duplicate or slice and in this case throw an
        // IllegalArgumentException.
        //
        // See http://hg.openjdk.java.net/jdk9/hs-demo/jdk/file/0d2ab72ba600/src/jdk.unsupported/share/classes/
        // sun/misc/Unsafe.java#l1250
        //
        // We also call slice() explicitly here to preserve behaviour with previous netty releases.
        this(alloc, initialBuffer.slice(), maxCapacity, false);
    }

    UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity, boolean doFree) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialBuffer == null) {
            throw new NullPointerException("initialBuffer");
        }
        if (!initialBuffer.isDirect()) {
            throw new IllegalArgumentException("initialBuffer is not a direct buffer.");
        }
        if (initialBuffer.isReadOnly()) {
            throw new IllegalArgumentException("initialBuffer is a read-only buffer.");
        }

        int initialCapacity = initialBuffer.remaining();
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        doNotFree = !doFree;
        setByteBuffer(initialBuffer.order(ByteOrder.BIG_ENDIAN), false);
        writerIndex(initialCapacity);
    }

    /**
     * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
     * Allocates a direct buffer of initialCapacity bytes by delegating to NIO's ByteBuffer.allocateDirect.
     */
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
   ...
}

As the code above shows, an unpooled direct buffer is really an NIO ByteBuffer.
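The practical difference between the two kinds of NIO buffer shows up in plain JDK code. The sketch below is independent of Netty and `NioBufferKinds` is a hypothetical helper: it reports whether a `ByteBuffer` is direct and whether it exposes a backing array.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that summarizes what kind of NIO buffer it was given.
final class NioBufferKinds {
    static String describe(ByteBuffer b) {
        // isDirect(): memory lives outside the Java heap.
        // hasArray(): an accessible byte[] backs the buffer.
        return (b.isDirect() ? "direct" : "heap") + ", hasArray=" + b.hasArray();
    }
}
```

A buffer from `ByteBuffer.allocateDirect(16)` is described as direct with no accessible array, while one from `ByteBuffer.allocate(16)` is a heap buffer backed by a `byte[]`. This matches the split between the direct and heap ByteBuf classes in the article.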
Now the third kind:
3.
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
    InstrumentedUnpooledDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        ByteBuffer buffer = super.allocateDirect(initialCapacity);
        // add to the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
        return buffer;
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        super.freeDirect(buffer);
        // subtract from the allocator's direct memory usage
        ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
    }
}



//UnpooledDirectByteBuf
/**
 * A NIO {@link ByteBuffer} based buffer.  It is recommended to use {@link Unpooled#directBuffer(int)}
 * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
 * constructor explicitly.
 */
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;

    private ByteBuffer buffer; // the underlying NIO ByteBuffer
    private ByteBuffer tmpNioBuf;
    private int capacity; // capacity
    private boolean doNotFree; // if true, the underlying memory is not freed on deallocation

    /**
     * Creates a new direct buffer.
     *
     * @param initialCapacity the initial capacity of the underlying direct buffer
     * @param maxCapacity     the maximum capacity of the underlying direct buffer
     */
    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
    }

    /**
     * Creates a new direct buffer by wrapping the specified initial buffer.
     *
     * @param maxCapacity the maximum capacity of the underlying direct buffer
     */
    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialBuffer == null) {
            throw new NullPointerException("initialBuffer");
        }
        if (!initialBuffer.isDirect()) {
            throw new IllegalArgumentException("initialBuffer is not a direct buffer.");
        }
        if (initialBuffer.isReadOnly()) {
            throw new IllegalArgumentException("initialBuffer is a read-only buffer.");
        }

        int initialCapacity = initialBuffer.remaining();
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        doNotFree = true;
        setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
        writerIndex(initialCapacity);
    }

    /**
     * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
     * Allocates a direct buffer of initialCapacity bytes by delegating to NIO's ByteBuffer.allocateDirect.
     */
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
    ...
}


Updating direct memory usage is likewise delegated to the internal metric:
void incrementDirect(int amount) {
    metric.directCounter.add(amount);
}

void decrementDirect(int amount) {
    metric.directCounter.add(-amount);
}
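The direct-memory accounting has three steps: add on allocate, adjust by the capacity difference on reallocate, subtract on free. The sketch below models them with JDK classes only. `DirectAccountingSketch` is a hypothetical class, and its `reallocate` copies into a fresh buffer as a stand-in, which is not necessarily how Netty's NoCleaner variant resizes memory.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of direct-memory usage accounting.
final class DirectAccountingSketch {
    private final LongAdder usedDirect = new LongAdder();

    ByteBuffer allocate(int capacity) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
        usedDirect.add(buffer.capacity());
        return buffer;
    }

    // Stand-in for reallocateDirect: allocate a new buffer, copy what fits, account for the delta.
    ByteBuffer reallocate(ByteBuffer oldBuffer, int newCapacity) {
        int oldCapacity = oldBuffer.capacity();
        ByteBuffer buffer = ByteBuffer.allocateDirect(newCapacity);
        ByteBuffer src = oldBuffer.duplicate(); // independent position/limit, same memory
        src.clear();
        src.limit(Math.min(oldCapacity, newCapacity));
        buffer.put(src);
        buffer.clear();
        // Only the difference is added, mirroring incrementDirect(buffer.capacity() - capacity).
        usedDirect.add(buffer.capacity() - oldCapacity);
        return buffer;
    }

    void free(ByteBuffer buffer) {
        // Only the accounting is modeled; the JDK releases the memory once the buffer is collected.
        usedDirect.add(-buffer.capacity());
    }

    long usedDirectMemory() {
        return usedDirect.sum();
    }
}
```

Accounting for the delta on reallocation keeps the metric correct without treating a resize as a separate free followed by an allocate.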

Next, composite buffers:

@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
    CompositeByteBuf buf = new CompositeByteBuf(this, false, maxNumComponents);
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
    CompositeByteBuf buf = new CompositeByteBuf(this, true, maxNumComponents);
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}




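Summary:

An unpooled heap ByteBuf is really a byte array, allocated directly on the JVM heap. An unpooled direct ByteBuf is really an NIO ByteBuffer, allocated in native memory outside the JVM heap. Unpooled creates buffers by delegating to its internal allocator, UnpooledByteBufAllocator.

Appendix:
Let's also take a look at Unpooled: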
/**
 * Creates a new {@link ByteBuf} by allocating new space or by wrapping
 * or copying existing byte arrays, byte buffers and a string.
 * Note: Unpooled creates a new ByteBuf, or wraps or copies an existing byte array, ByteBuf or String.
 * <h3>Use static import</h3>
 * This classes is intended to be used with Java 5 static import statement:
 *
 * <pre>
 * import static io.netty.buffer.{@link Unpooled}.*;
 *
 * {@link ByteBuf} heapBuffer    = buffer(128);
 * {@link ByteBuf} directBuffer  = directBuffer(256);
 * {@link ByteBuf} wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]);
 * {@link ByteBuf} copiedBuffer  = copiedBuffer({@link ByteBuffer}.allocate(128));
 * </pre>
 *
 * <h3>Allocating a new buffer</h3>
 *
 * Three buffer types are provided out of the box.
 *
 * <ul>
 * <li>{@link #buffer(int)} allocates a new fixed-capacity heap buffer.</li>
 * <li>{@link #directBuffer(int)} allocates a new fixed-capacity direct buffer.</li>
 * </ul>
 *
 * <h3>Creating a wrapped buffer</h3>
 * Note: a wrapped buffer is a view of the underlying buffers; every change to the underlying data is visible through it.
 * Wrapped buffer is a buffer which is a view of one or more existing
 * byte arrays and byte buffers.  Any changes in the content of the original
 * array or buffer will be visible in the wrapped buffer.  Various wrapper
 * methods are provided and their name is all {@code wrappedBuffer()}.
 * You might want to take a look at the methods that accept varargs closely if
 * you want to create a buffer which is composed of more than one array to
 * reduce the number of memory copy.
 *
 * <h3>Creating a copied buffer</h3>
 * Note: a copied buffer shares no data; changes to the original data are not visible in the copy.
 * Copied buffer is a deep copy of one or more existing byte arrays, byte
 * buffers or a string.  Unlike a wrapped buffer, there's no shared data
 * between the original data and the copied buffer.  Various copy methods are
 * provided and their name is all {@code copiedBuffer()}.  It is also convenient
 * to use this operation to merge multiple buffers into one buffer.
 */
public final class Unpooled {
    // the ByteBuf allocator; defaults to UnpooledByteBufAllocator.DEFAULT
    private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;

    /**
     * Big endian byte order.
     */
    public static final ByteOrder BIG_ENDIAN = ByteOrder.BIG_ENDIAN;

    /**
     * Little endian byte order.
     */
    public static final ByteOrder LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN;

    /**
     * A buffer whose capacity is {@code 0}.
     */
    public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0);

    static {
        assert EMPTY_BUFFER instanceof EmptyByteBuf: "EMPTY_BUFFER must be an EmptyByteBuf.";
    }
}

Now the allocation methods:

/**
 * Creates a new big-endian Java heap buffer with reasonably small initial capacity, which
 * expands its capacity boundlessly on demand.
 */
public static ByteBuf buffer() {
    return ALLOC.heapBuffer();
}

/**
 * Creates a new big-endian direct buffer with reasonably small initial capacity, which
 * expands its capacity boundlessly on demand.
 */
public static ByteBuf directBuffer() {
    return ALLOC.directBuffer();
}

/**
 * Creates a new big-endian Java heap buffer with the specified {@code capacity}, which
 * expands its capacity boundlessly on demand.  The new buffer's {@code readerIndex} and
 * {@code writerIndex} are {@code 0}.
 */
public static ByteBuf buffer(int initialCapacity) {
    return ALLOC.heapBuffer(initialCapacity);
}

/**
 * Creates a new big-endian direct buffer with the specified {@code capacity}, which
 * expands its capacity boundlessly on demand.  The new buffer's {@code readerIndex} and
 * {@code writerIndex} are {@code 0}.
 */
public static ByteBuf directBuffer(int initialCapacity) {
    return ALLOC.directBuffer(initialCapacity);
}

/**
 * Returns a new big-endian composite buffer with no components.
 */
public static CompositeByteBuf compositeBuffer() {
    return compositeBuffer(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS);
}

/**
 * Returns a new big-endian composite buffer with no components.
 */
public static CompositeByteBuf compositeBuffer(int maxNumComponents) {
    return new CompositeByteBuf(ALLOC, false, maxNumComponents);
}

As shown above, Unpooled creates buffers by delegating to its internal allocator, UnpooledByteBufAllocator.

Methods for wrapped buffers:
/**
 * Creates a new big-endian buffer which wraps the specified {@code array}.
 * A modification on the specified array's content will be visible to the
 * returned buffer.
 */
public static ByteBuf wrappedBuffer(byte[] array) {
    if (array.length == 0) {
        return EMPTY_BUFFER;
    }
    return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}


/**
 * Creates a new buffer which wraps the specified buffer's readable bytes.
 * A modification on the specified buffer's content will be visible to the
 * returned buffer.
 * @param buffer The buffer to wrap. Reference count ownership of this variable is transfered to this method.
 * @return The readable portion of the {@code buffer}, or an empty buffer if there is no readable portion.
 * The caller is responsible for releasing this buffer.
 */
public static ByteBuf wrappedBuffer(ByteBuf buffer) {
    if (buffer.isReadable()) {
        return buffer.slice();
    } else {
        buffer.release();
        return EMPTY_BUFFER;
    }
}

/**
 * Creates a new big-endian composite buffer which wraps the readable bytes of the
 * specified buffers without copying them.  A modification on the content
 * of the specified buffers will be visible to the returned buffer.
 * @param maxNumComponents Advisement as to how many independent buffers are allowed to exist before
 * consolidation occurs.
 * @param buffers The buffers to wrap. Reference count ownership of all variables is transfered to this method.
 * @return The readable portion of the {@code buffers}. The caller is responsible for releasing this buffer.
 */
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
    switch (buffers.length) {
    case 0:
        break;
    case 1:
        ByteBuf buffer = buffers[0];
        if (buffer.isReadable()) {
            return wrappedBuffer(buffer.order(BIG_ENDIAN));
        } else {
            buffer.release();
        }
        break;
    default:
        for (int i = 0; i < buffers.length; i++) {
            ByteBuf buf = buffers[i];
            if (buf.isReadable()) {
                return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i, buffers.length);
            }
            buf.release();
        }
        break;
    }
    return EMPTY_BUFFER;
}

...

Methods for copied buffers:

/**
 * Creates a new big-endian buffer whose content is a copy of the
 * specified {@code array}.  The new buffer's {@code readerIndex} and
 * {@code writerIndex} are {@code 0} and {@code array.length} respectively.
 */
public static ByteBuf copiedBuffer(byte[] array) {
    if (array.length == 0) {
        return EMPTY_BUFFER;
    }
    return wrappedBuffer(array.clone());
}

/**
 * Creates a new buffer whose content is a copy of the specified
 * {@code buffer}'s readable bytes.  The new buffer's {@code readerIndex}
 * and {@code writerIndex} are {@code 0} and {@code buffer.readableBytes}
 * respectively.
 */
public static ByteBuf copiedBuffer(ByteBuf buffer) {
    int readable = buffer.readableBytes();
    if (readable > 0) {
        ByteBuf copy = buffer(readable);
        copy.writeBytes(buffer, buffer.readerIndex(), readable);
        return copy;
    } else {
        return EMPTY_BUFFER;
    }
}
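The contrast between wrapping and copying can be reproduced with plain JDK buffers. In the sketch below, `WrapVersusCopy` is a hypothetical helper that uses `java.nio.ByteBuffer` rather than Netty's `ByteBuf`, and its `copied` method follows the same `array.clone()` idea as `copiedBuffer(byte[])`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper contrasting a shared view with an independent copy.
final class WrapVersusCopy {
    // View: shares the caller's array, so later changes to the array are visible.
    static ByteBuffer wrapped(byte[] array) {
        return ByteBuffer.wrap(array);
    }

    // Deep copy: clones the array first, so later changes to the original are not visible.
    static ByteBuffer copied(byte[] array) {
        return ByteBuffer.wrap(array.clone());
    }
}
```

If the original array is modified after both buffers are created, only the wrapped buffer reflects the change. Copying buys isolation at the cost of one extra array allocation.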


/**
 * Return a unreleasable view on the given {@link ByteBuf} which will just ignore release and retain calls.
 * Note: returns a buffer that cannot be released.
 */
public static ByteBuf unreleasableBuffer(ByteBuf buf) {
    return new UnreleasableByteBuf(buf);
}


//UnreleasableByteBuf
final class UnreleasableByteBuf extends WrappedByteBuf {

    private SwappedByteBuf swappedBuf;
    ...
    @Override
    public ByteBuf retain(int increment) {
        return this;
    }

    @Override
    public ByteBuf retain() {
        return this;
    }

    @Override
    public ByteBuf touch() {
        return this;
    }

    @Override
    public ByteBuf touch(Object hint) {
        return this;
    }

    @Override
    public boolean release() {
        return false;
    }

    @Override
    public boolean release(int decrement) {
        return false;
    }
}

//WrappedByteBuf: can be thought of as a static proxy (delegating wrapper) around a ByteBuf
class WrappedByteBuf extends ByteBuf {

    protected final ByteBuf buf; 
    @Override
    public final int readerIndex() {
        return buf.readerIndex();
    }

    @Override
    public final ByteBuf readerIndex(int readerIndex) {
        buf.readerIndex(readerIndex);
        return this;
    }

    @Override
    public final int writerIndex() {
        return buf.writerIndex();
    }

    @Override
    public final ByteBuf writerIndex(int writerIndex) {
        buf.writerIndex(writerIndex);
        return this;
    }
    ...
}
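The combination of a delegating wrapper with ignored retain and release calls can be modeled without Netty. Everything in the sketch below is hypothetical (`RefCountedSketch`, `CountedResource`, `UnreleasableSketch`), and the counter is deliberately not thread-safe, unlike a real reference-counted buffer.

```java
// Hypothetical, minimal reference-counting contract.
interface RefCountedSketch {
    int refCnt();
    RefCountedSketch retain();
    boolean release(); // true when the count drops to zero
}

// Hypothetical resource with a plain (not thread-safe) reference count.
final class CountedResource implements RefCountedSketch {
    private int refCnt = 1;

    @Override
    public int refCnt() {
        return refCnt;
    }

    @Override
    public RefCountedSketch retain() {
        refCnt++;
        return this;
    }

    @Override
    public boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("already released");
        }
        return --refCnt == 0;
    }
}

// Hypothetical wrapper: reads are delegated, retain/release are ignored.
final class UnreleasableSketch implements RefCountedSketch {
    private final RefCountedSketch delegate;

    UnreleasableSketch(RefCountedSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public int refCnt() {
        return delegate.refCnt();
    }

    @Override
    public RefCountedSketch retain() {
        return this; // no effect on the wrapped count
    }

    @Override
    public boolean release() {
        return false; // never releases the wrapped resource
    }
}
```

Code that receives the wrapper can call `release()` as often as it likes without affecting the wrapped resource, which is useful for long-lived shared constants.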


/**
 * Wrap the given {@link ByteBuf}s in an unmodifiable {@link ByteBuf}. Be aware the returned {@link ByteBuf} will
 * not try to slice the given {@link ByteBuf}s to reduce GC-Pressure.
 * Note: wraps the given buffers as an unmodifiable ByteBuf.
 */
public static ByteBuf unmodifiableBuffer(ByteBuf... buffers) {
    return new FixedCompositeByteBuf(ALLOC, buffers);
}

//FixedCompositeByteBuf
/**
 * {@link ByteBuf} implementation which allows to wrap an array of {@link ByteBuf} in a read-only mode.
 * This is useful to write an array of {@link ByteBuf}s.
 */
final class FixedCompositeByteBuf extends AbstractReferenceCountedByteBuf {
    private static final ByteBuf[] EMPTY = { Unpooled.EMPTY_BUFFER };
    private final int nioBufferCount;
    private final int capacity;
    private final ByteBufAllocator allocator;
    private final ByteOrder order;
    private final Object[] buffers;
    private final boolean direct;
    ...
}


