`
Donald_Draper
  • 浏览: 980888 次
社区版块
存档分类
最新评论

netty 抽象字节buf分配器

阅读更多
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
引言
上一篇文章,我们看了复合buf概念,先来回顾一下:

复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。

在netty 默认通道配置初始化(http://donald-draper.iteye.com/blog/2393504
这篇文章,我们看到字节buf是由字节分配器来分配的,默认字节分配器,在ByteBufUtil中定义,我们先回到
通道配置字节buf分配器的定义:
//字节buf分配器
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;


public final class ByteBufUtil {  
     static final ByteBufAllocator DEFAULT_ALLOCATOR;//默认字节分配器
     static {  
            String allocType = SystemPropertyUtil.get(  
                    "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");  
            allocType = allocType.toLowerCase(Locale.US).trim();  
            ByteBufAllocator alloc;  
            if ("unpooled".equals(allocType)) {  
                alloc = UnpooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: {}", allocType);  
            } else if ("pooled".equals(allocType)) {  
                alloc = PooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: {}", allocType);  
            } else {  
                alloc = PooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);  
            }  
            DEFAULT_ALLOCATOR = alloc;  
      
            THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);  
            logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);  
      
            MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);  
            logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);  
        }  
    }  


从字节buf工具类,可以看出,如果系统属性io.netty.allocator.type,配置的为unpooled,
则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,
对于Android平台,默认为UnpooledByteBufAllocator。

//UnpooledByteBufAllocator
 public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {  
      
        private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();  
        private final boolean disableLeakDetector;  
      
        /** 
         * Default instance which uses leak-detection for direct buffers. 
         */  
        public static final UnpooledByteBufAllocator DEFAULT =  
                new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());  
            ...  
    }  


//PooledByteBufAllocator
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {  
     public static final PooledByteBufAllocator DEFAULT =  
                new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());  
           ...  
}  


由于上面两个方法继承了AbstractByteBufAllocator,我们先来看一下抽象字节buf分配器,

/**
 * Skeletal {@link ByteBufAllocator} implementation to extend.
 */
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_INITIAL_CAPACITY = 256;//初始化容量
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;//最大容量
    static final int DEFAULT_MAX_COMPONENTS = 16;//复合buf最大组件buf数量
    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page 容量调整阈值
    private final boolean directByDefault;//分配buf是否为direct类型
    private final ByteBuf emptyBuf;

    /**
     * Instance use heap buffers by default
     */
    protected AbstractByteBufAllocator() {
        this(false);
    }

    /**
     * Create new instance
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     */
    protected AbstractByteBufAllocator(boolean preferDirect) {
        directByDefault = preferDirect && PlatformDependent.hasUnsafe();
        emptyBuf = new EmptyByteBuf(this);
    }
}

来看创建字节buf:
@Override
public ByteBuf buffer() {
    if (directByDefault) {
        return directBuffer();
    }
    return heapBuffer();
}

@Override
public ByteBuf buffer(int initialCapacity) {
    if (directByDefault) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
    if (directByDefault) {
        return directBuffer(initialCapacity, maxCapacity);
    }
    return heapBuffer(initialCapacity, maxCapacity);
}

创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型

先来看分配direct类型buf
@Override
public ByteBuf directBuffer() {
    return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
/**
 * Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
 待子类扩展
 */
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);


再来看分配heap类型buf:
@Override
public ByteBuf heapBuffer() {
    return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
/**
 * Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
 待子类扩展
 */
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);


从上面可以看出创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。

来看创建ioBufer
@Override
public ByteBuf ioBuffer() {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(DEFAULT_INITIAL_CAPACITY);
    }
    return heapBuffer(DEFAULT_INITIAL_CAPACITY);
}

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity, maxCapacity);
    }
    return heapBuffer(initialCapacity, maxCapacity);
}


从上面可以看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型



再来看创建复合buf:

@Override
public CompositeByteBuf compositeBuffer() {
    if (directByDefault) {
        return compositeDirectBuffer();
    }
    return compositeHeapBuffer();
}

@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
    if (directByDefault) {
        return compositeDirectBuffer(maxNumComponents);
    }
    return compositeHeapBuffer(maxNumComponents);
}

从上面可以看出创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;

来看创建heap类型复合buf
@Override
public CompositeByteBuf compositeHeapBuffer() {
    return compositeHeapBuffer(DEFAULT_MAX_COMPONENTS);
}

@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
    return toLeakAwareBuffer(new CompositeByteBuf(this, false, maxNumComponents));
}

//追踪复合buf内存泄漏
protected static CompositeByteBuf toLeakAwareBuffer(CompositeByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}


//SimpleLeakAwareCompositeByteBuf
class SimpleLeakAwareCompositeByteBuf extends WrappedCompositeByteBuf {

    final ResourceLeakTracker<ByteBuf> leak;//内部资源泄漏探测器

    SimpleLeakAwareCompositeByteBuf(CompositeByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped);
        this.leak = ObjectUtil.checkNotNull(leak, "leak");
    }
    ...
 }

//追踪字节buf内存泄漏
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}

再来看创建direct类型复合buf
@Override
public CompositeByteBuf compositeDirectBuffer() {
    return compositeDirectBuffer(DEFAULT_MAX_COMPONENTS);
}

@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
    return toLeakAwareBuffer(new CompositeByteBuf(this, true, maxNumComponents));
}


从上面可以看出创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。

再来看一下计算新容量
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    //如果新容量大于阈值,
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {//新容量+阈值大于最大容量
            newCapacity = maxCapacity;//则容量为最大容量
        } else {
            newCapacity += threshold;//否则增量为阈值
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    //否则从容量64开始,每次扩展一倍,直至大于最小新容量
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

再来看字节buf分配器度量提供者ByteBufAllocatorMetricProvider

public interface ByteBufAllocatorMetricProvider {

    /**
     * Returns a {@link ByteBufAllocatorMetric} for a {@link ByteBufAllocator}.
     返回一个字节buf分配器度量器
     */
    ByteBufAllocatorMetric metric();
}


public interface ByteBufAllocatorMetric {
    /**
     * Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
     返回字节buf分配使用的堆内存
     */
    long usedHeapMemory();

    /**
     * Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
     返回字节buf分配使用的direct内存
     */
    long usedDirectMemory();
}

总结:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
0
0
分享到:
评论

相关推荐

    基于netty4 的udp字节数据接收服务

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 4 构建基于 UDP(用户数据报协议)的数据接收服务,以及如何实现...

    java实现基于netty 的udp字节数据接收服务

    在Netty中,处理UDP数据接收服务不仅限于上述的基本步骤,还可以利用Netty的丰富功能进行扩展,如添加自定义的协议编解码器、实现负载均衡、健康检查等。总之,Netty为构建高效、灵活的UDP服务提供了一整套强大的...

    netty编码器,解码器实例

    Netty 提供了多种预定义的编码器和解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,它们分别用于处理字节流到消息对象和消息对象到字节流的转换。例如,我们可能需要将Java对象编码为字节流发送到网络,...

    Netty进制转换乱码问题

    5. **ByteToMessageDecoder**:Netty提供了一种方便的解码器`ByteToMessageDecoder`,可以自定义解码逻辑。如果接收到的数据格式复杂,可以编写一个子类并覆盖`decode()`方法,确保在解码过程中正确处理字符编码。 ...

    Netty之自定义编解码器.zip

    本压缩包文件"Netty之自定义编解码器.zip"着重讨论的是如何在Netty中自定义编解码器,这是Netty框架中的一个重要组成部分,用于将应用程序数据转换为网络传输的数据格式,以及将接收到的网络数据转换回应用程序可以...

    netty官网学习手册中文版

    - **ByteBuf**:Netty自定义的高效字节缓冲区,比Java的ByteBuffer更易用且性能更好。 - **ChannelHandler**:处理器接口,用于实现I/O事件的处理逻辑。 - **Bootstrap**:启动器,用于配置并启动服务器或客户端...

    Netty 框架学习 —— 编解码器框架(csdn)————程序.pdf

    在Netty中,编解码器是处理数据转换的关键组件,它们将原始的字节流转换为应用程序可理解的消息格式,反之亦然。本文将深入探讨Netty中的编解码器框架。 首先,我们需要理解编码器和解码器的基本概念。编码器负责将...

    Netty+自定义Protobuf编解码器

    在Netty中,编解码器是处理网络数据流的关键组件,它们负责将原始字节流转换为易于处理的对象,反之亦然。通常,Netty使用管道(Pipeline)来传递和处理这些字节流,每个管道包含一系列的处理节点,即Handler。编码...

    netty 在java中的字节码转换

    netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。

    netty实现的聊天代码

    Netty 提供了多种编解码器,如 `StringDecoder` 和 `StringEncoder`,可以方便地将字符串数据转换成 ByteBuf(Netty 的字节缓冲区),反之亦然。在本示例中,可能已经使用了这些编解码器来处理消息。 为了实现实时...

    netty4.0源码,netty例子,netty api文档

    3. **ByteBuf**:Netty提供的ByteBuf作为字节缓冲区,比Java的ByteBuffer更加灵活和高效。它支持读写指针独立移动,避免了不必要的内存复制。 4. **Pipeline**:Netty的ChannelHandler链是通过Pipeline实现的。...

    netty实战教程、netty代码demo

    Netty 是由 JBoss 组织开源的一个网络通信框架,基于 Java NIO(非阻塞I/O)构建,提供了一套高度抽象和优化的 API,使得开发者可以轻松地处理网络连接的各种复杂情况。Netty 提供了多种传输类型,如 TCP、UDP 以及...

    netty服务器解析16进制数据

    分析这个代码,我们可以看到Netty如何创建服务器、设置管道、以及如何定义和使用自定义的解码器和编码器来处理16进制数据。 通过上述步骤,Netty服务器可以轻松地解析16进制数据,从而支持各种网络协议,无论它们是...

    Netty权威指南-Netty源码

    对于这些协议,Netty 提供了相应的编解码器,如 HttpObjectDecoder 和 WebSocketFrameDecoder,它们使得处理这些复杂协议变得非常简单。 此外,Netty 的性能优化也是其受欢迎的原因之一。例如,零拷贝技术通过避免...

    netty所需要得jar包

    - Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...

    netty-4.1_javaNetty_netty_服务器_

    3. **编解码器**:Netty 提供了丰富的编解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于将原始字节流转换为有意义的对象,以及将对象编码回字节流。 4. **异常处理**:学习如何在 Pipeline 中捕获...

    深入浅出Netty_netty_

    《深入浅出Netty》是一本专注于讲解Netty框架的编程指南,非常适合初学者入门。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书通过详实的代码案例,...

    netty分隔符和定长解码器的应用

    在Netty中,解码器是处理接收数据的重要组件,用于将接收到的原始字节流转换为可操作的对象。本篇将深入探讨Netty中的两种解码器:分隔符解码器(DelimiterBasedFrameDecoder)和定长解码器...

    NettyDemo Netty使用实例,对象传递调用

    对于Netty来说,编码器和解码器的实现通常需要遵循以下步骤: 1. 创建自定义的编码器类,继承MessageToByteEncoder,重写encode方法,将Java对象转换为ByteBuf。 2. 创建自定义的解码器类,继承...

    netty-4.1.4-jar包

    4. **ByteBuf**:Netty提供的高效字节缓冲区,提供了比Java的ByteBuffer更易用和高效的API,支持零拷贝和内存池管理。 5. **Decoder** 和 **Encoder**:用于在网络数据和应用对象之间进行编码和解码,例如将HTTP请求...

Global site tag (gtag.js) - Google Analytics