- 浏览: 978858 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
引言
前一篇文章我们看了抽象字节buf引用计数器,我们先来看回顾一下:
抽象字节引用计数器AbstractReferenceCountedByteBuf,内部有一个引用计数器,以及原子更新引用计数器的refCntUpdater(AbstractReferenceCountedByteBuf),更新引用计数器,实际通过refCntUpdater CAS操作,释放对象引用的时候,如果引用计数器为0,则释放对象相关资源。
今天我们将要看的是,抽象字节buf引用计数器一个具体实现CompositeByteBuf
先来看一下组件Component的定义
从上来看,组件可以看做字节buf的包装。
再来看构造:
复合buf的构造主要有一下几点要看:
1.
2.
3.
4.
我们分别来看这几点:
1.
2.
来看一下更新索引和更新写索引:
//更新写索引
3.
4.
从上面可以看出:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在
复合buf中的起始位置和结束位置,及buf可读数据长度。
其他添加字节buf到复合buf的的方法见附。
来看移除buf
再来看返回复合buf迭代器:
//CompositeByteBufIterator
再来从复合buf读取数据到字节buf集合
再来看字节buf相关的方法:
复合buf的其他方法,在附篇,我们只需要知道复合buf实际为字节buf数据概念即可,如果想进一步了解复合buf,可以看附篇的方法。
总结:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。
附:
这部分,不做详细详解,just see see...
添加字节buf到复合buf
get*相关方法:
其他get*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的get*方法。
再来看getBytes*相关方法
再来看读取到字节buf,输出流,聚集字节buf
从上面可以看出,getBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的getBytes*方法,将数据写到目的字节buf,nio buf,文件通道,输出流等。
再来看set方法:
set*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的set*方法。
再来看setBytes*方法:
setBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的setBytes*方法,将字节数组,字节buf,nio buf,文件通道,输出流等数据写到当前复合buf中。
再来看readBytes方法:
从上来看readBytes*方法,委托给父类抽象buf的readBytes*方法,实际通过getBytes*方法。
再来看write*方法:
从上来看write*方法,委托给父类抽象buf的write*方法,实际通过write*方法。
再来看其他方法,看看就行:
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
引言
前一篇文章我们看了抽象字节buf引用计数器,我们先来看回顾一下:
抽象字节引用计数器AbstractReferenceCountedByteBuf,内部有一个引用计数器,以及原子更新引用计数器的refCntUpdater(AbstractReferenceCountedByteBuf),更新引用计数器,实际通过refCntUpdater CAS操作,释放对象引用的时候,如果引用计数器为0,则释放对象相关资源。
今天我们将要看的是,抽象字节buf引用计数器一个具体实现CompositeByteBuf
package io.netty.buffer; import io.netty.util.internal.EmptyArrays; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * A virtual buffer which shows multiple buffers as a single merged buffer. It is recommended to use * {@link ByteBufAllocator#compositeBuffer()} or {@link Unpooled#wrappedBuffer(ByteBuf...)} instead of calling the * constructor explicitly. 复合直接为一个虚拟的buf,将多个buf展示为一个合并的buf。强烈建议使用ByteBufAllocator#compositeBuffer()和 Unpooled#wrappedBuffer(ByteBuf...)构造复合buf。 */ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> { private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer(); private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator(); private final ByteBufAllocator alloc;//字节buf分配器 private final boolean direct;//是否为direct buf private final List<Component> components;//字节buf集合 private final int maxNumComponents;//buf即最大容量 private boolean freed; }
先来看一下组件Component的定义
private static final class Component { final ByteBuf buf;//内部字节buf final int length;//buf字节长度 int offset;//在复合buf中的开始位置 int endOffset;//在复合buf中的结束位置 Component(ByteBuf buf) { this.buf = buf; length = buf.readableBytes(); } void freeIfNecessary() { buf.release(); // We should not get a NPE here. If so, it must be a bug. } }
从上来看,组件可以看做字节buf的包装。
再来看构造:
// Special constructor used by WrappedCompositeByteBuf CompositeByteBuf(ByteBufAllocator alloc) { super(Integer.MAX_VALUE); this.alloc = alloc; direct = false; maxNumComponents = 0; components = Collections.emptyList(); } public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) { super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY); if (alloc == null) { throw new NullPointerException("alloc"); } this.alloc = alloc; this.direct = direct; this.maxNumComponents = maxNumComponents; //创建字节buf集 components = newList(maxNumComponents); } public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) { this(alloc, direct, maxNumComponents, buffers, 0, buffers.length); } public CompositeByteBuf( ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable<ByteBuf> buffers) { super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY); if (alloc == null) { throw new NullPointerException("alloc"); } if (maxNumComponents < 2) { throw new IllegalArgumentException( "maxNumComponents: " + maxNumComponents + " (expected: >= 2)"); } this.alloc = alloc; this.direct = direct; this.maxNumComponents = maxNumComponents; components = newList(maxNumComponents);//创建字节buf集 addComponents0(false, 0, buffers);//添加字节buf数组,到buf集 consolidateIfNeeded();//调整底层字节buf setIndex(0, capacity());//更新复合buf读写索引 } CompositeByteBuf( ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf[] buffers, int offset, int len) { super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY); if (alloc == null) { throw new NullPointerException("alloc"); } if (maxNumComponents < 2) { throw new IllegalArgumentException( "maxNumComponents: " + maxNumComponents + " (expected: >= 2)"); } this.alloc = alloc; this.direct = direct; this.maxNumComponents = maxNumComponents; components = newList(maxNumComponents);//创建字节buf集 addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集 consolidateIfNeeded();//调整底层字节buf setIndex(0, capacity());//更新复合buf读写索引 }
复合buf的构造主要有一下几点要看:
1.
components = newList(maxNumComponents);//创建字节buf集
2.
addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集
3.
consolidateIfNeeded();//检查是否需要扩展buf集
4.
setIndex(0, capacity());//更新复合buf读写索引
我们分别来看这几点:
1.
components = newList(maxNumComponents);//创建字节buf集
private static List<Component> newList(int maxNumComponents) { return new ArrayList<Component>(Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents)); }
2.
addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集
private int addComponents0(boolean increaseIndex, int cIndex, Iterable<ByteBuf> buffers) { if (buffers instanceof ByteBuf) { // If buffers also implements ByteBuf (e.g. CompositeByteBuf), it has to go to addComponent(ByteBuf). //处理实现CompositeByteBuf的字节buf场景 return addComponent0(increaseIndex, cIndex, (ByteBuf) buffers); } checkNotNull(buffers, "buffers"); //如果buffers不是集合类,则将buffers转换为字节buf集合 if (!(buffers instanceof Collection)) { List<ByteBuf> list = new ArrayList<ByteBuf>(); try { for (ByteBuf b: buffers) { list.add(b); } buffers = list; } finally { if (buffers != list) { for (ByteBuf b: buffers) { if (b != null) { try { //释放源buf数组中的字节buf b.release(); } catch (Throwable ignored) { // ignore } } } } } } Collection<ByteBuf> col = (Collection<ByteBuf>) buffers; return addComponents0(increaseIndex, cIndex, col.toArray(new ByteBuf[col.size()]), 0 , col.size()); } private int addComponents0(boolean increaseWriterIndex, int cIndex, ByteBuf[] buffers, int offset, int len) { checkNotNull(buffers, "buffers"); int i = offset; try { checkComponentIndex(cIndex); // No need for consolidation while (i < len) { // Increment i now to prepare for the next iteration and prevent a duplicate release (addComponent0 // will release if an exception occurs, and we also release in the finally block here). ByteBuf b = buffers[i++]; if (b == null) { break; } //添加字节buf到复合buf的索引cIndex上 cIndex = addComponent0(increaseWriterIndex, cIndex, b) + 1; int size = components.size(); if (cIndex > size) { cIndex = size; } } return cIndex; } finally { for (; i < len; ++i) { ByteBuf b = buffers[i]; if (b != null) { try { //释放源buf数组中的字节buf b.release(); } catch (Throwable ignored) { // ignore } } } } } //检查索引是否越界 private void checkComponentIndex(int cIndex) { ensureAccessible(); if (cIndex < 0 || cIndex > components.size()) { throw new IndexOutOfBoundsException(String.format( "cIndex: %d (expected: >= 0 && <= numComponents(%d))", cIndex, components.size())); } } ** * Precondition is that {@code buffer != null}. 添加字节buf到复合buf的索引cIndex上 */ private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) { assert buffer != null; boolean wasAdded = false; try { checkComponentIndex(cIndex); int readableBytes = buffer.readableBytes(); // No need to consolidate - just add a component to the list. @SuppressWarnings("deprecation") //包装buf为buf组件 Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice()); if (cIndex == components.size()) { wasAdded = components.add(c); if (cIndex == 0) { //buf添加到复合buf集第一个位置 c.endOffset = readableBytes; } else { //buf添加到复合buf集最后一个位置 Component prev = components.get(cIndex - 1); c.offset = prev.endOffset; c.endOffset = c.offset + readableBytes; } } else { //添加buf到复合buf集合 components.add(cIndex, c); wasAdded = true; if (readableBytes != 0) { //更新索引 updateComponentOffsets(cIndex); } } if (increaseWriterIndex) { //更新写索引 writerIndex(writerIndex() + buffer.readableBytes()); } return cIndex; } finally { if (!wasAdded) { buffer.release(); } } }
来看一下更新索引和更新写索引:
//更新复合buf集合中cIndex位置上的buf的开始位置和终止位置 private void updateComponentOffsets(int cIndex) { int size = components.size(); if (size <= cIndex) { return; } Component c = components.get(cIndex); if (cIndex == 0) { c.offset = 0; c.endOffset = c.length; cIndex ++; } for (int i = cIndex; i < size; i ++) { Component prev = components.get(i - 1); Component cur = components.get(i); cur.offset = prev.endOffset; cur.endOffset = cur.offset + cur.length; } }
//更新写索引
@Override public CompositeByteBuf writerIndex(int writerIndex) { return (CompositeByteBuf) super.writerIndex(writerIndex); }
3.
consolidateIfNeeded();//检查是否需要扩展buf集
/** * This should only be called as last operation from a method as this may adjust the underlying * array of components and so affect the index etc. */ private void consolidateIfNeeded() { // Consolidate if the number of components will exceed the allowed maximum by the current // operation. final int numComponents = components.size(); if (numComponents > maxNumComponents) { //如果当前buf的数量大于复合buf集最大容量,则获取最后一个buf的结束位置,即复合buf的数据长度 final int capacity = components.get(numComponents - 1).endOffset; //分配capacity容量的字节buf ByteBuf consolidated = allocBuffer(capacity); // We're not using foreach to avoid creating an iterator. //将复合buf集中的所有字节buf,整合到一个buf中 for (int i = 0; i < numComponents; i ++) { Component c = components.get(i); ByteBuf b = c.buf; consolidated.writeBytes(b); c.freeIfNecessary(); } Component c = new Component(consolidated); c.endOffset = c.length; components.clear();//清空原始复合buf集 components.add(c);//添加整合后的buf到复合buf集 } }
//分配capacity容量的字节buf private ByteBuf allocBuffer(int capacity) { return direct ? alloc().directBuffer(capacity) : alloc().heapBuffer(capacity); }
4.
setIndex(0, capacity());//更新复合buf读写索引
@Override public CompositeByteBuf setIndex(int readerIndex, int writerIndex) { return (CompositeByteBuf) super.setIndex(readerIndex, writerIndex); }
从上面可以看出:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在
复合buf中的起始位置和结束位置,及buf可读数据长度。
其他添加字节buf到复合buf的的方法见附。
来看移除buf
/** * Remove the {@link ByteBuf} from the given index. * * @param cIndex the index on from which the {@link ByteBuf} will be remove */ public CompositeByteBuf removeComponent(int cIndex) { checkComponentIndex(cIndex); //直接从组件buf集合移除buf Component comp = components.remove(cIndex); comp.freeIfNecessary(); if (comp.length > 0) { // Only need to call updateComponentOffsets if the length was > 0 updateComponentOffsets(cIndex);//更新索引对应的字节buf的起止位置 } return this; } /** * Remove the number of {@link ByteBuf}s starting from the given index. * * @param cIndex the index on which the {@link ByteBuf}s will be started to removed * @param numComponents the number of components to remove */ public CompositeByteBuf removeComponents(int cIndex, int numComponents) { checkComponentIndex(cIndex, numComponents); if (numComponents == 0) { return this; } List<Component> toRemove = components.subList(cIndex, cIndex + numComponents); boolean needsUpdate = false; for (Component c: toRemove) { if (c.length > 0) { needsUpdate = true; } c.freeIfNecessary(); } //移除cIndex到cIndex + numComponents字节buf toRemove.clear(); if (needsUpdate) { // Only need to call updateComponentOffsets if the length was > 0 updateComponentOffsets(cIndex);//更新索引对应的字节buf的起止位置 } return this; }
再来看返回复合buf迭代器:
@Override public Iterator<ByteBuf> iterator() { ensureAccessible(); if (components.isEmpty()) { return EMPTY_ITERATOR; } return new CompositeByteBufIterator(); }
//CompositeByteBufIterator
private final class CompositeByteBufIterator implements Iterator<ByteBuf> { private final int size = components.size(); private int index; @Override public boolean hasNext() { return size > index; } @Override public ByteBuf next() { if (size != components.size()) { throw new ConcurrentModificationException(); } if (!hasNext()) { throw new NoSuchElementException(); } try { return components.get(index++).buf; } catch (IndexOutOfBoundsException e) { throw new ConcurrentModificationException(); } } @Override public void remove() { throw new UnsupportedOperationException("Read-Only"); } }
再来从复合buf读取数据到字节buf集合
/** * Same with {@link #slice(int, int)} except that this method returns a list. */ public List<ByteBuf> decompose(int offset, int length) { checkIndex(offset, length); if (length == 0) { return Collections.emptyList(); } //获取开始位置对应的buf的索引 int componentId = toComponentIndex(offset); List<ByteBuf> slice = new ArrayList<ByteBuf>(components.size()); // The first component Component firstC = components.get(componentId); ByteBuf first = firstC.buf.duplicate(); first.readerIndex(offset - firstC.offset); ByteBuf buf = first; int bytesToSlice = length; do { int readableBytes = buf.readableBytes(); if (bytesToSlice <= readableBytes) { // Last component //字节长度小于索引对应的buf的可读字节数 buf.writerIndex(buf.readerIndex() + bytesToSlice); //直接添加buf slice.add(buf); break; } else { //否则向后继续读取buf,直至读到length长度的字节 // Not the last component slice.add(buf); bytesToSlice -= readableBytes; componentId ++; // Fetch the next component. buf = components.get(componentId).buf.duplicate(); } } while (bytesToSlice > 0); // Slice all components because only readable bytes are interesting. //见字节buf对应slice 映射添加到指定的索引上 for (int i = 0; i < slice.size(); i ++) { slice.set(i, slice.get(i).slice()); } return slice; }
再来看字节buf相关的方法:
//判断buf是否为direct类型 @Override public boolean isDirect() { int size = components.size(); if (size == 0) { return false; } for (int i = 0; i < size; i++) { if (!components.get(i).buf.isDirect()) { return false; } } return true; } //判断底层buf是否为字节数组 @Override public boolean hasArray() { switch (components.size()) { case 0: return true; case 1: return components.get(0).buf.hasArray(); default: return false; } } //转化复合buf为字节数组 @Override public byte[] array() { switch (components.size()) { case 0: return EmptyArrays.EMPTY_BYTES; case 1: return components.get(0).buf.array(); default: throw new UnsupportedOperationException(); } } //设置读写索引 @Override public CompositeByteBuf readerIndex(int readerIndex) { return (CompositeByteBuf) super.readerIndex(readerIndex); } @Override public CompositeByteBuf writerIndex(int writerIndex) { return (CompositeByteBuf) super.writerIndex(writerIndex); } @Override public CompositeByteBuf setIndex(int readerIndex, int writerIndex) { return (CompositeByteBuf) super.setIndex(readerIndex, writerIndex); }
复合buf的其他方法,在附篇,我们只需要知道复合buf实际为字节buf数据概念即可,如果想进一步了解复合buf,可以看附篇的方法。
总结:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。
附:
这部分,不做详细详解,just see see...
添加字节buf到复合buf
/** * Add the given {@link ByteBuf}. * <p> * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * If you need to have it increased use {@link #addComponent(boolean, ByteBuf)}. 添加字节buf到复合buf,不会更新写索引 * <p> * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this * {@link CompositeByteBuf}. */ public CompositeByteBuf addComponent(ByteBuf buffer) { return addComponent(false, buffer); } /** * Add the given {@link ByteBuf}s. * <p> * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * If you need to have it increased use {@link #addComponents(boolean, ByteBuf[])}. 与上面不同的是,添加的是一个字节buf组 * <p> * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link CompositeByteBuf}. * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()} * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. */ public CompositeByteBuf addComponents(ByteBuf... buffers) { return addComponents(false, buffers); } /** * Add the given {@link ByteBuf}s. * <p> * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * If you need to have it increased use {@link #addComponents(boolean, Iterable)}. 添加具有迭代器属性的字节buf到复合buf,不会更新写索引 * <p> * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link CompositeByteBuf}. * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()} * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. */ public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) { return addComponents(false, buffers); } /** * Add the given {@link ByteBuf} on the specific index. * <p> * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * If you need to have it increased use {@link #addComponent(boolean, int, ByteBuf)}. 添加到字节buf到复合buf的指定索引上 * <p> * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * @param cIndex the index on which the {@link ByteBuf} will be added. * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this * {@link CompositeByteBuf}. */ public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) { return addComponent(false, cIndex, buffer); } /** * Add the given {@link ByteBuf} and increase the {@code writerIndex} if {@code increaseWriterIndex} is * {@code true}. *添加buf到复合buf,并根据increaseWriterIndex参数,决定是否更新复合buf写索引 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this * {@link CompositeByteBuf}. */ public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { checkNotNull(buffer, "buffer"); addComponent0(increaseWriterIndex, components.size(), buffer); consolidateIfNeeded(); return this; } /** * Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is * {@code true}. *与上面方法不同的是,可以添加多个buf * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link CompositeByteBuf}. * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()} * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. */ public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) { addComponents0(increaseWriterIndex, components.size(), buffers, 0, buffers.length); consolidateIfNeeded(); return this; } /** * Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is * {@code true}. * 与上面方法不同的是,添加的buf为集合buf * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link CompositeByteBuf}. * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()} * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. */ public CompositeByteBuf addComponents(boolean increaseWriterIndex, Iterable<ByteBuf> buffers) { addComponents0(increaseWriterIndex, components.size(), buffers); consolidateIfNeeded(); return this; } /** * Add the given {@link ByteBuf} on the specific index and increase the {@code writerIndex} * if {@code increaseWriterIndex} is {@code true}. *添加buf到复合buf的指定索引上,并根据increaseWriterIndex参数,决定是否更新复合buf写索引 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * @param cIndex the index on which the {@link ByteBuf} will be added. * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this * {@link CompositeByteBuf}. */ public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) { checkNotNull(buffer, "buffer"); addComponent0(increaseWriterIndex, cIndex, buffer); consolidateIfNeeded(); return this; }
get*相关方法:
//获取位置index对应的字节 @Override public byte getByte(int index) { return _getByte(index); } @Override protected byte _getByte(int index) { Component c = findComponent(index); //委托给组件内部buf对应的方法 return c.buf.getByte(index - c.offset); } //获取索引所在的组件buf private Component findComponent(int offset) { checkIndex(offset); for (int low = 0, high = components.size(); low <= high;) { int mid = low + high >>> 1; Component c = components.get(mid); if (offset >= c.endOffset) { low = mid + 1; } else if (offset < c.offset) { high = mid - 1; } else { assert c.length != 0; return c; } } throw new Error("should not reach here"); } @Override protected int _getInt(int index) { Component c = findComponent(index); if (index + 4 <= c.endOffset) { //委托给组件内部buf对应的方法 return c.buf.getInt(index - c.offset); } else if (order() == ByteOrder.BIG_ENDIAN) { return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff; } else { return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16; } }
其他get*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的get*方法。
再来看getBytes*相关方法
//读取复合buf数据到字节数组 @Override public CompositeByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); if (length == 0) { return this; } //获取位置index开始的组件buf索引 int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); //读取buf数据到目的buf s.getBytes(index - adjustment, dst, dstIndex, localLength); index += localLength; dstIndex += localLength; length -= localLength; i ++; } return this; } //读取buf到nio 字节buf @Override public CompositeByteBuf getBytes(int index, ByteBuffer dst) { int limit = dst.limit(); int length = dst.remaining(); checkIndex(index, length); if (length == 0) { return this; } int i = toComponentIndex(index); try { while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); //调整目的buf limit dst.limit(dst.position() + localLength); //读取组件buf数据到目的buf s.getBytes(index - adjustment, dst); index += localLength; length -= localLength; i ++; } } finally { dst.limit(limit); } return this; }
再来看读取到字节buf,输出流,聚集字节buf
@Override public CompositeByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.capacity()); if (length == 0) { return this; } int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); //读buf数据,写到目的buf中 s.getBytes(index - adjustment, dst, dstIndex, localLength); index += localLength; dstIndex += localLength; length -= localLength; i ++; } return this; } @Override public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { int count = nioBufferCount(); if (count == 1) { return out.write(internalNioBuffer(index, length)); } else { long writtenBytes = out.write(nioBuffers(index, length)); if (writtenBytes > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } else { return (int) writtenBytes; } } } //获取底层buf数 @Override public int nioBufferCount() { switch (components.size()) { case 0: return 1; case 1: return components.get(0).buf.nioBufferCount(); default: int count = 0; int componentsCount = components.size(); for (int i = 0; i < componentsCount; i++) { Component c = components.get(i); count += c.buf.nioBufferCount(); } return count; } } //获取底层nio字节buf @Override public ByteBuffer internalNioBuffer(int index, int length) { switch (components.size()) { case 0: return EMPTY_NIO_BUFFER; case 1: return components.get(0).buf.internalNioBuffer(index, length); default: throw new UnsupportedOperationException(); } } //获取从位置index开始,长度为length字节数据的nio 字节buf @Override public ByteBuffer nioBuffer(int index, int length) { checkIndex(index, length); switch (components.size()) { case 0: return EMPTY_NIO_BUFFER; case 1: ByteBuf buf = components.get(0).buf; if (buf.nioBufferCount() == 1) { return components.get(0).buf.nioBuffer(index, length); } } ByteBuffer merged = ByteBuffer.allocate(length).order(order()); ByteBuffer[] buffers = nioBuffers(index, length); for (ByteBuffer buf: buffers) { merged.put(buf); } merged.flip(); return merged; } //从位置index开始,读取length个字节数据,并将读取的数据添加到nio字节buf数据 @Override public ByteBuffer[] nioBuffers(int index, int length) { checkIndex(index, length); if (length == 0) { return new ByteBuffer[] { EMPTY_NIO_BUFFER }; } List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(components.size()); int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); switch (s.nioBufferCount()) { case 0: throw new UnsupportedOperationException(); case 1: buffers.add(s.nioBuffer(index - adjustment, localLength)); break; default: Collections.addAll(buffers, s.nioBuffers(index - adjustment, localLength)); } index += localLength; length -= localLength; i ++; } //转换读取的nio 字节buf集合 为nio字节buf数组 return buffers.toArray(new ByteBuffer[buffers.size()]); } //读取数据到文件通道 @Override public int getBytes(int index, FileChannel out, long position, int length) throws IOException { int count = nioBufferCount(); if (count == 1) { return out.write(internalNioBuffer(index, length), position); } else { long writtenBytes = 0; for (ByteBuffer buf : nioBuffers(index, length)) { writtenBytes += out.write(buf, position + writtenBytes); } if (writtenBytes > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } return (int) writtenBytes; } } //读取数据到输出流 @Override public CompositeByteBuf getBytes(int index, OutputStream out, int length) throws IOException { checkIndex(index, length); if (length == 0) { return this; } int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); s.getBytes(index - adjustment, out, localLength); index += localLength; length -= localLength; i ++; } return this; }
从上面可以看出,getBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的getBytes*方法,将数据写到目的字节buf,nio buf,文件通道,输出流等。
再来看set方法:
@Override public CompositeByteBuf setByte(int index, int value) { Component c = findComponent(index); c.buf.setByte(index - c.offset, value); return this; } @Override protected void _setByte(int index, int value) { setByte(index, value); } @Override public CompositeByteBuf setInt(int index, int value) { return (CompositeByteBuf) super.setInt(index, value); } @Override protected void _setInt(int index, int value) { Component c = findComponent(index); if (index + 4 <= c.endOffset) { c.buf.setInt(index - c.offset, value); } else if (order() == ByteOrder.BIG_ENDIAN) { _setShort(index, (short) (value >>> 16)); _setShort(index + 2, (short) value); } else { _setShort(index, (short) value); _setShort(index + 2, (short) (value >>> 16)); } }
set*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的set*方法。
再来看setBytes*方法:
@Override public CompositeByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); if (length == 0) { return this; } int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); s.setBytes(index - adjustment, src, srcIndex, localLength); index += localLength; srcIndex += localLength; length -= localLength; i ++; } return this; } @Override public CompositeByteBuf setBytes(int index, ByteBuffer src) { int limit = src.limit(); int length = src.remaining(); checkIndex(index, length); if (length == 0) { return this; } int i = toComponentIndex(index); try { while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); src.limit(src.position() + localLength); s.setBytes(index - adjustment, src); index += localLength; length -= localLength; i ++; } } finally { src.limit(limit); } return this; } @Override public CompositeByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.capacity()); if (length == 0) { return this; } int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); s.setBytes(index - adjustment, src, srcIndex, localLength); index += localLength; srcIndex += localLength; length -= localLength; i ++; } return this; } @Override public int setBytes(int index, InputStream in, int length) throws IOException { checkIndex(index, length); if (length == 0) { return in.read(EmptyArrays.EMPTY_BYTES); } int i = toComponentIndex(index); int readBytes = 0; do { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); if (localLength == 0) { // Skip empty buffer i++; continue; } int localReadBytes = s.setBytes(index - adjustment, in, localLength); if (localReadBytes < 0) { if (readBytes == 0) { return -1; } else { break; } } if (localReadBytes == localLength) { index += localLength; length -= localLength; readBytes += localLength; i ++; } else { index += localReadBytes; length -= localReadBytes; readBytes += localReadBytes; } } while (length > 0); return readBytes; } @Override public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { checkIndex(index, length); if (length == 0) { return in.read(EMPTY_NIO_BUFFER); } int i = toComponentIndex(index); int readBytes = 0; do { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); if (localLength == 0) { // Skip empty buffer i++; continue; } int localReadBytes = s.setBytes(index - adjustment, in, localLength); if (localReadBytes == 0) { break; } if (localReadBytes < 0) { if (readBytes == 0) { return -1; } else { break; } } if (localReadBytes == localLength) { index += localLength; length -= localLength; readBytes += localLength; i ++; } else { index += localReadBytes; length -= localReadBytes; readBytes += localReadBytes; } } while (length > 0); return readBytes; } @Override public int setBytes(int index, FileChannel in, long position, int length) throws IOException { checkIndex(index, length); if (length == 0) { return in.read(EMPTY_NIO_BUFFER, position); } int i = toComponentIndex(index); int readBytes = 0; do { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); if (localLength == 0) { // Skip empty buffer i++; continue; } int localReadBytes = s.setBytes(index - adjustment, in, position + readBytes, localLength); if (localReadBytes == 0) { break; } if (localReadBytes < 0) { if (readBytes == 0) { return -1; } else { break; } } if (localReadBytes == localLength) { index += localLength; length -= localLength; readBytes += localLength; i ++; } else { index += localReadBytes; length -= localReadBytes; readBytes += localReadBytes; } } while (length > 0); return readBytes; }
setBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的setBytes*方法,将字节数组,字节buf,nio buf,文件通道,输出流等数据写到当前复合buf中。
再来看readBytes方法:
@Override public CompositeByteBuf readBytes(ByteBuf dst) { return (CompositeByteBuf) super.readBytes(dst); } @Override public CompositeByteBuf readBytes(ByteBuf dst, int length) { return (CompositeByteBuf) super.readBytes(dst, length); } @Override public CompositeByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { return (CompositeByteBuf) super.readBytes(dst, dstIndex, length); } @Override public CompositeByteBuf readBytes(byte[] dst) { return (CompositeByteBuf) super.readBytes(dst); } @Override public CompositeByteBuf readBytes(byte[] dst, int dstIndex, int length) { return (CompositeByteBuf) super.readBytes(dst, dstIndex, length); } @Override public CompositeByteBuf readBytes(ByteBuffer dst) { return (CompositeByteBuf) super.readBytes(dst); } @Override public CompositeByteBuf readBytes(OutputStream out, int length) throws IOException { return (CompositeByteBuf) super.readBytes(out, length); }
从上来看readBytes*方法,委托给父类抽象buf的readBytes*方法,实际通过getBytes*方法。
再来看write*方法:
@Override public CompositeByteBuf writeByte(int value) { return (CompositeByteBuf) super.writeByte(value); } @Override public CompositeByteBuf writeInt(int value) { return (CompositeByteBuf) super.writeInt(value); } @Override public CompositeByteBuf writeBytes(ByteBuf src) { return (CompositeByteBuf) super.writeBytes(src); } @Override public CompositeByteBuf writeBytes(ByteBuf src, int length) { return (CompositeByteBuf) super.writeBytes(src, length); } @Override public CompositeByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { return (CompositeByteBuf) super.writeBytes(src, srcIndex, length); } @Override public CompositeByteBuf writeBytes(byte[] src) { return (CompositeByteBuf) super.writeBytes(src); } @Override public CompositeByteBuf writeBytes(byte[] src, int srcIndex, int length) { return (CompositeByteBuf) super.writeBytes(src, srcIndex, length); } @Override public CompositeByteBuf writeBytes(ByteBuffer src) { return (CompositeByteBuf) super.writeBytes(src); }
从上来看write*方法,委托给父类抽象buf的write*方法,实际通过write*方法。
再来看其他方法,看看就行:
@Override public CompositeByteBuf discardSomeReadBytes() { return discardReadComponents(); } /** * Discard all {@link ByteBuf}s which are read. 丢弃所有已经读取的字节buf组件 */ public CompositeByteBuf discardReadComponents() { ensureAccessible(); final int readerIndex = readerIndex(); if (readerIndex == 0) { return this; } // Discard everything if (readerIndex = writerIndex = capacity). int writerIndex = writerIndex(); if (readerIndex == writerIndex && writerIndex == capacity()) { for (Component c: components) { c.freeIfNecessary(); } components.clear(); setIndex(0, 0); adjustMarkers(readerIndex); return this; } // Remove read components. int firstComponentId = toComponentIndex(readerIndex); for (int i = 0; i < firstComponentId; i ++) { components.get(i).freeIfNecessary(); } components.subList(0, firstComponentId).clear(); // Update indexes and markers. Component first = components.get(0); int offset = first.offset; updateComponentOffsets(0); setIndex(readerIndex - offset, writerIndex - offset); adjustMarkers(offset); return this; } //丢弃已读buf组件 @Override public CompositeByteBuf discardReadBytes() { ensureAccessible(); final int readerIndex = readerIndex(); if (readerIndex == 0) { return this; } // Discard everything if (readerIndex = writerIndex = capacity). int writerIndex = writerIndex(); //已读完,则清空组件buf集 if (readerIndex == writerIndex && writerIndex == capacity()) { for (Component c: components) { c.freeIfNecessary(); } components.clear(); setIndex(0, 0); adjustMarkers(readerIndex); return this; } //丢弃已经读取的组件buf // Remove read components. int firstComponentId = toComponentIndex(readerIndex); for (int i = 0; i < firstComponentId; i ++) { components.get(i).freeIfNecessary(); } components.subList(0, firstComponentId).clear(); // Remove or replace the first readable component with a new slice. Component c = components.get(0); int adjustment = readerIndex - c.offset; if (adjustment == c.length) { // new slice would be empty, so remove instead components.remove(0); } else { Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment)); components.set(0, newC); } // Update indexes and markers. updateComponentOffsets(0); setIndex(0, writerIndex - readerIndex); adjustMarkers(readerIndex); return this; } //释放复合buf @Override protected void deallocate() { if (freed) { return; } freed = true; int size = components.size(); // We're not using foreach to avoid creating an iterator. // see https://github.com/netty/netty/issues/2642 for (int i = 0; i < size; i++) { components.get(i).freeIfNecessary(); } } @Override public ByteBuf unwrap() { return null; } @Override public boolean hasMemoryAddress() { switch (components.size()) { case 0: return Unpooled.EMPTY_BUFFER.hasMemoryAddress(); case 1: return components.get(0).buf.hasMemoryAddress(); default: return false; } } @Override public long memoryAddress() { switch (components.size()) { case 0: return Unpooled.EMPTY_BUFFER.memoryAddress(); case 1: return components.get(0).buf.memoryAddress(); default: throw new UnsupportedOperationException(); } } @Override public int capacity() { final int numComponents = components.size(); if (numComponents == 0) { return 0; } return components.get(numComponents - 1).endOffset; } @Override public CompositeByteBuf retain(int increment) { return (CompositeByteBuf) super.retain(increment); } @Override public CompositeByteBuf retain() { return (CompositeByteBuf) super.retain(); } @Override public CompositeByteBuf touch() { return this; } @Override public CompositeByteBuf touch(Object hint) { return this; } @Override public ByteBuf copy(int index, int length) { checkIndex(index, length); ByteBuf dst = Unpooled.buffer(length); if (length != 0) { copyTo(index, length, toComponentIndex(index), dst); } return dst; } private void copyTo(int index, int length, int componentId, ByteBuf dst) { int dstIndex = 0; int i = componentId; while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; int adjustment = c.offset; int localLength = Math.min(length, s.capacity() - (index - adjustment)); s.getBytes(index - adjustment, dst, dstIndex, localLength); index += localLength; dstIndex += localLength; length -= localLength; i ++; } dst.writerIndex(dst.capacity()); }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1313netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2051netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2437netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1311netty 字节buf定义:http:// ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1592netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1842netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1393netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2823netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2174netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2033netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1076netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1876netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1216netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1201netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 955netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1306netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2187netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1071netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1851netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1871netty Inboudn/Outbound通道Invoker ...
相关推荐
该书首先介绍了Netty的基本概念,包括其设计理念、核心组件以及与其他网络编程模型的区别。Netty的核心组件主要包括Bootstrap、ServerBootstrap、Channel、Pipeline、Handler等,它们协同工作以实现高效的网络通信。...
本书首先会介绍Netty的基本概念,包括其非阻塞I/O模型、事件驱动架构以及Channel、EventLoop、Buffer等核心组件。接着,通过一系列逐步进阶的案例,如构建简单的Echo服务器、处理HTTP请求、实现WebSocket通信,读者...
下面将详细阐述Netty的核心概念和关键特性,以及`netty-all-4.1.25.Final.jar`这个包在Netty中的作用。 1. **Netty核心组件**: - **Channel**: Netty中的基础概念,代表一个连接或一个端点,可以用来读写数据。 -...
这本书通过详实的代码案例,帮助读者理解和掌握Netty的核心概念和技术。 在Netty中,最重要的概念之一是“Boss线程”和“Worker线程”的模型。Boss线程负责接收新的连接请求,而Worker线程则处理这些连接后的读写...
在深入探讨Netty与RocketMQ的关联之前,我们先来详细了解一下Netty的基本概念和核心特性。 Netty的核心设计理念是基于Reactor模式,这是一种事件驱动的设计模式,它使得Netty能够高效地处理并发连接。在Netty中,...
Netty是Java的网络编程框架,广泛应用于数据采集服务中,本文将对Netty的基本概念和应用进行详细介绍,并将其与ETL技术结合,讲解如何使用Netty进行数据流转和处理。 1. ETL概述 ETL(Extract、Transform、Load)...
在Netty的学习过程中,首先需要理解的是它的基础概念。Netty基于NIO(非阻塞I/O)模型,提供了高效、易用的API,使得开发者可以方便地构建网络服务。它包含了服务器端和客户端的Bootstrap类,用于启动和配置网络应用...
首先,让我们深入理解Netty的核心概念。Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环...
下面将围绕Netty的核心概念、特性、以及如何通过学习笔记来深入理解Netty进行详细阐述。 1. **Netty核心概念** - **Channel**: Netty中的基本I/O组件,代表一个到另一端点的连接,可以用来读取和写入数据。 - **...
下面我们将详细探讨 Netty 的核心概念以及这些自定义 demo 可能涵盖的知识点。 1. **Netty 的基本概念**: - **ByteBuf**: Netty 提供了自己的缓冲区类 ByteBuf,比 Java NIO 中的 ByteBuffer 更加强大和灵活,...
1. **语言无障碍**: 对于不擅长英语的开发者,中文文档降低了理解门槛,能更快地掌握Netty的核心概念和使用方法。 2. **结构化内容**: CHM格式是一种常见的离线帮助文档格式,内容组织有序,方便查阅和学习。 3. **...
这本书籍《Netty 权威指南》深入浅出地介绍了 Netty 的核心概念、设计模式以及实际应用,是 IT 开发者深入理解和使用 Netty 的宝贵资源。 1. **Netty 概述** Netty 是由 JBoss 提供的一个开源框架,它简化了网络...
在本文中,我们将深入探讨Netty的基本概念,通过“Hello World”范例来理解其工作原理。 首先,让我们理解Netty的核心概念。Netty基于Java NIO(非阻塞I/O)构建,提供了高级API来处理网络通信。它包含以下几个关键...
Netty的核心概念包括: 1. **Channel**:它是网络连接的抽象,可以代表TCP连接、UDP套接字或任何其他类型的I/O连接。 2. **EventLoop**:每个Channel都会关联一个EventLoop,负责执行事件处理器(Handler)的任务,...
这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...
本篇文章将深入探讨Netty的核心概念、关键特性和实际应用。 Netty的核心概念之一是其基于Reactor模式的设计。Reactor模式是一种处理并发I/O事件的模式,通过一个中心调度器(Selector)来管理多个事件处理器...
这本书通过详细阐述Netty的核心概念、设计模式和最佳实践,帮助开发者提升网络编程的能力。 首先,Netty的基础知识包括了NIO(非阻塞I/O)的概念。NIO在Java中提供了一种不同于传统阻塞I/O的I/O模型,它允许多个...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
本篇文章将深入探讨 Netty 的核心概念、特性以及其在实际开发中的应用。 Netty 框架由 Java 编写,它提供了一套丰富的预构建的组件,如协议解码器、编码器、处理程序和管道,这些组件可以帮助开发者快速、高效地...