- 浏览: 984409 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:[url]http://donald-draper.iteye.com/blog/2394619[/url
引言:
上一篇文章我们看了,Unpooled字节buf分配器,先来回顾一下:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
今天来看一下Pooled字节buf非配器,不做深入研究,仅仅窥探一下,由于本人当前能力有限,只能简单看一下,由于Pooled字节分配器可能涉及到很多内存相关的概念,可以参考一下连接:
深入浅出Netty内存管理PoolChunk:http://blog.jobbole.com/106001/
Netty4 中的内存管理:http://www.cnblogs.com/ungshow/p/3541737.html
Netty5源码学习之buffer篇(一)PooledHeapByteBuf :https://yq.aliyun.com/articles/55623
Netty系列之Netty百万级推送服务设计要点:http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/
关于内存分配策略可以参考这篇文章,
http://www.360doc.com/content/13/0915/09/8363527_314549128.shtml
现在对内存这段研究的不交少,有兴趣的可以先在国内搜索一些内存的相关策略,
在去国外搜索具体内存分配策略相关的论文或专业解析,由于本人还没有深入到操作系统分配这一块,
有时间研究一下,希望这不是接口。
下面两篇文章是具体的应用和内存分配策略比较,虽然有点粗糙,重要的是我们要吸收精华部分,
扯远了,扯了蛋了,疼...
浅谈redis采用不同内存分配器tcmalloc和jemalloc:http://www.jb51.net/article/100575.htm
jemalloc优化MySQL、Nginx内存管理:https://blog.linuxeye.cn/356.html
PooledByteBufAllocator:分配heap、direct buffer
PoolArena:一块逻辑上的内存池,用来管理和组织buffer的,内部数据结构较复杂。
PoolChunk: 管理实际的底层内存,内部已内存Page组成
默认情况下,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。
看了上面的文章,简单理一下,我们使用内存,实际为机器内存的Memory Mapping Region区域,
PoolArena可以理解为mmap中内存分配区,分配区由内存块PoolChunk组成,内存块以内存Page管理内存,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。
来看Pooled 字节buf分配器
//创建内存分配区
从上面可以看出Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区,每个Region的内存块size为chunkSize,每个内存块内存页大小,默认为8k。
来看创建堆buf:
来看从堆分配区获取堆buf,PoolArena同时为Pool buf分配区量器,获取buf,实际是从PoolThreadCache中获取
//PoolArenaMetric
从上面来看,PoolArena根据容量来决定创建tiny,small还是Normal buf:
我们以Normal为例:
从线程本地缓存获取buf
//PoolThreadCache
//PoolChunk,内存块
//PooledByteBuf
再来看分配其他两种分配tiny和small:
再来看创建direct类型buf:
这个思路与创建堆buf思路一致。
我们来简单看一Pooled 堆和direct buf
先来看堆分配区和direct分配区,分配buf
//direct buf 分配区
从上面可以看出Pool 堆buf为,PooledUnsafeHeapByteBuf、PooledHeapByteBuf
direct buf为PooledUnsafeDirectByteBuf、PooledDirectByteBuf。
我们分别来简单看一下这四种buf:
//PooledUnsafeHeapByteBuf
//PooledHeapByteBuf
//PooledUnsafeDirectByteBuf
//PooledDirectByteBuf
//PooledByteBuf
在简单看一下字节buf内存的回收器Recycler
从上面可以看出,Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将
对象放在一个线程本地栈中管理。
再来简单看一下线程本地buf缓存池:
//线程本地buf缓存
再来简单看一下buf缓存池度量器:
再来看Pooledd字节分配器的其他方法:
//PooledByteBufAllocator
总结:
Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:[url]http://donald-draper.iteye.com/blog/2394619[/url
引言:
上一篇文章我们看了,Unpooled字节buf分配器,先来回顾一下:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
今天来看一下Pooled字节buf非配器,不做深入研究,仅仅窥探一下,由于本人当前能力有限,只能简单看一下,由于Pooled字节分配器可能涉及到很多内存相关的概念,可以参考一下连接:
深入浅出Netty内存管理PoolChunk:http://blog.jobbole.com/106001/
Netty4 中的内存管理:http://www.cnblogs.com/ungshow/p/3541737.html
Netty5源码学习之buffer篇(一)PooledHeapByteBuf :https://yq.aliyun.com/articles/55623
Netty系列之Netty百万级推送服务设计要点:http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/
关于内存分配策略可以参考这篇文章,
http://www.360doc.com/content/13/0915/09/8363527_314549128.shtml
现在对内存这段研究的不交少,有兴趣的可以先在国内搜索一些内存的相关策略,
在去国外搜索具体内存分配策略相关的论文或专业解析,由于本人还没有深入到操作系统分配这一块,
有时间研究一下,希望这不是接口。
下面两篇文章是具体的应用和内存分配策略比较,虽然有点粗糙,重要的是我们要吸收精华部分,
扯远了,扯了蛋了,疼...
浅谈redis采用不同内存分配器tcmalloc和jemalloc:http://www.jb51.net/article/100575.htm
jemalloc优化MySQL、Nginx内存管理:https://blog.linuxeye.cn/356.html
PooledByteBufAllocator:分配heap、direct buffer
PoolArena:一块逻辑上的内存池,用来管理和组织buffer的,内部数据结构较复杂。
PoolChunk: 管理实际的底层内存,内部已内存Page组成
默认情况下,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。
看了上面的文章,简单理一下,我们使用内存,实际为机器内存的Memory Mapping Region区域,
PoolArena可以理解为mmap中内存分配区,分配区由内存块PoolChunk组成,内存块以内存Page管理内存,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。
来看Pooled 字节buf分配器
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { private static final int DEFAULT_NUM_HEAP_ARENA; private static final int DEFAULT_NUM_DIRECT_ARENA; private static final int DEFAULT_PAGE_SIZE; private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk private static final int DEFAULT_TINY_CACHE_SIZE;//默认 tiny buf 缓存size private static final int DEFAULT_SMALL_CACHE_SIZE;//默认 small buf 缓存size private static final int DEFAULT_NORMAL_CACHE_SIZE;//默认正常buf 缓存size private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;//是否为所有线程使用buf缓存 private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); static { //获取默认内存页size int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192); Throwable pageSizeFallbackCause = null; try { validateAndCalculatePageShifts(defaultPageSize); } catch (Throwable t) { pageSizeFallbackCause = t; defaultPageSize = 8192; } DEFAULT_PAGE_SIZE = defaultPageSize; int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11); Throwable maxOrderFallbackCause = null; try { validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder); } catch (Throwable t) { maxOrderFallbackCause = t; defaultMaxOrder = 11; } DEFAULT_MAX_ORDER = defaultMaxOrder; // Determine reasonable default for nHeapArena and nDirectArena. // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory. final Runtime runtime = Runtime.getRuntime(); /* * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as * allocation and de-allocation needs to be synchronized on the PoolArena. * * See https://github.com/netty/netty/issues/3888. */ //最小内存分配区的数量,默认最小缓冲buf数量为处理器的2倍 final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2; final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER; DEFAULT_NUM_HEAP_ARENA = Math.max(0, SystemPropertyUtil.getInt( "io.netty.allocator.numHeapArenas", (int) Math.min( defaultMinNumArena, runtime.maxMemory() / defaultChunkSize / 2 / 3))); DEFAULT_NUM_DIRECT_ARENA = Math.max(0, SystemPropertyUtil.getInt( "io.netty.allocator.numDirectArenas", (int) Math.min( defaultMinNumArena, PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); // cache sizes 默认tiny为512,small为256,normal为64 DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in // 'Scalable memory allocation using jemalloc',默认最大缓存容量为32kb DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt( "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024); // the number of threshold of allocations when cached entries will be freed up if not frequently used //分配次数阈值,可以可以理解当缓存多久不用时,释放 DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( "io.netty.allocator.cacheTrimInterval", 8192); //默认开启线程buf缓存 DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean( "io.netty.allocator.useCacheForAllThreads", true); DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt( "io.netty.allocator.directMemoryCacheAlignment", 0); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); if (pageSizeFallbackCause == null) { logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE); } else { logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause); } if (maxOrderFallbackCause == null) { logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER); } else { logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); } logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE); logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS); } } public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); private final PoolArena<byte[]>[] heapArenas;//堆buf分配区 private final PoolArena<ByteBuffer>[] directArenas;//direct buf缓存分配区 private final int tinyCacheSize;//tiny buf 缓存size private final int smallCacheSize;//small buf 缓存size private final int normalCacheSize;//正常buf 缓存size private final List<PoolArenaMetric> heapArenaMetrics;//堆buf分配区度量器 private final List<PoolArenaMetric> directArenaMetrics;//direct buf分配区度量器 private final PoolThreadLocalCache threadCache;//线程本地字节buf缓存 private final int chunkSize;//分配区内存块size private final PooledByteBufAllocatorMetric metric;//buf 分配器,度量器 public PooledByteBufAllocator() { this(false); } @SuppressWarnings("deprecation") public PooledByteBufAllocator(boolean preferDirect) { this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER); } ... public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super(preferDirect); //线程buf 缓存为PoolThreadLocalCache threadCache = new PoolThreadLocalCache(useCacheForAllThreads); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); if (nHeapArena < 0) { throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)"); } if (nDirectArena < 0) { throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)"); } if (directMemoryCacheAlignment < 0) { throw new IllegalArgumentException("directMemoryCacheAlignment: " + directMemoryCacheAlignment + " (expected: >= 0)"); } if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) { throw new IllegalArgumentException("directMemoryCacheAlignment is not supported"); } if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) { throw new IllegalArgumentException("directMemoryCacheAlignment: " + directMemoryCacheAlignment + " (expected: power of two)"); } int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0) { //创建堆buf 分配区 heapArenas = newArenaArray(nHeapArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length); for (int i = 0; i < heapArenas.length; i ++) { //包装pool缓存为PoolArena.HeapArena PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); heapArenas[i] = arena; //添加pool 堆buf 分配区到堆分配区度量集 metrics.add(arena); } heapArenaMetrics = Collections.unmodifiableList(metrics); } else { heapArenas = null; heapArenaMetrics = Collections.emptyList(); } if (nDirectArena > 0) { //创建direct buf 分配区 directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { //包装pool缓存为PoolArena.DirectArena PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; //添加pool direct buf 分配区到堆分配区度量集 metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } metric = new PooledByteBufAllocatorMetric(this); } }
//创建内存分配区
@SuppressWarnings("unchecked") private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }
从上面可以看出Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区,每个Region的内存块size为chunkSize,每个内存块内存页大小,默认为8k。
来看创建堆buf:
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<byte[]> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { //从堆bu分配区,创建一个堆buf buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } //追踪字节buf资源泄漏情况 return toLeakAwareBuffer(buf); }
来看从堆分配区获取堆buf,PoolArena同时为Pool buf分配区量器,获取buf,实际是从PoolThreadCache中获取
abstract class PoolArena<T> implements PoolArenaMetric { static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); ... PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建Pooled buf allocate(cache, buf, reqCapacity);//从缓冲获取堆buf return buf; } //创建Pooled buf,待子类扩展 protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity); //从缓冲获取堆buf private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); //容量小于页size,即tiny 或small buf if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table;//sub page pool boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } ... } //正常bufsize if (normCapacity <= chunkSize) { if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); ++allocationsNormal; } } ... } //堆buf分配区 static final class HeapArena extends PoolArena<byte[]> { HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); } //创建Pooled堆buf @Override protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) { return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity) : PooledHeapByteBuf.newInstance(maxCapacity); } ... } //direct buf 缓存 static final class DirectArena extends PoolArena<ByteBuffer> { DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); } //创建Pooled Direct buf @Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } ... } }
//PoolArenaMetric
public interface PoolArenaMetric { /** * Returns the number of thread caches backed by this arena. 返回缓存此arena的线程数 */ int numThreadCaches(); /** * Returns the number of tiny sub-pages for the arena. tiny sub page数量 */ int numTinySubpages(); /** * Returns the number of small sub-pages for the arena. small sub page数量 */ int numSmallSubpages(); /** * Returns the number of chunk lists for the arena. 分配区,内存块数量 */ int numChunkLists(); ... }
从上面来看,PoolArena根据容量来决定创建tiny,small还是Normal buf:
我们以Normal为例:
从线程本地缓存获取buf
//PoolThreadCache
final class PoolThreadCache { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); final PoolArena<byte[]> heapArena;//堆分配区 final PoolArena<ByteBuffer> directArena;//direct buf分配区 // Hold the caches for the different size classes, which are tiny, small and normal. private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;//tiny subpage 堆缓存 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;//tiny subpage direct缓存 private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;//small subpage 堆缓存 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;//small subpage direct缓存 private final MemoryRegionCache<byte[]>[] normalHeapCaches;//normal subpage 堆缓存 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;//normal subpage direct缓存 // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private final Thread deathWatchThread; private final Runnable freeTask; private int allocations; ... /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { //从内存域缓存,创建buf return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } //获取对应的缓存region private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) { if (area.isDirect()) { int idx = log2(normCapacity >> numShiftsNormalDirect); return cache(normalDirectCaches, idx); } int idx = log2(normCapacity >> numShiftsNormalHeap); return cache(normalHeapCaches, idx); } private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { if (cache == null || idx > cache.length - 1) { return null; } return cache[idx]; } //从内存域缓存,创建buf @SuppressWarnings({ "unchecked", "rawtypes" }) private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { // no cache found so just return false here return false; } boolean allocated = cache.allocate(buf, reqCapacity); if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } return allocated; } //内存region缓存 private abstract static class MemoryRegionCache<T> { private final int size; private final Queue<Entry<T>> queue; private final SizeClass sizeClass; private int allocations; /** * Allocate something out of the cache if possible and remove the entry from the cache. */ public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { //从缓冲buf队列poll一个buf Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); // allocations is not thread-safe which is fine as this is only called from the same thread all time. ++ allocations; return true; } /** * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions. 在 */ protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity); ... } } /** * Cache used for buffers which are backed by NORMAL size. 正常size buf的内存Region 缓存 */ private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> { NormalMemoryRegionCache(int size) { super(size, SizeClass.Normal); } //初始化Pooled字节buf @Override protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBuf(buf, handle, reqCapacity); } } ... }
//PoolChunk,内存块
final class PoolChunk<T> implements PoolChunkMetric { private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; final PoolArena<T> arena;//关联缓存池 final T memory; final boolean unpooled; final int offset; private final byte[] memoryMap; private final byte[] depthMap; private final PoolSubpage<T>[] subpages;//内存页 /** Used to determine if the requested capacity is equal to or greater than pageSize. */ private final int subpageOverflowMask; private final int pageSize;//内存页size private final int pageShifts; private final int maxOrder; private final int chunkSize; private final int log2ChunkSize; private final int maxSubpageAllocs; /** Used to mark memory as unusable */ private final byte unusable; private int freeBytes; PoolChunkList<T> parent; PoolChunk<T> prev; PoolChunk<T> next; //初始化Pooled字节buf void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } ... }
//PooledByteBuf
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf { private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle; protected PoolChunk<T> chunk;//内存块 protected long handle; protected T memory; protected int offset; protected int length; int maxLength; PoolThreadCache cache; private ByteBuffer tmpNioBuf; private ByteBufAllocator allocator; ... void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, handle, offset, length, maxLength, cache); } void initUnpooled(PoolChunk<T> chunk, int length) { init0(chunk, 0, chunk.offset, length, length, null); } private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; this.chunk = chunk; memory = chunk.memory; allocator = chunk.arena.parent; this.cache = cache; this.handle = handle; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; } }
再来看分配其他两种分配tiny和small:
/** * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); } //获取tiny内存域缓存 private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } //获取Small内存域缓存 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) { int idx = PoolArena.smallIdx(normCapacity); if (area.isDirect()) { return cache(smallSubPageDirectCaches, idx); } return cache(smallSubPageHeapCaches, idx); } private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { if (cache == null || idx > cache.length - 1) { return null; } return cache[idx]; } /** * Cache used for buffers which are backed by TINY or SMALL size. */ private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> { SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); } @Override protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBufWithSubpage(buf, handle, reqCapacity); } }
再来看创建direct类型buf:
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { //从direct分配区,分配一个direct buf buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } //追踪字节buf资源泄漏情况 return toLeakAwareBuffer(buf); }
这个思路与创建堆buf思路一致。
我们来简单看一Pooled 堆和direct buf
先来看堆分配区和direct分配区,分配buf
//堆buf分配区 static final class HeapArena extends PoolArena<byte[]> { HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); } //创建Pooled堆buf @Override protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) { return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity) : PooledHeapByteBuf.newInstance(maxCapacity); } ... }
//direct buf 分配区
static final class DirectArena extends PoolArena<ByteBuffer> { DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); } //创建Pooled Direct buf @Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } ... }
从上面可以看出Pool 堆buf为,PooledUnsafeHeapByteBuf、PooledHeapByteBuf
direct buf为PooledUnsafeDirectByteBuf、PooledDirectByteBuf。
我们分别来简单看一下这四种buf:
//PooledUnsafeHeapByteBuf
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf { private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() { @Override//创建buf protected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) { return new PooledUnsafeHeapByteBuf(handle, 0); } }; static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) { PooledUnsafeHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象 buf.reuse(maxCapacity);//重置Pooled字节buf return buf; } .... }
//PooledHeapByteBuf
class PooledHeapByteBuf extends PooledByteBuf<byte[]> { private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { @Override//创建buf protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) { return new PooledHeapByteBuf(handle, 0); } }; static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象 buf.reuse(maxCapacity);//重置Pooled字节buf return buf; } ... }
//PooledUnsafeDirectByteBuf
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override//创建buf protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { return new PooledUnsafeDirectByteBuf(handle, 0); } }; static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { PooledUnsafeDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象 buf.reuse(maxCapacity);//重置Pooled字节buf return buf; } ... }
//PooledDirectByteBuf
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override//创建buf protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) { return new PooledDirectByteBuf(handle, 0); } }; static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象 buf.reuse(maxCapacity);//重置Pooled字节buf return buf; } ... }
//PooledByteBuf
重用Pooled字节buf之前,必须调用#reuse方法 /** * Method must be called before reuse this {@link PooledByteBufAllocator} */ final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1);//重置引用计数器 setIndex0(0, 0);//重置读写索引 discardMarks();//丢弃读写索引标记 }
在简单看一下字节buf内存的回收器Recycler
/** * Light-weight object pool based on a thread-local stack. * * @param <T> the type of the pooled object */ public abstract class Recycler<T> { ... //线程本地对象栈 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } }; @SuppressWarnings("unchecked") public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } //获取线程本地对象栈 Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop();//从对象栈中获取对象handle if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; } static final class Stack<T> { final Recycler<T> parent; final Thread thread; final AtomicInteger availableSharedCapacity; final int maxDelayedQueues; private final int maxCapacity; private final int ratioMask; private DefaultHandle<?>[] elements; private int size; private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; //创建对象Hanlde DefaultHandle<T> newHandle() { return new DefaultHandle<T>(this); } } //创建对象 protected abstract T newObject(Handle<T> handle); ... }
从上面可以看出,Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将
对象放在一个线程本地栈中管理。
再来简单看一下线程本地buf缓存池:
//线程buf 缓存为PoolThreadLocalCache threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
//线程本地buf缓存
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { private final boolean useCacheForAllThreads; PoolThreadLocalCache(boolean useCacheForAllThreads) { this.useCacheForAllThreads = useCacheForAllThreads; } //初始化线程本地buf缓存 @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) { return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // No caching for non FastThreadLocalThreads. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } //获取最少被线程使用的buf 缓存 private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) { if (arenas == null || arenas.length == 0) { return null; } PoolArena<T> minArena = arenas[0]; for (int i = 1; i < arenas.length; i++) { PoolArena<T> arena = arenas[i]; if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) { minArena = arena; } } return minArena; } @Override protected void onRemoval(PoolThreadCache threadCache) { threadCache.free(); } }
再来简单看一下buf缓存池度量器:
metric = new PooledByteBufAllocatorMetric(this);
/** * Exposed metric for {@link PooledByteBufAllocator}. */ @SuppressWarnings("deprecation") public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetric { private final PooledByteBufAllocator allocator; PooledByteBufAllocatorMetric(PooledByteBufAllocator allocator) { this.allocator = allocator; } /** * Return the number of heap arenas. 返回堆缓存计数器 */ public int numHeapArenas() { return allocator.numHeapArenas(); } /** * Return the number of direct arenas. 返回direct缓存计数器 */ public int numDirectArenas() { return allocator.numDirectArenas(); } /** * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool. 堆buf缓存度量器 */ public List<PoolArenaMetric> heapArenas() { return allocator.heapArenas(); } /** * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool. direct buf缓存度量器 */ public List<PoolArenaMetric> directArenas() { return allocator.directArenas(); } /** * Return the number of thread local caches used by this {@link PooledByteBufAllocator}. 使用此Pooled字节分配器的线程本地缓存数 */ public int numThreadLocalCaches() { return allocator.numThreadLocalCaches(); } /** * Return the size of the tiny cache. tiny缓存大小 */ public int tinyCacheSize() { return allocator.tinyCacheSize(); } /** * Return the size of the small cache. small缓存大小 */ public int smallCacheSize() { return allocator.smallCacheSize(); } /** * Return the size of the normal cache. normal缓存大小 */ public int normalCacheSize() { return allocator.normalCacheSize(); } /** * Return the chunk size for an arena. */ public int chunkSize() { return allocator.chunkSize(); } //堆内存使用量 @Override public long usedHeapMemory() { return allocator.usedHeapMemory(); } //direct内存使用量 @Override public long usedDirectMemory() { return allocator.usedDirectMemory(); } @Override public String toString() { StringBuilder sb = new StringBuilder(256); sb.append(StringUtil.simpleClassName(this)) .append("(usedHeapMemory: ").append(usedHeapMemory()) .append("; usedDirectMemory: ").append(usedDirectMemory()) .append("; numHeapArenas: ").append(numHeapArenas()) .append("; numDirectArenas: ").append(numDirectArenas()) .append("; tinyCacheSize: ").append(tinyCacheSize()) .append("; smallCacheSize: ").append(smallCacheSize()) .append("; normalCacheSize: ").append(normalCacheSize()) .append("; numThreadLocalCaches: ").append(numThreadLocalCaches()) .append("; chunkSize: ").append(chunkSize()).append(')'); return sb.toString(); } }
再来看Pooledd字节分配器的其他方法:
//PooledByteBufAllocator
/** * Return the number of heap arenas. * * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}. */ @Deprecated public int numHeapArenas() { return heapArenaMetrics.size(); } /** * Return the number of direct arenas. * * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}. */ @Deprecated public int numDirectArenas() { return directArenaMetrics.size(); } /** * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool. * * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}. */ @Deprecated public List<PoolArenaMetric> heapArenas() { return heapArenaMetrics; } /** * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool. * * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}. */ @Deprecated public List<PoolArenaMetric> directArenas() { return directArenaMetrics; } /** * Return the number of thread local caches used by this {@link PooledByteBufAllocator}. * * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}. */ @Deprecated public int numThreadLocalCaches() { PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas; if (arenas == null) { return 0; } int total = 0; for (PoolArena<?> arena : arenas) { total += arena.numThreadCaches.get(); } return total; } /** * Return the size of the tiny cache. * * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}. */ @Deprecated public int tinyCacheSize() { return tinyCacheSize; } /** * Return the size of the small cache. * * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}. */ @Deprecated public int smallCacheSize() { return smallCacheSize; } /** * Return the size of the normal cache. * * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}. */ @Deprecated public int normalCacheSize() { return normalCacheSize; } /** * Return the chunk size for an arena. * * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}. */ @Deprecated public final int chunkSize() { return chunkSize; } final long usedHeapMemory() { return usedMemory(heapArenas); } final long usedDirectMemory() { return usedMemory(directArenas); } private static long usedMemory(PoolArena<?>... arenas) { if (arenas == null) { return -1; } long used = 0; for (PoolArena<?> arena : arenas) { used += arena.numActiveBytes(); if (used < 0) { return Long.MAX_VALUE; } } return used; } final PoolThreadCache threadCache() { return threadCache.get(); } /** * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive * and so should not called too frequently. */ public String dumpStats() { int heapArenasLen = heapArenas == null ? 0 : heapArenas.length; StringBuilder buf = new StringBuilder(512) .append(heapArenasLen) .append(" heap arena(s):") .append(StringUtil.NEWLINE); if (heapArenasLen > 0) { for (PoolArena<byte[]> a: heapArenas) { buf.append(a); } } int directArenasLen = directArenas == null ? 0 : directArenas.length; buf.append(directArenasLen) .append(" direct arena(s):") .append(StringUtil.NEWLINE); if (directArenasLen > 0) { for (PoolArena<ByteBuffer> a: directArenas) { buf.append(a); } } return buf.toString(); }
总结:
Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2452netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1318netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1599netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2839netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2041netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1878netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1220netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 959netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1084netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1858netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1884netty Inboudn/Outbound通道Invoker ...
相关推荐
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 4 构建基于 UDP(用户数据报协议)的数据接收服务,以及如何实现...
在Netty中,处理UDP数据接收服务不仅限于上述的基本步骤,还可以利用Netty的丰富功能进行扩展,如添加自定义的协议编解码器、实现负载均衡、健康检查等。总之,Netty为构建高效、灵活的UDP服务提供了一整套强大的...
5. **ByteToMessageDecoder**:Netty提供了一种方便的解码器`ByteToMessageDecoder`,可以自定义解码逻辑。如果接收到的数据格式复杂,可以编写一个子类并覆盖`decode()`方法,确保在解码过程中正确处理字符编码。 ...
Netty 提供了多种预定义的编码器和解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,它们分别用于处理字节流到消息对象和消息对象到字节流的转换。例如,我们可能需要将Java对象编码为字节流发送到网络,...
本压缩包文件"Netty之自定义编解码器.zip"着重讨论的是如何在Netty中自定义编解码器,这是Netty框架中的一个重要组成部分,用于将应用程序数据转换为网络传输的数据格式,以及将接收到的网络数据转换回应用程序可以...
在Netty中,编解码器是处理网络数据流的关键组件,它们负责将原始字节流转换为易于处理的对象,反之亦然。通常,Netty使用管道(Pipeline)来传递和处理这些字节流,每个管道包含一系列的处理节点,即Handler。编码...
netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。
- **ByteBuf**:Netty自定义的高效字节缓冲区,比Java的ByteBuffer更易用且性能更好。 - **ChannelHandler**:处理器接口,用于实现I/O事件的处理逻辑。 - **Bootstrap**:启动器,用于配置并启动服务器或客户端...
Netty 提供了多种编解码器,如 `StringDecoder` 和 `StringEncoder`,可以方便地将字符串数据转换成 ByteBuf(Netty 的字节缓冲区),反之亦然。在本示例中,可能已经使用了这些编解码器来处理消息。 为了实现实时...
对于这些协议,Netty 提供了相应的编解码器,如 HttpObjectDecoder 和 WebSocketFrameDecoder,它们使得处理这些复杂协议变得非常简单。 此外,Netty 的性能优化也是其受欢迎的原因之一。例如,零拷贝技术通过避免...
在Netty中,编解码器是处理数据转换的关键组件,它们将原始的字节流转换为应用程序可理解的消息格式,反之亦然。本文将深入探讨Netty中的编解码器框架。 首先,我们需要理解编码器和解码器的基本概念。编码器负责将...
分析这个代码,我们可以看到Netty如何创建服务器、设置管道、以及如何定义和使用自定义的解码器和编码器来处理16进制数据。 通过上述步骤,Netty服务器可以轻松地解析16进制数据,从而支持各种网络协议,无论它们是...
- Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...
对于Netty来说,编码器和解码器的实现通常需要遵循以下步骤: 1. 创建自定义的编码器类,继承MessageToByteEncoder,重写encode方法,将Java对象转换为ByteBuf。 2. 创建自定义的解码器类,继承...
在Netty中,解码器是处理接收数据的重要组件,用于将接收到的原始字节流转换为可操作的对象。本篇将深入探讨Netty中的两种解码器:分隔符解码器(DelimiterBasedFrameDecoder)和定长解码器...
3. **ByteBuf**:Netty提供的ByteBuf作为字节缓冲区,比Java的ByteBuffer更加灵活和高效。它支持读写指针独立移动,避免了不必要的内存复制。 4. **Pipeline**:Netty的ChannelHandler链是通过Pipeline实现的。...
2. **高效性**:Netty通过减少对象创建和内存复制来优化性能,例如,它的ByteBuf类提供了高效的字节缓冲区管理。 3. **灵活性**:Netty支持多种网络协议,如TCP、UDP、HTTP、WebSocket、FTP等,以及自定义协议。它...
在高级特性部分,书籍会涉及Netty的编解码器,如LineBasedFrameDecoder用于处理以换行符分隔的协议,以及Delimiters用于识别特定分隔符的协议。同时,自定义编解码器的编写也是进阶学习的重点,这使得Netty能适应...
- Netty 的 ByteBuf 提供了高效的内存管理,避免了频繁的字节数组拷贝。 2. **WebSocket 协议** - WebSocket 协议通过 HTTP/HTTPS 协议握手建立连接,之后即可在连接上进行双向数据传输,无需重复的 HTTP 请求/...
2. **Netty架构**:Netty采用了反应器模式,包含Bootstrap(引导类)、ServerBootstrap(服务器引导类)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)等组件,构建了高效的事件驱动模型。...