`
dwj147258
  • 浏览: 194638 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Bytebuffer和ByteBuf比较

阅读更多

一.ByteBuffer

 

  ByteBuffer是JDK NIO中提供的Java.nio.Buffer, 在内存中预留指定大小的存储空间来存放临时数据,其他Buffer 的子类有:CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer 和 ShortBuffer

 1. Buffer

 

   ByteBuffer继承Buffer,Buffer中定义的成员变量。

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. @author Mark Reinhold  
  3. @author JSR-51 Expert Group  
  4. @since 1.4  
  5. */  
  6.   
  7. ublic abstract class Buffer {  
  8.   
  9.    // Invariants: mark <= position <= limit <= capacity  
  10.    private int mark = -1;  
  11.    private int position = 0;  
  12.    private int limit;  
  13.    private int capacity;  
  14.   
  15.    // Used only by direct buffers  
  16.    // NOTE: hoisted here for speed in JNI GetDirectBufferAddress  
  17.    long address;  

 

 

每个Buffer都有以下的属性:

capacity
这个Buffer最多能放多少数据。capacity在buffer被创建的时候指定。

limit
在Buffer上进行的读写操作都不能越过这个下标。当写数据到buffer中时,limit一般和capacity相等,当读数据时,
limit代表buffer中有效数据的长度。

position
读/写操作的当前下标。当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,
buffer会更新下标的值。

mark
一个临时存放的位置下标。调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

这些属性总是满足以下条件:
0 <= mark <= position <= limit <= capacity

 

走个ByteBuffer的小例子

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.nio.ByteBuffer;  
  2.   
  3. public class ByteBufferTest {  
  4.   
  5.     public static void main(String[] args) {  
  6.       
  7.         //实例初始化  
  8.         ByteBuffer buffer = ByteBuffer.allocate(100);  
  9.         String value ="Netty";  
  10.         buffer.put(value.getBytes());  
  11.         buffer.flip();  
  12.         byte[] vArray = new byte[buffer.remaining()];  
  13.         buffer.get(vArray);  
  14.         System.out.println(new String(vArray));  
  15.     }  
  16.   
  17. }  

我们看下调用flip()操作前后的对比

 

+--------------------+-----------------------------------------------------------+

|        Netty          |                                                                           |

+--------------------+-----------------------------------------------------------+

|                          |                                                                           |

0                       position                                                        limit = capacity 

                                      ByteBuffer flip()操作之前

 

 

+--------------------+-----------------------------------------------------------+

|        Netty          |                                                                           |

+--------------------+-----------------------------------------------------------+

|                          |                                                                           |

position              limit                                                                 capacity

                                      ByteBuffer flip()操作之后

 

由于ByteBuffer只有一个position位置指针用户处理读写请求操作,因此每次读写的时候都需要调用flip()和clean()等方法,否则功能将出错。如上图,如果不做flip操作,读取到的将是position到capacity之间的错误内容。当执行flip()操作之后,它的limit被设置为position,position设置为0,capacity不变,由于读取的内容是从position都limit之间,因此它能够正确的读取到之前写入缓冲区的内容。

 

3.Buffer常用的函数

 

 

 clear()
把position设为0,把limit设为capacity,一般在把数据写入Buffer前调用。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public final Buffer clear() {  
  2.         position = 0;  
  3.         limit = capacity;  
  4.         mark = -1;  
  5.         return this;  
  6.     }  

 flip()
把limit设为当前position,把position设为0,一般在从Buffer读出数据前调用。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public final Buffer flip() {  
  2.         limit = position;  
  3.         position = 0;  
  4.         mark = -1;  
  5.         return this;  
  6.     }  

rewind()

把position设为0,limit不变,一般在把数据重写入Buffer前调用。

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public final Buffer rewind() {  
  2.        position = 0;  
  3.        mark = -1;  
  4.        return this;  
  5.    }  

mark()

设置mark的值,mark=position,做个标记。

reset()

还原标记,把mark的值赋值给position。

 

4.ByteBuffer实例化

 

allocate(int capacity)

从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer 。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public static ByteBuffer allocate(int capacity) {  
  2.        if (capacity < 0)  
  3.            throw new IllegalArgumentException();  
  4.        return new HeapByteBuffer(capacity, capacity);  
  5.    }  

allocateDirect(int capacity)
非JVM堆栈而是通过操作系统来创建内存块用作缓冲区,它与当前操作系统能够更好的耦合,因此能进一步提高I/O操作速度。但是分配直接缓冲区的系统开销很大,因此只有在缓冲区较大并长期存在,或者需要经常重用时,才使用这种缓冲区,实现类是DirectByteBuffer。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public static ByteBuffer allocateDirect(int capacity) {  
  2.        return new DirectByteBuffer(capacity);  
  3.    }  

 

wrap(byte[] array)

这个缓冲区的数据会存放在byte数组中,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer 。

wrap(byte[] array, int offset, int length)

在上一个方法的基础上可以指定偏移量和长度,这个offset也就是包装后byteBuffer的position,而length呢就是limit-position的大小,从而我们可以得到limit的位置为length+position(offset),实现类是HeapByteBuffer 。

 

HeapByteBuffer和DirectByteBuffer的总结:前者是内存的分派和回收速度快,可以被JVM自动回收,缺点是如果进行Socket的I/O读写,需要额外做一次内存拷贝,将堆内存对应的缓存区复制到内核中,性能会有一定程序的下降;后者非堆内存,它在堆外进行内存的分配,相比堆内存,它的分配和回收速度会慢一些,但是它写入或者从Socket Channel中读取时,由于少了一次内存复制,速度比堆内存快。经验表明,最佳实践是在I/O通信线程的读写缓冲区使用DirectByteBuffer,后端业务消息的编码模块使用HeapByteBuffer,这样的组合可以达到性能最优。

  

二. ByteBuf

 

先走个小例子

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import io.netty.buffer.ByteBuf;  
  2. import io.netty.buffer.Unpooled;  
  3.   
  4.   
  5. public class ByteBufTest {  
  6.   
  7.     public static void main(String[] args) {  
  8.       
  9.         //实例初始化  
  10.         ByteBuf buffer =   Unpooled.buffer(100);  
  11.         String value ="学习ByteBuf";  
  12.         buffer.writeBytes(value.getBytes());  
  13.         System.out.println("获取readerIndex:"+buffer.readerIndex());  
  14.         System.out.println("获取writerIndex:"+buffer.writerIndex());  
  15.         byte[] vArray = new byte[buffer.writerIndex()];  
  16.         buffer.readBytes(vArray);  
  17.         System.out.println("获取readerIndex:"+buffer.readerIndex());  
  18.         System.out.println("获取writerIndex:"+buffer.writerIndex());  
  19.         System.out.println(new String(vArray));  
  20.           
  21.           
  22.     }  
  23.   
  24. }  

接着看下ByteBuf主要类继承关系

1. AbstractByteBuf

 
AbstractByteBuf继承ByteBuf,AbstractByteBuf中定义了ByteBuf的一些公共属性,像读索引、写索引、mark、最大容量等公共属性,具体定义如下图。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public abstract class AbstractByteBuf extends ByteBuf {  
  2.   
  3.     static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);  
  4.       
  5.     int readerIndex;  //读索引  
  6.     private int writerIndex; //写索引  
  7.   
  8.     private int markedReaderIndex; //,  
  9.     private int markedWriterIndex;  
  10.   
  11.     private int maxCapacity;  
  12.   
  13.     private SwappedByteBuf swappedBuf;  

在AbstractByteBuf中并没有定义ByteBuf的缓冲区实现,因为AbstractByteBuf并不清楚子类到底是基于堆内存还是直接内存。AbstractByteBuf中定义了读写操作方法,这里主要介绍下写方法,ByteBuf写操作支持自动扩容,ByteBuffer而不支持,我们看下writeByte()具体的源码。

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.   public ByteBuf writeByte(int value) {  
  3.       ensureWritable(1);  
  4.       setByte(writerIndex++, value);  
  5.       return this;  
  6.   }  

 

接着调用ensureWritable()方法,是否需要自动扩容。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.    public ByteBuf ensureWritable(int minWritableBytes) {  
  3.        if (minWritableBytes < 0) {  
  4.            throw new IllegalArgumentException(String.format(  
  5.                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));  
  6.        }  
  7.   
  8.        if (minWritableBytes <= writableBytes()) { //writableBytes()计算可写的容量=“capacity() - writerIndex;”  
  9.   
  10.            return this;  
  11.        }  
  12.   
  13.        if (minWritableBytes > maxCapacity - writerIndex) {  
  14.            throw new IndexOutOfBoundsException(String.format(  
  15.                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",  
  16.                    writerIndex, minWritableBytes, maxCapacity, this));  
  17.        }  
  18.   
  19.        // Normalize the current capacity to the power of 2.  
  20.        int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);  
  21.   
  22.        // Adjust to the new capacity.  
  23.        capacity(newCapacity);  
  24.        return this;  
  25.    }  

接着继续调用calculateNewCapacity(),计算自动扩容后容量,即满足要求的最小容量,等于writeIndex+minWritableBytes。

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. private int calculateNewCapacity(int minNewCapacity) {  
  2.         final int maxCapacity = this.maxCapacity;  
  3.         final int threshold = 1048576 * 4// 4 MiB page  
  4.   
  5.         if (minNewCapacity == threshold) {  
  6.             return threshold;  
  7.         }  
  8.   
  9.         // If over threshold, do not double but just increase by threshold.  
  10.         if (minNewCapacity > threshold) {  
  11.             int newCapacity = minNewCapacity / threshold * threshold;  
  12.             if (newCapacity > maxCapacity - threshold) {  
  13.                 newCapacity = maxCapacity;  
  14.             } else {  
  15.                 newCapacity += threshold;  
  16.             }  
  17.             return newCapacity;  
  18.         }  
  19.   
  20.         // Not over threshold. Double up to 4 MiB, starting from 64.  
  21.         int newCapacity = 64;  
  22.         while (newCapacity < minNewCapacity) {  
  23.             newCapacity <<= 1;  
  24.         }  
  25.   
  26.         return Math.min(newCapacity, maxCapacity);  
  27.     }  

首先设置门限值为4MB,当需要的新容量正好等于门限值时,使用门限值作为新的缓存区容量,如果新申请的内存容量大于门限值,不能采用倍增的方式扩张内容(防止内存膨胀和浪费),而是采用每次进步4MB的方式来内存扩张,扩张的时候需要对扩张后的内存和最大内存进行对比,如果大于缓存区的最大长度,则使用maxCapacity作为扩容后的缓存区容量。如果扩容后的新容量小于门限值,则以64为计算进行倍增,知道倍增后的结果大于等于需要的值。

 

重用缓存区,重用已经读取过的缓存区,下面介绍下discardReadBytes()方法的实现进行分析

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.    public ByteBuf discardReadBytes() {  
  3.        ensureAccessible();  
  4.        if (readerIndex == 0) {  
  5.            return this;  
  6.        }  
  7.   
  8.        if (readerIndex != writerIndex) {  
  9.            //复制数组 System.arraycopy(this,readerIndex, ,array,0,writerIndex - readerIndex)  
  10.            setBytes(0this, readerIndex, writerIndex - readerIndex);  
  11.            writerIndex -= readerIndex;  
  12.            adjustMarkers(readerIndex);  
  13.            readerIndex = 0;  
  14.        } else {  
  15.            adjustMarkers(readerIndex);  
  16.            writerIndex = readerIndex = 0;  
  17.        }  
  18.        return this;  
  19.    }  

首先对度索引进行判断,如果为0则说明没有可重用的缓存区,直接返回,如果读索引大于0且读索引不等于写索引,说明缓冲区中既有已经读取过的被丢弃的缓冲区,也有尚未读取的可读取缓存区。调用setBytes(0, this, readerIndex, writerIndex - readerIndex)进行字节数组复制,将尚未读取的字节数组复制到缓冲区的起始位置,然后重新设置读写索引,读索引为0,写索引设置为之前的写索引减去读索引。在设置读写索引的同时,调整markedReaderIndex和markedWriterIndex。

 

接下来看下初始化分配的ByteBuf的结构图

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. *      +-------------------+------------------+------------------+  
  3. *      |                       writable bytes    
  4. *      +-------------------+------------------+------------------+  
  5. *      |                                                         |  
  6. *      0=readerIndex=writerIndex                             capacity  
  7. *  

ByteBuf通过两个位置指针来协助缓冲区的读写操作,读操作使用readerIndex,写操作使用writerIndex。readerIndex和writerIndex的取值一开始都是0,随着数据的写入writerIndex会增加,读取数据会readerIndex增加,但是它不会超出writerIndex。在读取之后,0~readerIndex就视为discard的,调用discardReadBytes()方法,可以释放这部分空间。readerIndex和writerIndex之间的数据是可读的,等价于ByteBuffer position和limit之间的数据。writerIndex和capacity之间的空间是可写的,等价于ByteBuffer  limit和capacity之间的可用空间。

 

 

写入N个字节后的ByteBuf

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. *      +-------------------+------------------+------------------+  
  3. *      |         readable bytes               |  writable bytes  |  
  4. *      +-------------------+------------------+------------------+  
  5. *      |                                                         |  
  6. *      0=readerIndex                      writerIndex        capacity  
  7. *  

 

 

读取M(<N)个字节之后的ByteBuf

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. *      +-------------------+------------------+------------------+  
  3. *      | discardable bytes |  readable bytes  |  writable bytes  |  
  4. *      +-------------------+------------------+------------------+  
  5. *      |                   |                  |                  |  
  6. *      0             M=readerIndex      N=writerIndex         capacity  
  7. *  


调用discardReadBytes操作之后的ByetBuf

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. *      +-------------------+------------------+------------------+  
  3. *      |  readable bytes   |            writable bytes    
  4. *      +-------------------+------------------+------------------+  
  5. *      |                                                         |  
  6. *      0=readerIndex     N-M=writerIndex                      capacity  
  7. *  
 

调用clear操作之后的ByteBuf

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. *  
  2. *      +-------------------+------------------+------------------+  
  3. *      |                       writable bytes(more space)   
  4. *      +-------------------+------------------+------------------+  
  5. *      |                                                         |  
  6. *      0=readerIndex=writerIndex                             capacity  
  7. *  

 

2.AbstractReferenceCountedByteBuf

 

  AbstractReferenceCountedByteBuf继承AbstractByteBuf,从类的名字可以看出该类是对引用进行计数,用于跟踪对象的分配和销毁,做自动内存回收。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {  
  2.   
  3.     private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =  
  4.             AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class"refCnt");  
  5.   
  6.     private static final long REFCNT_FIELD_OFFSET;  
  7.   
  8.     static {  
  9.         long refCntFieldOffset = -1;  
  10.         try {  
  11.             if (PlatformDependent.hasUnsafe()) {  
  12.                 refCntFieldOffset = PlatformDependent.objectFieldOffset(  
  13.                         AbstractReferenceCountedByteBuf.class.getDeclaredField("refCnt"));  
  14.             }  
  15.         } catch (Throwable t) {  
  16.             // Ignored  
  17.         }  
  18.   
  19.         REFCNT_FIELD_OFFSET = refCntFieldOffset;  
  20.     }  
  21.   
  22.     @SuppressWarnings("FieldMayBeFinal")  
  23.     private volatile int refCnt = 1;  

首先看到第一个字段refCntUpdater ,它是AtomicIntegerFieldUpdater类型变量,通过原子方式对成员变量进行更新等操作,以实现线程安全,消除锁。第二个字段是REFCNT_FIELD_OFFSET,它用于标识refCnt字段在AbstractReferenceCountedByteBuf 中内存地址,该地址的获取是JDK实现强相关的,如果是SUN的JDK,它通过sun.misc.Unsafe的objectFieldOffset接口获得的,ByteBuf的实现之类UnpooledUnsafeDirectByteBuf和PooledUnsafeDirectByteBuf会使用这个偏移量。最后定义一个volatile修饰的refCnt字段用于跟踪对象的引用次数,使用volatile是为了解决多线程并发的可见性问题。

 


对象引用计数器,每次调用一次retain,引用计数器就会加一,由于可能存在多线程并发调用的场景,所以他的累计操作必须是线程安全的,看下具体的实现细节。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.   public ByteBuf retain(int increment) {  
  3.       if (increment <= 0) {  
  4.           throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");  
  5.       }  
  6.   
  7.       for (;;) {  
  8.           int refCnt = this.refCnt;  
  9.           if (refCnt == 0) {  
  10.               throw new IllegalReferenceCountException(0, increment);  
  11.           }  
  12.           if (refCnt > Integer.MAX_VALUE - increment) {  
  13.               throw new IllegalReferenceCountException(refCnt, increment);  
  14.           }  
  15.           if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {  
  16.               break;  
  17.           }  
  18.       }  
  19.       return this;  
  20.   }  

通过自旋对引用计数器进行加一操作,由于引用计数器的初始值为1,如果申请和释放操作能保证正确使用,则它的最小值为1。当被释放和被申请的次数相等时,就调用回收方法回收当前的ByteBuf对象。通过compareAndSet进行原子更新,它会使用自己获取的值和期望值进行对比,一样则修改,否则进行自旋,继续尝试直到成功(compareAndSet是操作系统层面提供的原子操作,称为CAS)。释放引用计数器的代码和对象引用计数器类似,释放引用计数器的每次减一,当refCnt==1时意味着申请和释放相等,说明对象引用已经不可达,该对象需要被释放和回收。回收则是通过调用子类的deallocate方法来释放ByteBuf对象。

看下UnpooledHeapByteBuf中deallocate的实现

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.    protected void deallocate() {  
  3.        array = null;  
  4.    }  

看下UnpooledUnsafeDirectByteBuf和UnpooledDirectByteBuf的deallocate实现细节

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.    protected void deallocate() {  
  3.        ByteBuffer buffer = this.buffer;  
  4.        if (buffer == null) {  
  5.            return;  
  6.        }  
  7.   
  8.        this.buffer = null;  
  9.   
  10.        if (!doNotFree) {  
  11.            freeDirect(buffer);  
  12.        }  
  13.    }  

再看freeDirect

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. protected void freeDirect(ByteBuffer buffer) {  
  2.         PlatformDependent.freeDirectBuffer(buffer);  
  3.     }  

再看freeDirectBuffer

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.      * Try to deallocate the specified direct {@link ByteBuffer}.  Please note this method does nothing if 
  3.      * the current platform does not support this operation or the specified buffer is not a direct buffer. 
  4.      */  
  5.     public static void freeDirectBuffer(ByteBuffer buffer) {  
  6.         if (buffer.isDirect()) {  
  7.             if (hasUnsafe()) {  
  8.                 PlatformDependent0.freeDirectBufferUnsafe(buffer);  
  9.             } else {  
  10.                 PlatformDependent0.freeDirectBuffer(buffer);  
  11.             }  
  12.         }  
  13.     }  

PlatformDependent0.freeDirectBufferUnsafe(buffer)实现细节

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. static void freeDirectBufferUnsafe(ByteBuffer buffer) {  
  2.         Cleaner cleaner;  
  3.         try {  
  4.             cleaner = (Cleaner) getObject(buffer, CLEANER_FIELD_OFFSET);  
  5.             if (cleaner == null) {  
  6.                 throw new IllegalArgumentException(  
  7.                         "attempted to deallocate the buffer which was allocated via JNIEnv->NewDirectByteBuffer()");  
  8.             }  
  9.             cleaner.clean();  
  10.         } catch (Throwable t) {  
  11.             // Nothing we can do here.  
  12.         }  
  13.     }  

PlatformDependent0.freeDirectBuffer(buffer)实现细节

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. static void freeDirectBuffer(ByteBuffer buffer) {  
  2.        if (CLEANER_FIELD == null) {  
  3.            return;  
  4.        }  
  5.        try {  
  6.            Cleaner cleaner = (Cleaner) CLEANER_FIELD.get(buffer);  
  7.            if (cleaner == null) {  
  8.                throw new IllegalArgumentException(  
  9.                        "attempted to deallocate the buffer which was allocated via JNIEnv->NewDirectByteBuffer()");  
  10.            }  
  11.            cleaner.clean();  
  12.        } catch (Throwable t) {  
  13.            // Nothing we can do here.  
  14.        }  
  15.    }  

可以看到UnpooledUnsafeDirectByteBuf和UnpooledDirectByteBuf的deallocate最终都是通过Cleaner类进行堆外的垃圾回收。Cleaner 是PhantomReference(虚引用)的子类。

3. UnpooledHeapByteBuf

 

 UnpooledHeapByteBuf是AbstractReferenceCountedByteBuf的子类,UnpooledHeapByteBuf是基于堆内存进行内存分配的字节码缓存区,它没有基于对象池技术实现,这就意味着每次I/O的读写都会创建一个新的UnpooledHeapByteBuf,频繁进行大块内存的分配和回收对性能造成一定的影响,但是相比堆外内存的申请和释放,它的成本还是会低一些。

看下UnpooledHeapByteBuf的成员变量定义

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {  
  2.   
  3.     private final ByteBufAllocator alloc;  
  4.     private byte[] array;  
  5.     private ByteBuffer tmpNioBuf;  

首先它聚合了一个ByteBufAllocator,用于UnpooledHeapByteBuf的内存分配,紧接着定义了一个byte数组作为缓冲区,最后定义一个ByteBuffer类型的tmpNioBuf变量用于实现Netty ByteBuf到JDK NIO ByteBuffer的转正。

 

看下UnpooledHeapByteBuf类缓冲区的自动扩展的实现

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2.     public ByteBuf capacity(int newCapacity) {  
  3.         ensureAccessible();  
  4.         if (newCapacity < 0 || newCapacity > maxCapacity()) {  
  5.             throw new IllegalArgumentException("newCapacity: " + newCapacity);  
  6.         }  
  7.   
  8.         int oldCapacity = array.length;  
  9.         if (newCapacity > oldCapacity) {  
  10.             byte[] newArray = new byte[newCapacity];  
  11.             System.arraycopy(array, 0, newArray, 0, array.length);  
  12.             setArray(newArray);  
  13.         } else if (newCapacity < oldCapacity) {  
  14.             byte[] newArray = new byte[newCapacity];  
  15.             int readerIndex = readerIndex();  
  16.             if (readerIndex < newCapacity) {  
  17.                 int writerIndex = writerIndex();  
  18.                 if (writerIndex > newCapacity) {  
  19.                     writerIndex(writerIndex = newCapacity);  
  20.                 }  
  21.                 System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);  
  22.             } else {  
  23.                 setIndex(newCapacity, newCapacity);  
  24.             }  
  25.             setArray(newArray);  
  26.         }  
  27.         return this;  
  28.     }  

方法入口首先对新容量进行合法性校验,不通过则抛出IllegalArgumentException,然后判断新的容量是否大于当前的缓冲区容量,如果大于容量则进行动态扩容,通过new byte[newCapacity]创建新的缓冲区字节数组,然后通过System.arraycopy()进行内存复制,将旧的字节数组复制到新创建的字节数组中,最后调用setArray替代旧的字节数组。
如果新的容量小于当前的缓冲区容量,不需要动态扩展,但需要截取当前缓冲区创建一个新的子缓冲区,具体的算法如下:首先判断下读取索引是否小于新的容量值,如果下雨进一步写索引是否大于新的容量,如果大于则将写索引设置为新的容量值。之后通过System.arraycopy将当前可读的字节数组复制到新创建的子缓冲区。如果新的容量值小于读索引,说明没有可读的字节数组需要复制到新创建的缓冲区中。

 

 

4.PooledHeapByteBuf

 

  PooledHeapByteBuf比UnpooledHeapByteBuf复杂一点,用到了线程池技术。首先来看看Recycler类。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.  * Light-weight object pool based on a thread-local stack. 
  3.  * 
  4.  * @param <T> the type of the pooled object 
  5.  */  
  6. public abstract class Recycler<T> {  
  7.   
  8.     private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {  
  9.         @Override  
  10.         protected Stack<T> initialValue() {  
  11.             return new Stack<T>(Recycler.this, Thread.currentThread());  
  12.         }  
  13.     };  

看注解就知道,Recycler是一个轻量级的线程池实现,通过定义了一个threadLocal,并初始化,看下初始化的详细

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. static final class Stack<T> implements Handle<T> {  
  2.   
  3.        private static final int INITIAL_CAPACITY = 256;  
  4.   
  5.        final Recycler<T> parent;  
  6.        final Thread thread;  
  7.        private T[] elements;  
  8.        private int size;  
  9.        private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);  
  10.   
  11.        @SuppressWarnings({ "unchecked""SuspiciousArrayCast" })  
  12.        Stack(Recycler<T> parent, Thread thread) {  
  13.            this.parent = parent;  
  14.            this.thread = thread;  
  15.            elements = newArray(INITIAL_CAPACITY);  
  16.        }  

 

Stack中定义了成员变量线程池、当前线程、数组、数字大小、map ,map主要用来验证线程池中是否已经存在。

 

继续看,PooledByteBuf类继承了AbstractReferenceCountedByteBuf,看下PooledByteBuf中定义的成员变量。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {  
  2.   
  3.     private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;  
  4.   
  5.     protected PoolChunk<T> chunk;  
  6.     protected long handle;  
  7.     protected T memory;  
  8.     protected int offset;  
  9.     protected int length;  
  10.     private int maxLength;  
  11.   
  12.     private ByteBuffer tmpNioBuf;  
  13.   
  14.     @SuppressWarnings("unchecked")  
  15.     protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {  
  16.         super(maxCapacity);  
  17.         this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;  
  18.     }  

其中chunk主要用来组织和管理内存的分配和释放。


5.ByteBufAllocator

 

 

  ByteBufAllocator是字节缓冲区分配器,按照Netty的缓冲区实现的不同,共有两者不同的分配器:基于内存池的字节缓冲区分配器和普通的字节缓冲区分配器。接口的继承关系如下。

 

看下ByteBufAllocator中定义的常用接口

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer 
  3.     * depends on the actual implementation. 
  4.     */  
  5.    ByteBuf buffer();  
  6.   
  7.    /** 
  8.     * Allocate a {@link ByteBuf} with the given initial capacity. 
  9.     * If it is a direct or heap buffer depends on the actual implementation. 
  10.     */  
  11.    ByteBuf buffer(int initialCapacity);  
  12.   
  13.    /** 
  14.     * Allocate a {@link ByteBuf} with the given initial capacity and the given 
  15.     * maximal capacity. If it is a direct or heap buffer depends on the actual 
  16.     * implementation. 
  17.     */  
  18.    ByteBuf buffer(int initialCapacity, int maxCapacity);  
  19.   
  20.    /** 
  21.     * Allocate a {@link ByteBuf} whose initial capacity is 0, preferably a direct buffer which is suitable for I/O. 
  22.     */  
  23.    ByteBuf ioBuffer();  
  24.   
  25.    /** 
  26.     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. 
  27.     */  
  28.    ByteBuf ioBuffer(int initialCapacity);  
  29.   
  30.    /** 
  31.     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. 
  32.     */  
  33.    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);  
  34.   
  35.    /** 
  36.     * Allocate a heap {@link ByteBuf}. 
  37.     */  
  38.    ByteBuf heapBuffer();  
  39.   
  40.    /** 
  41.     * Allocate a heap {@link ByteBuf} with the given initial capacity. 
  42.     */  
  43.    ByteBuf heapBuffer(int initialCapacity);  
  44.   
  45.    /** 
  46.     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given 
  47.     * maximal capacity. 
  48.     */  
  49.    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);  
  50.   
  51.    /** 
  52.     * Allocate a direct {@link ByteBuf}. 
  53.     */  
  54.    ByteBuf directBuffer();  
  55.   
  56.    /** 
  57.     * Allocate a direct {@link ByteBuf} with the given initial capacity. 
  58.     */  
  59.    ByteBuf directBuffer(int initialCapacity);  
  60.   
  61.    /** 
  62.     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given 
  63.     * maximal capacity. 
  64.     */  
  65.    ByteBuf directBuffer(int initialCapacity, int maxCapacity);  
  66.   
  67.      

 

三.总结下

 

1.ByteBuffer必须自己长度固定,一旦分配完成,它的容量不能动态扩展和收缩;ByteBuf默认容器大小为256,支持动态扩容,在允许的最大扩容范围内(Integer.MAX_VALUE)。

2.ByteBuffer只有一个标识位置的指针,读写的时候需要手动的调用flip()和rewind()等,否则很容易导致程序处理失败。而ByteBuf有两个标识位置的指针,一个写writerIndex,一个读readerIndex,读写的时候不需要调用额外的方法。

3.NIO的SocketChannel进行网络读写时,操作的对象是JDK标准的java.nio.byteBuffer。由于Netty使用统一的ByteBuf替代JDK原生的java.nio.ByteBuffer,所以ByteBuf中定义了ByteBuffer nioBuffer()方法将ByteBuf转换成ByteBuffer。

分享到:
评论

相关推荐

    内存bytebuf读写模型

    java的bytebuffer必须要进行读写模式的手动切换,否则不能正常读写数据

    对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解1

    2. wrap 操作:可以将 byte[] 数组、ByteBuf、ByteBuffer 等包装成一个 Netty ByteBuf 对象,避免了拷贝操作。 3. ByteBuf 的 slice 操作:可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免了内存的...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    ByteBuf是Netty提供的字节缓冲区,它是Java NIO ByteBuffer的一个增强版。ByteBuf提供了一种更高效的内存管理方式,支持读写索引独立,避免了BufferOverflow和BufferUnderflow等异常。它还支持堆内和堆外内存,以及...

    Netty Demo

    Netty的ByteBuf 是一个高效的数据缓冲区,替代了传统的Java NIO ByteBuffer。ByteBuf 提供了更友好的API,支持读写索引管理,以及更灵活的缓冲区操作。在"Netty Demo"项目中,你可能会看到如何创建和使用ByteBuf来...

    netty.chm nettyapi

    ByteBuf提供了更友好的API,同时解决了ByteBuffer的一些问题,如内存管理、读写指针管理和容量扩展等。 3. **Channel**:Channel是Netty中的核心概念,它代表了网络连接的一个端点,可以进行读写操作。例如,...

    Netty5.0架构剖析和源码解读.pdf

    相较于Java NIO中的ByteBuffer,ByteBuf提供了更丰富的操作接口,如预读、后读、分配、复制等,同时减少了内存拷贝,提升了性能。 在Netty5.0中,还引入了ByteToMessageDecoder和MessageToByteEncoder这两个编码...

    netty实战-netty-thing.zip

    4. **ByteBuf**:Netty提供了ByteBuf作为高效的数据缓冲区,相比Java的ByteBuffer,ByteBuf提供了更友好的API,支持自动扩容和内存池,减少了内存分配和垃圾回收的压力。 5. **零拷贝**:Netty实现了零拷贝技术,...

    netty例子 官方

    Netty 使用 ByteBuf 替代了 Java 的 ByteBuffer,ByteBuf 提供了更方便的操作字节缓冲区的方式,如预读、后写等,并且能够跟踪缓冲区的使用情况,避免了直接操作ByteBuffer的复杂性。 3. **Channel** 和 **...

    Netty4.0 官网例子(免费)

    1. **ByteBuf**:Netty4.0 引入了 ByteBuf 作为缓冲区,替代了传统的 ByteBuffer。ByteBuf 提供了更高效的内存管理,支持读写分离,避免了 ByteBuffer 的翻转操作,提高了性能。 2. **Channel**:Channel 是 Netty ...

    netty jar包

    2. **ByteBuf**:Netty 提供了 ByteBuf 类作为缓冲区,替代了 JDK 的 ByteBuffer。ByteBuf 提供了更友好的API,支持读写索引,以及自动扩容和缩容,方便数据的读取和写入。 3. **Channel** 和 **EventLoop**:...

    netty权威指南第二版 maven版本源码

    3. **ByteBuf**: ByteBuf是Netty提供的字节缓冲区,相比Java NIO中的ByteBuffer,ByteBuf提供了更方便的操作接口,如读写索引管理,以及自动扩容等功能,更适应于网络通信的需求。 4. **Pipeline与Handler**: Netty...

    netty.7z44444444444444444444444444

    Netty引入了ByteBuf作为字节缓冲区,替代了Java NIO中的ByteBuffer。ByteBuf提供了更友好的API,支持读写指针独立管理,避免了缓冲区的拷贝,提高了性能。 3. **Channel与Handler** Channel是Netty中连接的抽象,...

    Netty权威指南 第2版 带书签目录 完整版 及原书自带源码

    2. **ByteBuf**:Netty自定义的内存管理机制,替代了Java NIO中的ByteBuffer。ByteBuf提供更高效、安全的字节操作,并优化了内存分配和复用。 3. **Channel**和**Pipeline**:Channel代表一个连接,负责数据的读写...

    demo.rar netty代码例子

    相比 Java NIO 中的 ByteBuffer,ByteBuf 提供了更多的便利功能,如前向和后向增长、读写索引管理等。 3. **EventLoop 和 EventLoopGroup**:EventLoop 负责执行 Channel 上的事件处理器(Handler)。...

    netty博文资料

    相较于 Java NIO 中的 ByteBuffer,ByteBuf 提供了更高效、更安全的内存管理,支持读写索引的独立跟踪,避免了不必要的内存拷贝。 总的来说,Netty 是一款强大的网络编程框架,它的设计理念和丰富的功能使其在...

    netty开源 源码,用于阅读学习

    1. **ByteBuf**:Netty 自定义的缓冲区类,替代了Java的ByteBuffer。ByteBuf 提供了更友好的API,支持读写索引独立管理,提高了内存操作的效率。 2. **ChannelHandlerContext**:在处理器之间传递上下文信息和触发...

    netty快速入门教程7-8集 共12集

    相比 Java 的 ByteBuffer,ByteBuf 提供了更丰富的操作和更好的性能。在处理 Protocol Buffers 数据时,会涉及到 ByteBuf 的使用和管理。 3. **ProtoBufDecoder 和 ProtoBufEncoder**:Netty 中的解码器和编码器是...

    netty聊天项目

    3. **ByteBuf**:Netty 自定义了 ByteBuf 作为数据缓冲区,替代了传统的 Java NIO ByteBuffer。ByteBuf 提供了更友好的API,支持直接内存和堆内存,以及更高效的内存管理。 4. **Handler**:Handler 是 Netty 中的...

    netty项目代码

    6. **ByteBuf**:Netty的ByteBuf是高效的数据缓冲区,替代了Java NIO中的ByteBuffer。ByteBuf提供了一种更灵活且高效的内存管理方式,支持读写索引跟踪,以及自动扩容和缩容。 在这个"Netty项目代码"中,开发者可能...

    netty 3.2 api 中文版

    5. **ByteBuf**:Netty提供的高效缓冲区,替代了Java NIO的ByteBuffer。ByteBuf提供了更丰富的操作方法,并优化了内存管理,降低了内存碎片。 Netty 3.2还支持多种协议,如HTTP、FTP、SMTP、SSL/TLS等,这使得它在...

Global site tag (gtag.js) - Google Analytics