netty Pooled字节buf分配器

非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。

PooledByteBufAllocator:分配heap、direct buffer
PoolChunk: 管理实际的底层内存,内部以内存页(Page)组成
在jemalloc中,Page的大小默认为4KB(Netty的默认页大小为8KB,见下文的io.netty.allocator.pageSize),内存请求按大小分为三类:small、large和huge。small类的内存请求都落在一个内存页之内。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。

看了上面的文章,简单理一下,我们使用内存,实际为机器内存的Memory Mapping Region区域,
PoolArena可以理解为mmap中的内存分配区,分配区由内存块PoolChunk组成,内存块以内存页(Page)管理内存。在jemalloc中Page的大小默认为4KB(Netty的默认页大小为8KB),内存请求按大小分为三类:small、large和huge。small类的内存请求都落在一个内存页之内。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。

来看Pooled 字节buf分配器
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    // Default number of heap arenas; computed in the static initializer and logged as
    // -Dio.netty.allocator.numHeapArenas.
    private static final int DEFAULT_NUM_HEAP_ARENA;
    // Default number of direct arenas; computed in the static initializer and logged as
    // -Dio.netty.allocator.numDirectArenas.
    private static final int DEFAULT_NUM_DIRECT_ARENA;

    // Default page size, read from io.netty.allocator.pageSize (8192 when unset or on fallback).
    private static final int DEFAULT_PAGE_SIZE;
    // Default max order, read from io.netty.allocator.maxOrder (11 when unset or on fallback).
    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
    // Default cache size for tiny buffers (io.netty.allocator.tinyCacheSize, 512 when unset).
    private static final int DEFAULT_TINY_CACHE_SIZE;
    // Default cache size for small buffers (io.netty.allocator.smallCacheSize, 256 when unset).
    private static final int DEFAULT_SMALL_CACHE_SIZE;
    // Default cache size for normal buffers (io.netty.allocator.normalCacheSize, 64 when unset).
    private static final int DEFAULT_NORMAL_CACHE_SIZE;
    // Default maximum capacity of a cached buffer (io.netty.allocator.maxCachedBufferCapacity, 32 KiB when unset).
    private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
    // Allocation-count threshold after which rarely used cache entries are freed
    // (io.netty.allocator.cacheTrimInterval, 8192 when unset).
    private static final int DEFAULT_CACHE_TRIM_INTERVAL;
    // Whether every thread gets a buffer cache (io.netty.allocator.useCacheForAllThreads, true when unset).
    private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
    // Default direct-memory cache alignment (io.netty.allocator.directMemoryCacheAlignment, 0 when unset).
    private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;

    // Smallest page size this class defines as a constant (4 KiB).
    private static final int MIN_PAGE_SIZE = 4096;
    // (Integer.MAX_VALUE + 1) / 2 == 2^30 bytes, i.e. 1 GiB.
    private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);

    static {
        int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
        Throwable pageSizeFallbackCause = null;
        try {
        } catch (Throwable t) {
            pageSizeFallbackCause = t;
            defaultPageSize = 8192;
        DEFAULT_PAGE_SIZE = defaultPageSize;

        int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
        Throwable maxOrderFallbackCause = null;
        try {
            validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
        } catch (Throwable t) {
            maxOrderFallbackCause = t;
            defaultMaxOrder = 11;
        DEFAULT_MAX_ORDER = defaultMaxOrder;

        // Determine reasonable default for nHeapArena and nDirectArena.
        // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
        final Runtime runtime = Runtime.getRuntime();

         * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
         * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
         * allocation and de-allocation needs to be synchronized on the PoolArena.
         * See https://github.com/netty/netty/issues/3888.
        final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                        (int) Math.min(
                                runtime.maxMemory() / defaultChunkSize / 2 / 3)));
        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
                        (int) Math.min(
                                PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

        // cache sizes 默认tiny为512,small为256,normal为64
        DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

        // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
        // 'Scalable memory allocation using jemalloc',默认最大缓存容量为32kb
        DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
                "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

        // the number of threshold of allocations when cached entries will be freed up if not frequently used
        DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
                "io.netty.allocator.cacheTrimInterval", 8192);
        DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
                "io.netty.allocator.useCacheForAllThreads", true);

        DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
                "io.netty.allocator.directMemoryCacheAlignment", 0);

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
            logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
            if (pageSizeFallbackCause == null) {
                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
            } else {
                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
            if (maxOrderFallbackCause == null) {
                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
            } else {
                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
            logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
            logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
            logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
            logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
    public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    // Arenas that serve heap (byte[]) buffers; null when the allocator is built with nHeapArena == 0.
    private final PoolArena<byte[]>[] heapArenas;
    // Arenas that serve direct (ByteBuffer) buffers; null when built with nDirectArena == 0.
    private final PoolArena<ByteBuffer>[] directArenas;
    // Cache size for tiny buffers.
    private final int tinyCacheSize;
    // Cache size for small buffers.
    private final int smallCacheSize;
    // Cache size for normal buffers.
    private final int normalCacheSize;
    // Read-only list of metrics for the heap arenas.
    private final List<PoolArenaMetric> heapArenaMetrics;
    // Read-only list of metrics for the direct arenas.
    private final List<PoolArenaMetric> directArenaMetrics;
    // Thread-local holder of the per-thread buffer cache (PoolThreadCache).
    private final PoolThreadLocalCache threadCache;
    // Size of a chunk inside an arena, as returned by validateAndCalculateChunkSize(pageSize, maxOrder).
    private final int chunkSize;
    // Metric view exposed for this allocator.
    private final PooledByteBufAllocatorMetric metric;
    public PooledByteBufAllocator() {

    public PooledByteBufAllocator(boolean preferDirect) {
    /**
     * Creates an allocator with explicit arena counts, page/chunk geometry and cache sizes.
     *
     * NOTE(review): this is an abridged excerpt. Closing braces and a few statements
     * (for example the trailing constructor arguments of {@code HeapArena} and the call that
     * adds each arena to its metrics list) are not shown here.
     *
     * @param nHeapArena                 number of heap arenas; must be {@code >= 0}, 0 disables heap pooling
     * @param nDirectArena               number of direct arenas; must be {@code >= 0}, 0 disables direct pooling
     * @param pageSize                   page size, validated by {@code validateAndCalculatePageShifts}
     * @param maxOrder                   combined with {@code pageSize} to obtain the chunk size
     * @param tinyCacheSize              cache size for tiny buffers
     * @param smallCacheSize             cache size for small buffers
     * @param normalCacheSize            cache size for normal buffers
     * @param useCacheForAllThreads      passed to the thread-local cache holder
     * @param directMemoryCacheAlignment must be {@code >= 0} and a power of two (0 is accepted)
     * @throws IllegalArgumentException if any of the checks below fails
     */
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        // The per-thread buffer cache is held by a PoolThreadLocalCache.
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        // presumably pageSize << maxOrder (the static initializer computes the default that way) - TODO confirm
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        if (nHeapArena < 0) {
            throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
        if (nDirectArena < 0) {
            throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");

        if (directMemoryCacheAlignment < 0) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: >= 0)");
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");

        // x & -x isolates the lowest set bit; it equals x only when x has at most one bit set.
        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");

        int pageShifts = validateAndCalculatePageShifts(pageSize);
        if (nHeapArena > 0) {
            // Create the heap arenas.
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                heapArenas[i] = arena;
                // The original note says each heap arena is also added to the heap-arena metrics list here
                // (that statement is not shown in this excerpt).
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();

        if (nDirectArena > 0) {
            // Create the direct arenas.
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                // The original note says each direct arena is also added to the direct-arena metrics list
                // here (that statement is not shown in this excerpt).
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        metric = new PooledByteBufAllocatorMetric(this);

 private static <T> PoolArena<T>[] newArenaArray(int size) {
     return new PoolArena[size];

从上面可以看出Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区,每个Region的内存块size为chunkSize,每个内存块内存页大小,默认为8k。

// Allocates a heap buffer: looks up the calling thread's PoolThreadCache, then allocates from the
// heap arena bound to that cache. When no heap arena is available the buffer is created unpooled.
// The result is always wrapped by toLeakAwareBuffer.
// NOTE(review): the closing braces of this excerpt are not shown.
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
        // Pooled path: the arena receives the thread cache along with the requested capacities.
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // Unpooled fallback: the Unsafe-based variant is chosen when PlatformDependent.hasUnsafe() is true.
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    return toLeakAwareBuffer(buf);

来看从堆分配区获取堆buf。PoolArena同时也是Pool buf分配区的度量器(实现了PoolArenaMetric);获取buf时,实际是先尝试从PoolThreadCache中获取,缓存未命中时再由分配区自身分配
abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建Pooled buf
        allocate(cache, buf, reqCapacity);//从缓冲获取堆buf
        return buf;
    //创建Pooled buf,待子类扩展
    protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
        //容量小于页size,即tiny 或small buf
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;//sub page pool

            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
     static final class HeapArena extends PoolArena<byte[]> {

        HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
                int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, maxOrder, pageShifts, chunkSize,
        protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
            return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                    : PooledHeapByteBuf.newInstance(maxCapacity);
    //direct buf 缓存
     static final class DirectArena extends PoolArena<ByteBuffer> {

        DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
                int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, maxOrder, pageShifts, chunkSize,
	//创建Pooled Direct buf
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);

public interface PoolArenaMetric {

     * Returns the number of thread caches backed by this arena.
    int numThreadCaches();
     * Returns the number of tiny sub-pages for the arena.
     tiny sub page数量
    int numTinySubpages();
     * Returns the number of small sub-pages for the arena.
     small sub page数量
    int numSmallSubpages();
     * Returns the number of chunk lists for the arena.
    int numChunkLists();

从上面来看,PoolArena根据容量来决定创建tiny,small还是Normal buf:
final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // Heap arena this thread cache is bound to.
    final PoolArena<byte[]> heapArena;
    // Direct arena this thread cache is bound to.
    final PoolArena<ByteBuffer> directArena;
    // Hold the caches for the different size classes, which are tiny, small and normal.
    // Tiny sub-page caches for heap memory.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    // Small sub-page caches for heap memory.
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    // Tiny sub-page caches for direct memory.
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    // Small sub-page caches for direct memory.
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    // Normal-size caches for heap memory.
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    // Normal-size caches for direct memory.
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;

    private final Thread deathWatchThread;
    private final Runnable freeTask;

    private int allocations;
     * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
        if (area.isDirect()) {
            int idx = log2(normCapacity >> numShiftsNormalDirect);
            return cache(normalDirectCaches, idx);
        int idx = log2(normCapacity >> numShiftsNormalHeap);
        return cache(normalHeapCaches, idx);

    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        if (cache == null || idx > cache.length - 1) {
            return null;
        return cache[idx];
     @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        boolean allocated = cache.allocate(buf, reqCapacity);
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
        return allocated;
     private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;
         * Allocate something out of the cache if possible and remove the entry from the cache.
       public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
             Entry<T> entry = queue.poll();
             if (entry == null) {
                 return false;
             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
             ++ allocations;
             return true;
          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
         protected abstract void initBuf(PoolChunk<T> chunk, long handle,
                                             PooledByteBuf<T> buf, int reqCapacity);
         * Cache used for buffers which are backed by NORMAL size.
	 正常size buf的内存Region 缓存
        private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
            NormalMemoryRegionCache(int size) {
                super(size, SizeClass.Normal);
            protected void initBuf(
                    PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
                chunk.initBuf(buf, handle, reqCapacity);

final class PoolChunk<T> implements PoolChunkMetric {

    private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

    // Arena this chunk belongs to.
    final PoolArena<T> arena;
    // Backing memory of the chunk (byte[] for heap arenas, ByteBuffer for direct arenas).
    final T memory;
    final boolean unpooled;
    final int offset;

    private final byte[] memoryMap;
    private final byte[] depthMap;
    // Sub-pages of this chunk.
    private final PoolSubpage<T>[] subpages;
    /** Used to determine if the requested capacity is equal to or greater than pageSize. */
    private final int subpageOverflowMask;
    // Size of one page.
    private final int pageSize;
    private final int pageShifts;
    private final int maxOrder;
    private final int chunkSize;
    private final int log2ChunkSize;
    private final int maxSubpageAllocs;
    /** Used to mark memory as unusable */
    private final byte unusable;

    private int freeBytes;

    // Links to the owning chunk list and to the neighbouring chunks.
    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;
    void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

    private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;

    protected PoolChunk<T> chunk;//内存块
    protected long handle;
    protected T memory;
    protected int offset;
    protected int length;
    int maxLength;
    PoolThreadCache cache;
    private ByteBuffer tmpNioBuf;
    private ByteBufAllocator allocator;
    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, handle, offset, length, maxLength, cache);

    void initUnpooled(PoolChunk<T> chunk, int length) {
        init0(chunk, 0, chunk.offset, length, length, null);

    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk;
        memory = chunk.memory;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;

 * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);

 * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    return cache(tinySubPageHeapCaches, idx);
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.smallIdx(normCapacity);
    if (area.isDirect()) {
        return cache(smallSubPageDirectCaches, idx);
    return cache(smallSubPageHeapCaches, idx);
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    return cache[idx];

 * Cache used for buffers which are backed by TINY or SMALL size.
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
    SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
        super(size, sizeClass);

    protected void initBuf(
            PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
        chunk.initBufWithSubpage(buf, handle, reqCapacity);

// Allocates a direct buffer: looks up the calling thread's PoolThreadCache, then allocates from the
// direct arena bound to that cache. When no direct arena is available the buffer is created unpooled.
// The result is always wrapped by toLeakAwareBuffer.
// NOTE(review): the closing braces of this excerpt are not shown.
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // Allocate a direct buffer from the direct arena.
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // Unpooled fallback: the Unsafe-based variant is chosen when PlatformDependent.hasUnsafe() is true.
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    return toLeakAwareBuffer(buf);


我们来简单看一下Pooled 堆buf和direct buf

 static final class HeapArena extends PoolArena<byte[]> {

    HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
            int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
        super(parent, pageSize, maxOrder, pageShifts, chunkSize,
    protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
        return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                : PooledHeapByteBuf.newInstance(maxCapacity);

//direct buf 分配区
 static final class DirectArena extends PoolArena<ByteBuffer> {

    DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
            int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
        super(parent, pageSize, maxOrder, pageShifts, chunkSize,
	//创建Pooled Direct buf
    protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
        if (HAS_UNSAFE) {
            return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
        } else {
            return PooledDirectByteBuf.newInstance(maxCapacity);

从上面可以看出Pool 堆buf为,PooledUnsafeHeapByteBuf、PooledHeapByteBuf
direct buf为PooledUnsafeDirectByteBuf、PooledDirectByteBuf。

final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {

    private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {
        protected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {
            return new PooledUnsafeHeapByteBuf(handle, 0);

    static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
        PooledUnsafeHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        return buf;

class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        return buf;

final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        return buf;

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        return buf;

 * Method must be called before reuse this {@link PooledByteBuf}
final void reuse(int maxCapacity) {
    setIndex0(0, 0);//重置读写索引


 * Light-weight object pool based on a thread-local stack.
 * @param <T> the type of the pooled object
public abstract class Recycler<T> {
   private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();//从对象栈中获取对象handle
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        return (T) handle.value;
    static final class Stack<T> {
        final Recycler<T> parent;
        final Thread thread;
        final AtomicInteger availableSharedCapacity;
        final int maxDelayedQueues;

        private final int maxCapacity;
        private final int ratioMask;
        private DefaultHandle<?>[] elements;
        private int size;
        private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
	 DefaultHandle<T> newHandle() {
            return new DefaultHandle<T>(this);
     protected abstract T newObject(Handle<T> handle);


//线程buf 缓存为PoolThreadLocalCache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
        // No caching for non FastThreadLocalThreads.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    // Returns the arena with the fewest thread caches (lowest numThreadCaches), or null when
    // the array is null or empty. The comparison is strict, so on a tie the earlier arena is kept.
    // NOTE(review): the closing braces of this excerpt are not shown.
     private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        PoolArena<T> minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;

        return minArena;
    protected void onRemoval(PoolThreadCache threadCache) {


metric = new PooledByteBufAllocatorMetric(this);

 * Exposed metric for {@link PooledByteBufAllocator}.
public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
    private final PooledByteBufAllocator allocator;

    PooledByteBufAllocatorMetric(PooledByteBufAllocator allocator) {
        this.allocator = allocator;
     * Return the number of heap arenas.
    public int numHeapArenas() {
        return allocator.numHeapArenas();
     * Return the number of direct arenas.
    public int numDirectArenas() {
        return allocator.numDirectArenas();
     * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
    public List<PoolArenaMetric> heapArenas() {
        return allocator.heapArenas();
     * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
     direct buf缓存度量器
    public List<PoolArenaMetric> directArenas() {
        return allocator.directArenas();
     * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
    public int numThreadLocalCaches() {
        return allocator.numThreadLocalCaches();
     * Return the size of the tiny cache.
    public int tinyCacheSize() {
        return allocator.tinyCacheSize();
     * Return the size of the small cache.
    public int smallCacheSize() {
        return allocator.smallCacheSize();
     * Return the size of the normal cache.
    public int normalCacheSize() {
        return allocator.normalCacheSize();
     * Return the chunk size for an arena.
    public int chunkSize() {
        return allocator.chunkSize();
    public long usedHeapMemory() {
        return allocator.usedHeapMemory();
    public long usedDirectMemory() {
        return allocator.usedDirectMemory();
    public String toString() {
        StringBuilder sb = new StringBuilder(256);
                .append("(usedHeapMemory: ").append(usedHeapMemory())
                .append("; usedDirectMemory: ").append(usedDirectMemory())
                .append("; numHeapArenas: ").append(numHeapArenas())
                .append("; numDirectArenas: ").append(numDirectArenas())
                .append("; tinyCacheSize: ").append(tinyCacheSize())
                .append("; smallCacheSize: ").append(smallCacheSize())
                .append("; normalCacheSize: ").append(normalCacheSize())
                .append("; numThreadLocalCaches: ").append(numThreadLocalCaches())
                .append("; chunkSize: ").append(chunkSize()).append(')');
        return sb.toString();

 * Return the number of heap arenas.
 * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
public int numHeapArenas() {
    return heapArenaMetrics.size();

 * Return the number of direct arenas.
 * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
public int numDirectArenas() {
    return directArenaMetrics.size();

 * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
 * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
public List<PoolArenaMetric> heapArenas() {
    return heapArenaMetrics;

 * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
 * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
public List<PoolArenaMetric> directArenas() {
    return directArenaMetrics;

 * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
 * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
public int numThreadLocalCaches() {
    PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
    if (arenas == null) {
        return 0;

    int total = 0;
    for (PoolArena<?> arena : arenas) {
        total += arena.numThreadCaches.get();

    return total;

 * Return the size of the tiny cache.
 * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
public int tinyCacheSize() {
    return tinyCacheSize;

 * Return the size of the small cache.
 * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
public int smallCacheSize() {
    return smallCacheSize;

 * Return the size of the normal cache.
 * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
public int normalCacheSize() {
    return normalCacheSize;

 * Return the chunk size for an arena.
 * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
public final int chunkSize() {
    return chunkSize;

final long usedHeapMemory() {
    return usedMemory(heapArenas);

final long usedDirectMemory() {
    return usedMemory(directArenas);

private static long usedMemory(PoolArena<?>... arenas) {
    if (arenas == null) {
        return -1;
    long used = 0;
    for (PoolArena<?> arena : arenas) {
        used += arena.numActiveBytes();
        if (used < 0) {
            return Long.MAX_VALUE;
    return used;

final PoolThreadCache threadCache() {
    return threadCache.get();

 * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
 * and so should not called too frequently.
public String dumpStats() {
    int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
    StringBuilder buf = new StringBuilder(512)
            .append(" heap arena(s):")
    if (heapArenasLen > 0) {
        for (PoolArena<byte[]> a: heapArenas) {

    int directArenasLen = directArenas == null ? 0 : directArenas.length;

       .append(" direct arena(s):")
    if (directArenasLen > 0) {
        for (PoolArena<ByteBuffer> a: directArenas) {
    return buf.toString();

Pooled字节buf分配器,内部各有一组堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块的内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从缓存中获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将其放到相应的tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内部的Recycler,重用buf对象。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。



