- 浏览: 978851 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
引言
上一篇我们看了抽象字节buf分配器,先来回顾一下:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
今天我们来看抽象字节buf分配器一个具体实现非池类字节buf分配器UnpooledByteBufAllocator
来看一下度量器
//LongCounter Long计数器
来看创建堆buf
堆buf有两种,我们分别来看
先看InstrumentedUnpooledUnsafeHeapByteBuf
1.
//InstrumentedUnpooledUnsafeHeapByteBuf
//UnpooledUnsafeHeapByteBuf
2.
再来看另一种堆buf InstrumentedUnpooledHeapByteBuf
//InstrumentedUnpooledHeapByteBuf
我们来看一下Unpooled堆字节buf:
从上面可以看出,非池类堆字节buf,实际为一个字节数组。
//堆内存使用更新与释放,委托给内部度量器
再来看分配direct buf:
从上面来看direct buf有三种,分别为InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf,
InstrumentedUnpooledUnsafeDirectByteBuf,InstrumentedUnpooledDirectByteBuf
我们分别来看这三种:
1.
//UnpooledUnsafeNoCleanerDirectByteBuf
再来看第二种:
2.
//UnpooledUnsafeDirectByteBuf
从上面我们可以看出,非池类Direct buf,实际为一个nio 字节buf。
再来看第三种:
3.
//UnpooledDirectByteBuf
再来看更新direct 内存使用量,更新操作,直接委托给内部度量器:
再来看复合buf
总结:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
附:
我们再来看一下Unpooled:
再来看分配buf:
从上面来看Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
包装buf相关的方法:
...
拷贝buf相关的方法:
//UnreleasableByteBuf
//WrappedByteBuf,理解为字节静态代理
//FixedCompositeByteBuf
//Unpooled
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
引言
上一篇我们看了抽象字节buf分配器,先来回顾一下:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
今天我们来看抽象字节buf分配器一个具体实现非池类字节buf分配器UnpooledByteBufAllocator
/** * Simplistic {@link ByteBufAllocator} implementation that does not pool anything. */ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { //字节buf分配器Metric private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric(); private final boolean disableLeakDetector;//是否关闭资源泄漏探测 /** * Default instance which uses leak-detection for direct buffers. 默认非池类字节buf分配器 */ public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); /** * Create a new instance which uses leak-detection for direct buffers. * * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than * a heap buffer */ public UnpooledByteBufAllocator(boolean preferDirect) { this(preferDirect, false); } /** * Create a new instance * * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than * a heap buffer * @param disableLeakDetector {@code true} if the leak-detection should be disabled completely for this * allocator. This can be useful if the user just want to depend on the GC to handle * direct buffers when not explicit released. */ public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) { super(preferDirect); this.disableLeakDetector = disableLeakDetector; } }
来看一下度量器
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric { final LongCounter directCounter = PlatformDependent.newLongCounter();//direct内存使用计数器 final LongCounter heapCounter = PlatformDependent.newLongCounter();//Heap内存使用计数器 @Override public long usedHeapMemory() { return heapCounter.value(); } @Override public long usedDirectMemory() { return directCounter.value(); } @Override public String toString() { return StringUtil.simpleClassName(this) + "(usedHeapMemory: " + usedHeapMemory() + "; usedDirectMemory: " + usedDirectMemory() + ')'; } }
//LongCounter Long计数器
/** * Counter for long. */ public interface LongCounter { void add(long delta); void increment(); void decrement(); long value(); }
来看创建堆buf
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
堆buf有两种,我们分别来看
先看InstrumentedUnpooledUnsafeHeapByteBuf
1.
//InstrumentedUnpooledUnsafeHeapByteBuf
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf { InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override byte[] allocateArray(int initialCapacity) { byte[] bytes = super.allocateArray(initialCapacity); //更新buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length); return bytes; } @Override void freeArray(byte[] array) { int length = array.length; super.freeArray(array); //释放buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementHeap(length); } }
//UnpooledUnsafeHeapByteBuf
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf { /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } ... }
2.
再来看另一种堆buf InstrumentedUnpooledHeapByteBuf
//InstrumentedUnpooledHeapByteBuf
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf { InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override byte[] allocateArray(int initialCapacity) { byte[] bytes = super.allocateArray(initialCapacity); //更新buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length); return bytes; } @Override void freeArray(byte[] array) { int length = array.length; super.freeArray(array); //释放buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementHeap(length); } }
我们来看一下Unpooled堆字节buf:
/** * Big endian Java heap buffer implementation. */ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; byte[] array;//存储数据的字节数组 private ByteBuffer tmpNioBuf;//临时nio 字节部分 /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } /** * Creates a new heap buffer with an existing byte array. * * @param initialArray the initial underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); checkNotNull(initialArray, "initialArray"); if (initialArray.length > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity)); } this.alloc = alloc; setArray(initialArray); setIndex(0, initialArray.length); } //分配initialCapacity容量的字节数组 byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } ... @Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; } .... @Override public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.capacity()); if (src.hasMemoryAddress()) { PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length); } else if (src.hasArray()) { setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); } else { src.getBytes(srcIndex, array, index, length); } return this; } @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } ... @Override public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); } @Override public ByteBuffer[] nioBuffers(int index, int length) { return new ByteBuffer[] { nioBuffer(index, length) }; } @Override public ByteBuffer internalNioBuffer(int index, int length) { checkIndex(index, length); return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length); } ... private ByteBuffer internalNioBuffer() { ByteBuffer tmpNioBuf = this.tmpNioBuf; if (tmpNioBuf == null) { this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array); } return tmpNioBuf; } ... @Override protected void deallocate() { freeArray(array); array = null; } //待子类扩展 void freeArray(byte[] array) { // NOOP } }
从上面可以看出,非池类堆字节buf,实际为一个字节数组。
//堆内存使用更新与释放,委托给内部度量器
void incrementHeap(int amount) { metric.heapCounter.add(amount); } void decrementHeap(int amount) { metric.heapCounter.add(-amount); }
再来看分配direct buf:
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { final ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = PlatformDependent.useDirectBufferNoCleaner() ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
从上面来看direct buf有三种,分别为InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf,
InstrumentedUnpooledUnsafeDirectByteBuf,InstrumentedUnpooledDirectByteBuf
我们分别来看这三种:
1.
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeNoCleanerDirectByteBuf { InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) { int capacity = oldBuffer.capacity(); ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledUnsafeNoCleanerDirectByteBuf
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf { UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } ... }
再来看第二种:
2.
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { InstrumentedUnpooledUnsafeDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledUnsafeDirectByteBuf
/** * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the * constructor explicitly. */ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private ByteBuffer tmpNioBuf; private int capacity;//容量 private boolean doNotFree;//是否需要释放内存 ByteBuffer buffer;//内部nio 字节buf long memoryAddress; /** * Creates a new direct buffer. * * @param initialCapacity the initial capacity of the underlying direct buffer * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false); } /** * Creates a new direct buffer by wrapping the specified initial buffer. * * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) { // We never try to free the buffer if it was provided by the end-user as we not know if this is an duplicate or // an slice. This is done to prevent an IllegalArgumentException when using Java9 as Unsafe.invokeCleaner(...) // will check if the given buffer is either an duplicate or slice and in this case throw an // IllegalArgumentException. // // See http://hg.openjdk.java.net/jdk9/hs-demo/jdk/file/0d2ab72ba600/src/jdk.unsupported/share/classes/ // sun/misc/Unsafe.java#l1250 // // We also call slice() explicitly here to preserve behaviour with previous netty releases. this(alloc, initialBuffer.slice(), maxCapacity, false); } UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity, boolean doFree) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialBuffer == null) { throw new NullPointerException("initialBuffer"); } if (!initialBuffer.isDirect()) { throw new IllegalArgumentException("initialBuffer is not a direct buffer."); } if (initialBuffer.isReadOnly()) { throw new IllegalArgumentException("initialBuffer is a read-only buffer."); } int initialCapacity = initialBuffer.remaining(); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; doNotFree = !doFree; setByteBuffer(initialBuffer.order(ByteOrder.BIG_ENDIAN), false); writerIndex(initialCapacity); } /** * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. 分配initialCapacity容量的direct buf,实际委托给内部的nio 字节buf */ protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } ... }
从上面我们可以看出,非池类Direct buf,实际为一个nio 字节buf。
再来看第三种:
3.
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf { InstrumentedUnpooledDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledDirectByteBuf
/** * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the * constructor explicitly. */ public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private ByteBuffer buffer;//内部字节buf private ByteBuffer tmpNioBuf; private int capacity;//容量 private boolean doNotFree;//是否需要释放内存 /** * Creates a new direct buffer. * * @param initialCapacity the initial capacity of the underlying direct buffer * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); } /** * Creates a new direct buffer by wrapping the specified initial buffer. * * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialBuffer == null) { throw new NullPointerException("initialBuffer"); } if (!initialBuffer.isDirect()) { throw new IllegalArgumentException("initialBuffer is not a direct buffer."); } if (initialBuffer.isReadOnly()) { throw new IllegalArgumentException("initialBuffer is a read-only buffer."); } int initialCapacity = initialBuffer.remaining(); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; doNotFree = true; setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN)); writerIndex(initialCapacity); } /** * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. 分配initialCapacity容量的direct buf,实际委托给内部的nio 字节buf */ protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } ... }
再来看更新direct 内存使用量,更新操作,直接委托给内部度量器:
void incrementDirect(int amount) { metric.directCounter.add(amount); } void decrementDirect(int amount) { metric.directCounter.add(-amount); }
再来看复合buf
@Override public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { CompositeByteBuf buf = new CompositeByteBuf(this, false, maxNumComponents); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); } @Override public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { CompositeByteBuf buf = new CompositeByteBuf(this, true, maxNumComponents); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
总结:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
附:
我们再来看一下Unpooled:
/** * Creates a new {@link ByteBuf} by allocating new space or by wrapping * or copying existing byte arrays, byte buffers and a string. *Unpooled用于创建一个新的字节buf,包装和拷贝一个已经存在的字节数组,字节buf或String * <h3>Use static import</h3> * This classes is intended to be used with Java 5 static import statement: * * <pre> * import static io.netty.buffer.{@link Unpooled}.*; * * {@link ByteBuf} heapBuffer = buffer(128); * {@link ByteBuf} directBuffer = directBuffer(256); * {@link ByteBuf} wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]); * {@link ByteBuf} copiedBuffer = copiedBuffer({@link ByteBuffer}.allocate(128)); * </pre> * * <h3>Allocating a new buffer</h3> * * Three buffer types are provided out of the box. * * [list] * [*]{@link #buffer(int)} allocates a new fixed-capacity heap buffer. * [*]{@link #directBuffer(int)} allocates a new fixed-capacity direct buffer. * [/list] * * <h3>Creating a wrapped buffer</h3> *包装buf,为底层buf的视图,所有底层buf的改变,包装buf都可以看到 * Wrapped buffer is a buffer which is a view of one or more existing * byte arrays and byte buffers. Any changes in the content of the original * array or buffer will be visible in the wrapped buffer. Various wrapper * methods are provided and their name is all {@code wrappedBuffer()}. * You might want to take a look at the methods that accept varargs closely if * you want to create a buffer which is composed of more than one array to * reduce the number of memory copy. * * <h3>Creating a copied buffer</h3> *拷贝buf,并不会共享数据,底层buf数据的改变,拷贝buf看不到 * Copied buffer is a deep copy of one or more existing byte arrays, byte * buffers or a string. Unlike a wrapped buffer, there's no shared data * between the original data and the copied buffer. Various copy methods are * provided and their name is all {@code copiedBuffer()}. It is also convenient * to use this operation to merge multiple buffers into one buffer. */ public final class Unpooled { //字节buf分配器,默认为UnpooledByteBufAllocator.DEFAULT private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT; /** * Big endian byte order. */ public static final ByteOrder BIG_ENDIAN = ByteOrder.BIG_ENDIAN; /** * Little endian byte order. */ public static final ByteOrder LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN; /** * A buffer whose capacity is {@code 0}. */ public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0); static { assert EMPTY_BUFFER instanceof EmptyByteBuf: "EMPTY_BUFFER must be an EmptyByteBuf."; } }
再来看分配buf:
/** * Creates a new big-endian Java heap buffer with reasonably small initial capacity, which * expands its capacity boundlessly on demand. */ public static ByteBuf buffer() { return ALLOC.heapBuffer(); } /** * Creates a new big-endian direct buffer with reasonably small initial capacity, which * expands its capacity boundlessly on demand. */ public static ByteBuf directBuffer() { return ALLOC.directBuffer(); } /** * Creates a new big-endian Java heap buffer with the specified {@code capacity}, which * expands its capacity boundlessly on demand. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0}. */ public static ByteBuf buffer(int initialCapacity) { return ALLOC.heapBuffer(initialCapacity); } /** * Creates a new big-endian direct buffer with the specified {@code capacity}, which * expands its capacity boundlessly on demand. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0}. */ public static ByteBuf directBuffer(int initialCapacity) { return ALLOC.directBuffer(initialCapacity); } /** * Returns a new big-endian composite buffer with no components. */ public static CompositeByteBuf compositeBuffer() { return compositeBuffer(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS); } /** * Returns a new big-endian composite buffer with no components. */ public static CompositeByteBuf compositeBuffer(int maxNumComponents) { return new CompositeByteBuf(ALLOC, false, maxNumComponents); }
从上面来看Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
包装buf相关的方法:
/** * Creates a new big-endian buffer which wraps the specified {@code array}. * A modification on the specified array's content will be visible to the * returned buffer. */ public static ByteBuf wrappedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return new UnpooledHeapByteBuf(ALLOC, array, array.length); } /** * Creates a new buffer which wraps the specified buffer's readable bytes. * A modification on the specified buffer's content will be visible to the * returned buffer. * @param buffer The buffer to wrap. Reference count ownership of this variable is transfered to this method. * @return The readable portion of the {@code buffer}, or an empty buffer if there is no readable portion. * The caller is responsible for releasing this buffer. */ public static ByteBuf wrappedBuffer(ByteBuf buffer) { if (buffer.isReadable()) { return buffer.slice(); } else { buffer.release(); return EMPTY_BUFFER; } } /** * Creates a new big-endian composite buffer which wraps the readable bytes of the * specified buffers without copying them. A modification on the content * of the specified buffers will be visible to the returned buffer. * @param maxNumComponents Advisement as to how many independent buffers are allowed to exist before * consolidation occurs. * @param buffers The buffers to wrap. Reference count ownership of all variables is transfered to this method. * @return The readable portion of the {@code buffers}. The caller is responsible for releasing this buffer. */ public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) { switch (buffers.length) { case 0: break; case 1: ByteBuf buffer = buffers[0]; if (buffer.isReadable()) { return wrappedBuffer(buffer.order(BIG_ENDIAN)); } else { buffer.release(); } break; default: for (int i = 0; i < buffers.length; i++) { ByteBuf buf = buffers[i]; if (buf.isReadable()) { return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i, buffers.length); } buf.release(); } break; } return EMPTY_BUFFER; }
...
拷贝buf相关的方法:
** * Creates a new big-endian buffer whose content is a copy of the * specified {@code array}. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0} and {@code array.length} respectively. */ public static ByteBuf copiedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return wrappedBuffer(array.clone()); } /** * Creates a new buffer whose content is a copy of the specified * {@code buffer}'s readable bytes. The new buffer's {@code readerIndex} * and {@code writerIndex} are {@code 0} and {@code buffer.readableBytes} * respectively. */ public static ByteBuf copiedBuffer(ByteBuf buffer) { int readable = buffer.readableBytes(); if (readable > 0) { ByteBuf copy = buffer(readable); copy.writeBytes(buffer, buffer.readerIndex(), readable); return copy; } else { return EMPTY_BUFFER; } }
/** * Return a unreleasable view on the given {@link ByteBuf} which will just ignore release and retain calls. 返回一个不可释放的buf */ public static ByteBuf unreleasableBuffer(ByteBuf buf) { return new UnreleasableByteBuf(buf); }
//UnreleasableByteBuf
final class UnreleasableByteBuf extends WrappedByteBuf { private SwappedByteBuf swappedBuf; ... @Override public ByteBuf retain(int increment) { return this; } @Override public ByteBuf retain() { return this; } @Override public ByteBuf touch() { return this; } @Override public ByteBuf touch(Object hint) { return this; } @Override public boolean release() { return false; } @Override public boolean release(int decrement) { return false; } }
//WrappedByteBuf,理解为字节静态代理
class WrappedByteBuf extends ByteBuf { protected final ByteBuf buf; @Override public final int readerIndex() { return buf.readerIndex(); } @Override public final ByteBuf readerIndex(int readerIndex) { buf.readerIndex(readerIndex); return this; } @Override public final int writerIndex() { return buf.writerIndex(); } @Override public final ByteBuf writerIndex(int writerIndex) { buf.writerIndex(writerIndex); return this; } ... }
/** * Wrap the given {@link ByteBuf}s in an unmodifiable {@link ByteBuf}. Be aware the returned {@link ByteBuf} will * not try to slice the given {@link ByteBuf}s to reduce GC-Pressure. 包装buf为不可修改字节buf */ public static ByteBuf unmodifiableBuffer(ByteBuf... buffers) { return new FixedCompositeByteBuf(ALLOC, buffers); }
//FixedCompositeByteBuf
/** * {@link ByteBuf} implementation which allows to wrap an array of {@link ByteBuf} in a read-only mode. * This is useful to write an array of {@link ByteBuf}s. */ final class FixedCompositeByteBuf extends AbstractReferenceCountedByteBuf { private static final ByteBuf[] EMPTY = { Unpooled.EMPTY_BUFFER }; private final int nioBufferCount; private final int capacity; private final ByteBufAllocator allocator; private final ByteOrder order; private final Object[] buffers; private final boolean direct; ... }
//Unpooled
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1313netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2051netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1311netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1305netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1592netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1842netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1393netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2823netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2174netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2033netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1076netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1876netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1216netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1201netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 954netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1306netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2187netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1071netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1851netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1871netty Inboudn/Outbound通道Invoker ...
相关推荐
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 4 构建基于 UDP(用户数据报协议)的数据接收服务,以及如何实现...
在Netty中,处理UDP数据接收服务不仅限于上述的基本步骤,还可以利用Netty的丰富功能进行扩展,如添加自定义的协议编解码器、实现负载均衡、健康检查等。总之,Netty为构建高效、灵活的UDP服务提供了一整套强大的...
Netty 提供了多种预定义的编码器和解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,它们分别用于处理字节流到消息对象和消息对象到字节流的转换。例如,我们可能需要将Java对象编码为字节流发送到网络,...
5. **ByteToMessageDecoder**:Netty提供了一种方便的解码器`ByteToMessageDecoder`,可以自定义解码逻辑。如果接收到的数据格式复杂,可以编写一个子类并覆盖`decode()`方法,确保在解码过程中正确处理字符编码。 ...
本压缩包文件"Netty之自定义编解码器.zip"着重讨论的是如何在Netty中自定义编解码器,这是Netty框架中的一个重要组成部分,用于将应用程序数据转换为网络传输的数据格式,以及将接收到的网络数据转换回应用程序可以...
分析这个代码,我们可以看到Netty如何创建服务器、设置管道、以及如何定义和使用自定义的解码器和编码器来处理16进制数据。 通过上述步骤,Netty服务器可以轻松地解析16进制数据,从而支持各种网络协议,无论它们是...
在Netty中,编解码器是处理网络数据流的关键组件,它们负责将原始字节流转换为易于处理的对象,反之亦然。通常,Netty使用管道(Pipeline)来传递和处理这些字节流,每个管道包含一系列的处理节点,即Handler。编码...
netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。
- **ByteBuf**:Netty自定义的高效字节缓冲区,比Java的ByteBuffer更易用且性能更好。 - **ChannelHandler**:处理器接口,用于实现I/O事件的处理逻辑。 - **Bootstrap**:启动器,用于配置并启动服务器或客户端...
Netty 提供了多种编解码器,如 `StringDecoder` 和 `StringEncoder`,可以方便地将字符串数据转换成 ByteBuf(Netty 的字节缓冲区),反之亦然。在本示例中,可能已经使用了这些编解码器来处理消息。 为了实现实时...
在Netty中,编解码器是处理数据转换的关键组件,它们将原始的字节流转换为应用程序可理解的消息格式,反之亦然。本文将深入探讨Netty中的编解码器框架。 首先,我们需要理解编码器和解码器的基本概念。编码器负责将...
- Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...
对于Netty来说,编码器和解码器的实现通常需要遵循以下步骤: 1. 创建自定义的编码器类,继承MessageToByteEncoder,重写encode方法,将Java对象转换为ByteBuf。 2. 创建自定义的解码器类,继承...
2. **Netty架构**:Netty采用了反应器模式,包含Bootstrap(引导类)、ServerBootstrap(服务器引导类)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)等组件,构建了高效的事件驱动模型。...
在高级特性部分,书籍会涉及Netty的编解码器,如LineBasedFrameDecoder用于处理以换行符分隔的协议,以及Delimiters用于识别特定分隔符的协议。同时,自定义编解码器的编写也是进阶学习的重点,这使得Netty能适应...
在Netty中,解码器是处理接收数据的重要组件,用于将接收到的原始字节流转换为可操作的对象。本篇将深入探讨Netty中的两种解码器:分隔符解码器(DelimiterBasedFrameDecoder)和定长解码器...
3. **ByteBuf**:Netty提供的ByteBuf作为字节缓冲区,比Java的ByteBuffer更加灵活和高效。它支持读写指针独立移动,避免了不必要的内存复制。 4. **Pipeline**:Netty的ChannelHandler链是通过Pipeline实现的。...
2. **高效性**:Netty通过减少对象创建和内存复制来优化性能,例如,它的ByteBuf类提供了高效的字节缓冲区管理。 3. **灵活性**:Netty支持多种网络协议,如TCP、UDP、HTTP、WebSocket、FTP等,以及自定义协议。它...
- Netty 的 ByteBuf 提供了高效的内存管理,避免了频繁的字节数组拷贝。 2. **WebSocket 协议** - WebSocket 协议通过 HTTP/HTTPS 协议握手建立连接,之后即可在连接上进行双向数据传输,无需重复的 HTTP 请求/...
5. **EventLoop** 和 **EventLoopGroup**:线程模型,EventLoop 负责执行 Channel 上的 I/O 操作和事件处理,EventLoopGroup 是 EventLoop 的管理器,负责分配和调度 EventLoop。 三、Netty 的事件驱动模型 Netty ...