`

Netty ByteBuf的使用

 
阅读更多

Netty的ByteBuf主要用于网络传输,有读写两个index。

 *      +-------------------+------------------+------------------+

 *      | discardable bytes |  readable bytes  |  writable bytes  |

 *      |                   |     (CONTENT)    |                  |

 *      +-------------------+------------------+------------------+

 *      |                   |                  |                  |

 *      0      <=      readerIndex   <=   writerIndex    <=    capacity


 我们可以看出,大致分为几类ByteBuf,一类是Pooled,UnPooled。另一个是Heap,Direct。

public abstract class AbstractByteBuf extends ByteBuf {

    static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);

    int readerIndex; //读的索引
    int writerIndex;//写的索引
    private int markedReaderIndex;//标记读的索引
    private int markedWriterIndex;//标记写的索引

    private int maxCapacity;//最大容量
}

   PooledByteBuf,通过capacity和deallocate方法我们可以发现都是用 chunk.arena来分配内存和释放内存的

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

    private final Recycler.Handle recyclerHandle;

    protected PoolChunk<T> chunk;
    protected long handle;
    protected T memory;
    protected int offset;
    protected int length;
    int maxLength;

    private ByteBuffer tmpNioBuf;

   @Override
    public final ByteBuf capacity(int newCapacity) {
        ensureAccessible();

        // If the request capacity does not require reallocation, just update the length of the memory.
        if (chunk.unpooled) {
            if (newCapacity == length) {
                return this;
            }
        } else {
            if (newCapacity > length) {
                if (newCapacity <= maxLength) {
                    length = newCapacity;
                    return this;
                }
            } else if (newCapacity < length) {
                if (newCapacity > maxLength >>> 1) {
                    if (maxLength <= 512) {
                        if (newCapacity > maxLength - 16) {
                            length = newCapacity;
                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                            return this;
                        }
                    } else { // > 512 (i.e. >= 1024)
                        length = newCapacity;
                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                        return this;
                    }
                }
            } else {
                return this;
            }
        }

        // Reallocation required.
        chunk.arena.reallocate(this, newCapacity, true);
        return this;
    }

    @Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            this.handle = -1;
            memory = null;
            chunk.arena.free(chunk, handle, maxLength);
            recycle();
        }
    }
}

    PooledDirectByteBuf 是缓存的DirectByteBuf. 用RECYCLER来做回收重复利用。调用memory的get方法获取数据。memory是一个DirectByteBuffer对象。

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        @Override
        protected PooledDirectByteBuf newObject(Handle handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }

    @Override
    protected byte _getByte(int index) {
        return memory.get(idx(index));
    }

    @Override
    protected short _getShort(int index) {
        return memory.getShort(idx(index));
    }
}

  PooledUnsafeDirectByteBuf和PooledDirectByteBuf的区别是没有使用DirectByteBuffer的接口方法去获取数据,而是直接用PlatformDependent的方法获取数据

final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;

    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }
    private long memoryAddress;

  private void initMemoryAddress() {
        memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;
    }
  
    @Override
    protected byte _getByte(int index) {
        return PlatformDependent.getByte(addr(index));
    }

    @Override
    protected short _getShort(int index) {
        short v = PlatformDependent.getShort(addr(index));
        return NATIVE_ORDER? v : Short.reverseBytes(v);
    }
}

 PooledHeapByteBuf是直接用堆内分配的byte数组来存储数据,因此可以直接通过偏移量来读取数据

final class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        @Override
        protected PooledHeapByteBuf newObject(Handle handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }
    @Override
    protected byte _getByte(int index) {
        return memory[idx(index)];
    }

    @Override
    protected short _getShort(int index) {
        index = idx(index);
        return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
    }
}

   UnpooledHeapByteBuf和PooledHeapByteBuf的区别是UnpooledHeapByteBuf不会用RECYCLER回收器

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    private byte[] array;
    private ByteBuffer tmpNioBuf;

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     *
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
    }
}

   Netty会调用PoolArea的allocate方法创建ByteBuf对象

abstract class PoolArena<T> {

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            if (isTiny(normCapacity)) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            synchronized (this) {
                final PoolSubpage<T> head = table[tableIdx];
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    return;
                }
            }
        } else if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    }
}

 

 

  

 

  • 大小: 74.4 KB
分享到:
评论

相关推荐

    对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解1

    Netty ByteBuf 的零拷贝(Zero-Copy)理解 Netty 中的零拷贝(Zero-Copy)是指在操作数据时,不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域,这样可以减少 CPU 的负载和内存带宽的占用。 Zero-Copy 通常...

    Netty 演示 (Netty案例大全).zip

    丢弃服务器Netty 实现时间服务器Java ByteBuffer使用案例Netty ByteBuf使用案例Netty ByteBuf 的清晰使用模式Netty实现无连接协议Echo服务器、客户端Java线程池示例Java Reactor 示例基于自定义换行的解码器TCP...

    bytebuf-rs:类似于Netty ByteBuf的ByteBuf的Rust实现

    ByteBuf是Java的Netty框架中的一个核心组件,它是一个高效、线程安全的字节缓冲区,用于处理网络I/O。在Rust编程语言中,`bytebuf-rs`项目是一个模仿Netty ByteBuf设计的库,旨在提供类似的性能特性和功能。本文将...

    netty 在java中的字节码转换

    netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。

    Netty4 使用

    2. **高效的缓冲区**:Netty 提供了ByteBuf,这是一个高性能的字节缓冲区,支持零拷贝,降低了内存拷贝带来的开销。 3. **管道(Pipeline)机制**:Netty 的数据传输基于Pipeline,每个连接都有一个处理链,可以...

    NettyDemo Netty使用实例,对象传递调用

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...掌握Netty的使用,将极大地提升我们在分布式系统、游戏服务器、RPC服务等领域的开发能力。

    ByteBuf源码分析

    将Netty ByteBuf的相关类,进行梳理总结、源码分析,通过思维导图的注释看源码!

    android netty使用demo

    总之,"android netty使用demo"提供了一个基础的Android Netty应用实例,展示了如何在Android环境中使用Netty实现长连接通信。然而,实际开发中,还需要考虑更多的细节和优化,以满足复杂的网络应用场景。

    Netty进制转换乱码问题

    Netty的ByteBuf支持大端序和小端序,确保两端使用相同顺序。 7. **调试与日志**:在解决问题的过程中,日志是非常重要的工具。通过打印接收到的字节数组和解码后的字符串,可以帮助定位问题。 总结,Netty中的进制...

    spring+netty+mybatis整合实例

    ByteBuf response = Unpooled.copiedBuffer(user.toString(), CharsetUtil.UTF_8); ctx.writeAndFlush(response); } } ``` 最后,使用Spring的`ClassPathXmlApplicationContext`来加载配置并启动服务器: ```...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    ByteBuf是Netty提供的字节缓冲区,它是Java NIO ByteBuffer的一个增强版。ByteBuf提供了一种更高效的内存管理方式,支持读写索引独立,避免了BufferOverflow和BufferUnderflow等异常。它还支持堆内和堆外内存,以及...

    netty服务器解析16进制数据

    Netty 提供了 `ByteBuf` 类作为缓冲区,用于高效地存储和操作字节数据。`ByteBuf` 支持读写操作,可以用于处理16进制数据。通过 `ByteBuf` 的方法,如 `readByte()`、`writeByte()`、`readInt()` 和 `writeInt()` ...

    使用Netty搭建WebSocket服务器,可修改单包大小限制

    首先,我们需要了解Netty的ByteBuf和ChannelHandlerContext。ByteBuf是Netty中的缓冲区,用于存储和读写网络数据。ChannelHandlerContext则提供了与通道相关的操作,包括读写数据、处理事件等。 在构建WebSocket...

    jt808netty版解析部分源码

    在Java环境中,使用Netty框架实现JT808协议的解析部分源码,可以为开发者提供一种高效且灵活的处理方式。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发可伸缩的网络应用。 源码分析: 1. **协议...

    跟闪电侠学Netty:Netty即时聊天实战与底层原理-book-netty.zip

    - **Buffer**:Netty的ByteBuf提供了高效的数据存储和传输功能,优于Java原生的ByteBuffer。 4. **Netty编程模型**:包括客户端和服务端的创建、连接建立、数据读写、异常处理等,通过编写自定义的Handler来实现...

    Netty实战 电子版.pdf_java_netty_服务器_

    3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持直接内存和堆内存操作,避免了频繁的内存复制。 4. **零拷贝**:Netty通过使用FileRegion实现零拷贝,减少了CPU...

    Netty进阶之路-跟着案例学Netty

    此外,Netty的ByteBuf类是高效内存管理的关键,它提供了缓冲区的读写操作,避免了不必要的数据拷贝。 在高级特性部分,书籍会涉及Netty的编解码器,如LineBasedFrameDecoder用于处理以换行符分隔的协议,以及...

    netty 4.1 英文api 帮助文档 + 用户指南 chm

    这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...

    使用Netty4实现多线程的消息分发

    1. **ByteBuf**:Netty 使用 ByteBuf 替代了传统的 ByteBuffer,提供了更高效的数据缓冲区管理,减少了内存复制和对象创建的开销。 2. **零拷贝**:Netty 支持零拷贝技术,如 FileRegion,减少数据在内核和用户空间...

    Netty使用初步

    《Netty使用初步》 Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种分布式系统、云计算平台以及游戏服务器等场景。本篇文章将深入探讨Netty...

Global site tag (gtag.js) - Google Analytics