Netty的ByteBuf主要用于网络传输,有读写两个index。
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
我们可以看出,大致分为几类ByteBuf,一类是Pooled,UnPooled。另一个是Heap,Direct。
public abstract class AbstractByteBuf extends ByteBuf { static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class); int readerIndex; //读的索引 int writerIndex;//写的索引 private int markedReaderIndex;//标记读的索引 private int markedWriterIndex;//标记写的索引 private int maxCapacity;//最大容量 }
PooledByteBuf,通过capacity和deallocate方法我们可以发现都是用 chunk.arena来分配内存和释放内存的
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf { private final Recycler.Handle recyclerHandle; protected PoolChunk<T> chunk; protected long handle; protected T memory; protected int offset; protected int length; int maxLength; private ByteBuffer tmpNioBuf; @Override public final ByteBuf capacity(int newCapacity) { ensureAccessible(); // If the request capacity does not require reallocation, just update the length of the memory. if (chunk.unpooled) { if (newCapacity == length) { return this; } } else { if (newCapacity > length) { if (newCapacity <= maxLength) { length = newCapacity; return this; } } else if (newCapacity < length) { if (newCapacity > maxLength >>> 1) { if (maxLength <= 512) { if (newCapacity > maxLength - 16) { length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } else { // > 512 (i.e. >= 1024) length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } } else { return this; } } // Reallocation required. chunk.arena.reallocate(this, newCapacity, true); return this; } @Override protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; chunk.arena.free(chunk, handle, maxLength); recycle(); } } }
PooledDirectByteBuf 是缓存的DirectByteBuf. 用RECYCLER来做回收重复利用。调用memory的get方法获取数据。memory是一个DirectByteBuffer对象。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override protected PooledDirectByteBuf newObject(Handle handle) { return new PooledDirectByteBuf(handle, 0); } }; static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } @Override protected byte _getByte(int index) { return memory.get(idx(index)); } @Override protected short _getShort(int index) { return memory.getShort(idx(index)); } }
PooledUnsafeDirectByteBuf和PooledDirectByteBuf的区别是没有使用DirectByteBuffer的接口方法去获取数据,而是直接用PlatformDependent的方法获取数据
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override protected PooledUnsafeDirectByteBuf newObject(Handle handle) { return new PooledUnsafeDirectByteBuf(handle, 0); } }; static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { PooledUnsafeDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } private long memoryAddress; private void initMemoryAddress() { memoryAddress = PlatformDependent.directBufferAddress(memory) + offset; } @Override protected byte _getByte(int index) { return PlatformDependent.getByte(addr(index)); } @Override protected short _getShort(int index) { short v = PlatformDependent.getShort(addr(index)); return NATIVE_ORDER? v : Short.reverseBytes(v); } }
PooledHeapByteBuf是直接用堆内分配的byte数组来存储数据,因此可以直接通过偏移量来读取数据
final class PooledHeapByteBuf extends PooledByteBuf<byte[]> { private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { @Override protected PooledHeapByteBuf newObject(Handle handle) { return new PooledHeapByteBuf(handle, 0); } }; static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } @Override protected byte _getByte(int index) { return memory[idx(index)]; } @Override protected short _getShort(int index) { index = idx(index); return (short) (memory[index] << 8 | memory[index + 1] & 0xFF); } }
UnpooledHeapByteBuf和PooledHeapByteBuf的区别是UnpooledHeapByteBuf不会用RECYCLER回收器
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private byte[] array; private ByteBuffer tmpNioBuf; /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { this(alloc, new byte[initialCapacity], 0, 0, maxCapacity); } }
Netty会调用PoolArea的allocate方法创建ByteBuf对象
abstract class PoolArena<T> { static final int numTinySubpagePools = 512 >>> 4; final PooledByteBufAllocator parent; private final int maxOrder; final int pageSize; final int pageShifts; final int chunkSize; final int subpageOverflowMask; final int numSmallSubpagePools; private final PoolSubpage<T>[] tinySubpagePools; private final PoolSubpage<T>[] smallSubpagePools; private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100; private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; if (isTiny(normCapacity)) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } synchronized (this) { final PoolSubpage<T> head = table[tableIdx]; final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); return; } } } else if (normCapacity <= chunkSize) { if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } } else { // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); return; } allocateNormal(buf, reqCapacity, normCapacity); } }
相关推荐
Netty ByteBuf 的零拷贝(Zero-Copy)理解 Netty 中的零拷贝(Zero-Copy)是指在操作数据时,不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域,这样可以减少 CPU 的负载和内存带宽的占用。 Zero-Copy 通常...
丢弃服务器Netty 实现时间服务器Java ByteBuffer使用案例Netty ByteBuf使用案例Netty ByteBuf 的清晰使用模式Netty实现无连接协议Echo服务器、客户端Java线程池示例Java Reactor 示例基于自定义换行的解码器TCP...
ByteBuf是Java的Netty框架中的一个核心组件,它是一个高效、线程安全的字节缓冲区,用于处理网络I/O。在Rust编程语言中,`bytebuf-rs`项目是一个模仿Netty ByteBuf设计的库,旨在提供类似的性能特性和功能。本文将...
netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。
2. **高效的缓冲区**:Netty 提供了ByteBuf,这是一个高性能的字节缓冲区,支持零拷贝,降低了内存拷贝带来的开销。 3. **管道(Pipeline)机制**:Netty 的数据传输基于Pipeline,每个连接都有一个处理链,可以...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...掌握Netty的使用,将极大地提升我们在分布式系统、游戏服务器、RPC服务等领域的开发能力。
将Netty ByteBuf的相关类,进行梳理总结、源码分析,通过思维导图的注释看源码!
总之,"android netty使用demo"提供了一个基础的Android Netty应用实例,展示了如何在Android环境中使用Netty实现长连接通信。然而,实际开发中,还需要考虑更多的细节和优化,以满足复杂的网络应用场景。
Netty的ByteBuf支持大端序和小端序,确保两端使用相同顺序。 7. **调试与日志**:在解决问题的过程中,日志是非常重要的工具。通过打印接收到的字节数组和解码后的字符串,可以帮助定位问题。 总结,Netty中的进制...
ByteBuf response = Unpooled.copiedBuffer(user.toString(), CharsetUtil.UTF_8); ctx.writeAndFlush(response); } } ``` 最后,使用Spring的`ClassPathXmlApplicationContext`来加载配置并启动服务器: ```...
ByteBuf是Netty提供的字节缓冲区,它是Java NIO ByteBuffer的一个增强版。ByteBuf提供了一种更高效的内存管理方式,支持读写索引独立,避免了BufferOverflow和BufferUnderflow等异常。它还支持堆内和堆外内存,以及...
Netty 提供了 `ByteBuf` 类作为缓冲区,用于高效地存储和操作字节数据。`ByteBuf` 支持读写操作,可以用于处理16进制数据。通过 `ByteBuf` 的方法,如 `readByte()`、`writeByte()`、`readInt()` 和 `writeInt()` ...
首先,我们需要了解Netty的ByteBuf和ChannelHandlerContext。ByteBuf是Netty中的缓冲区,用于存储和读写网络数据。ChannelHandlerContext则提供了与通道相关的操作,包括读写数据、处理事件等。 在构建WebSocket...
在Java环境中,使用Netty框架实现JT808协议的解析部分源码,可以为开发者提供一种高效且灵活的处理方式。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发可伸缩的网络应用。 源码分析: 1. **协议...
- **Buffer**:Netty的ByteBuf提供了高效的数据存储和传输功能,优于Java原生的ByteBuffer。 4. **Netty编程模型**:包括客户端和服务端的创建、连接建立、数据读写、异常处理等,通过编写自定义的Handler来实现...
3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持直接内存和堆内存操作,避免了频繁的内存复制。 4. **零拷贝**:Netty通过使用FileRegion实现零拷贝,减少了CPU...
此外,Netty的ByteBuf类是高效内存管理的关键,它提供了缓冲区的读写操作,避免了不必要的数据拷贝。 在高级特性部分,书籍会涉及Netty的编解码器,如LineBasedFrameDecoder用于处理以换行符分隔的协议,以及...
这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...
1. **ByteBuf**:Netty 使用 ByteBuf 替代了传统的 ByteBuffer,提供了更高效的数据缓冲区管理,减少了内存复制和对象创建的开销。 2. **零拷贝**:Netty 支持零拷贝技术,如 FileRegion,减少数据在内核和用户空间...
《Netty使用初步》 Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种分布式系统、云计算平台以及游戏服务器等场景。本篇文章将深入探讨Netty...