本文为原创,转载请注明出处
netty 4源码分析-write
Netty的写操作由两个步骤组成:
- Write:将msg存储到ChannelOutboundBuffer中
- Flush:将msg从ChannelOutboundBuffer中flush到套接字的发送缓冲区中。
本文介绍第一个步骤write
//DefaultChannelHandlerContext public ChannelFuture write(Object msg) { return write(msg, newPromise()); } public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } validatePromise(promise, true); write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeWrite(msg, promise); if (flush) { next.invokeFlush(); } } else { int size = channel.estimatorHandle().size(msg); if (size > 0) { ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { buffer.incrementPendingOutboundBytes(size, false); } } executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); } }
Write是一个Outbound事件,所以会调用outbound处理器的write方法。下面分析headHandler的write方法。
//HeadHandler public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
里面会调用AbstractUnsafe的write方法
// AbstractUnsafe public void write(Object msg, ChannelPromise promise) { if (!isActive()) { // Mark the write request as failure if the channel is inactive. if (isOpen()) { promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); } else { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } // release message now to prevent resource-leak ReferenceCountUtil.release(msg); } else { outboundBuffer.addMessage(msg, promise); } }
outboundBuffer是AbstractUnsafe使用的一种数据结构ChannelOutboundBuffer,用来存储待发送的消息。该数据结构在实例化AbstractUnsafe的同时被初始化:
// ChannelOutboundBuffer private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() { @Override protected ChannelOutboundBuffer newObject(Handle handle) { return new ChannelOutboundBuffer(handle); } }; static ChannelOutboundBuffer newInstance(AbstractChannel channel) { ChannelOutboundBuffer buffer = RECYCLER.get(); buffer.channel = channel; buffer.totalPendingSize = 0; buffer.writable = 1; return buffer; }
ChannelOutboundBuffer的结构如下:
Buffer是用来存储msg的Entry结构数组,entry的结构如下:
ChannelOutboundBuffer实例化时,buffer数组的大小为32,nioBuffers数组的大小也为32.由于ChannelOutboundBuffer的实例化的代价实际上是很高的,看以下构造方法:
private ChannelOutboundBuffer(Handle handle) { this.handle = handle; buffer = new Entry[INITIAL_CAPACITY]; for (int i = 0; i < buffer.length; i++) { buffer[i] = new Entry(); } nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; }
所以netty使用基于thread-local的轻量级对象池Recycler对ChannelOutboundBuffer进行回收。当ChannelOutboundBuffer第一次被实例化且使用完毕后,会回收到Recycler中(见下面的recyle方法),下次需要用时,直接从Recycler中取(见下面的get方法),避免了再次实例化和垃圾回收的开销。
public abstract class Recycler<T> { private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread()); } }; public final T get() { Stack<T> stack = threadLocal.get(); T o = stack.pop(); if (o == null) { o = newObject(stack); } return o; } public final boolean recycle(T o, Handle handle) { @SuppressWarnings("unchecked") Stack<T> stack = (Stack<T>) handle; if (stack.parent != this) { return false; } if (Thread.currentThread() != stack.thread) { return false; } stack.push(o); return true; } protected abstract T newObject(Handle handle); public interface Handle { } static final class Stack<T> implements Handle { private static final int INITIAL_CAPACITY = 256; final Recycler<T> parent; final Thread thread; private T[] elements; private int size; private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY); @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" }) Stack(Recycler<T> parent, Thread thread) { this.parent = parent; this.thread = thread; elements = newArray(INITIAL_CAPACITY); } T pop() { int size = this.size; if (size == 0) { return null; } size --; T ret = elements[size]; elements[size] = null; map.remove(ret); this.size = size; return ret; } void push(T o) { if (map.put(o, Boolean.TRUE) != null) { throw new IllegalStateException("recycled already"); } int size = this.size; if (size == elements.length) { T[] newElements = newArray(size << 1); System.arraycopy(elements, 0, newElements, 0, size); elements = newElements; } elements[size] = o; this.size = size + 1; } @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" }) private static <T> T[] newArray(int length) { return (T[]) new Object[length]; } }
下面接着分析ChannelOutboundBuffer的addMessage方法。
// ChannelOutboundBuffer void addMessage(Object msg, ChannelPromise promise) { int size = channel.estimatorHandle().size(msg); if (size < 0) { size = 0; } Entry e = buffer[tail++]; e.msg = msg; e.pendingSize = size; e.promise = promise; e.total = total(msg); tail &= buffer.length - 1; if (tail == flushed) { addCapacity(); } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(size, true); }
每次都会将msg作为一个Entry存储到buffer数组的tail位置,然后将tail自增1,自增后执行这行代码tail &= buffer.length – 1(譬如假设length为4,当已存储3个msg后,tail累加到4,和3执行与的结果得到0,因此下次的消息又重新存储到buffer的0位置)使得buffer数组可以循环存储。如果出现tail=flushed,说明空间不够,需要将数组扩容到原来大小的两倍.
incrementPendingOutboundBytes则会更新totalPendingSize,将其累加本次msg的大小。如果新的totalPendingSize超过了channel的高水位线writeBufferHighWaterMark(默认值为64 * 1024),则触发ChannelWritabilityChanged事件。(注意:如果网络很繁忙,套接字的发送缓冲区空间 不够,导致Msg不能及时从buffer中flush出去,那么不断的对channel执行write操作,会使得对数组不断地进行两倍扩容,最终导致OOM。所以最好在自己的Inbound处理器里捕获ChannelWritabilityChanged事件,然后调用channel的isWritable方法,根据结果来决定是否继续执行write操作)。
// ChannelOutboundBuffer private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); void incrementPendingOutboundBytes(int size, boolean fireEvent) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; if (size == 0 || channel == null) { return; } long oldValue = totalPendingSize; long newWriteBufferSize = oldValue + size; while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) { oldValue = totalPendingSize; newWriteBufferSize = oldValue + size; } int highWaterMark = channel.config().getWriteBufferHighWaterMark(); if (newWriteBufferSize > highWaterMark) { if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) { if (fireEvent) { channel.pipeline().fireChannelWritabilityChanged(); } } } }
需要注意的是,该方法是线程安全的,采用了一个技巧,使用AtomicLongFieldUpdater来对totalPendingSize进行更新,实现CAS的效果,达到并发安全读写。相对于synchronized同步,AtomicLongFieldUpdater的开销是比较小的。
总结可以借鉴的几个点:
1、轻量级对象池的使用
2、buffer数组的循环存储
3、ChannelWritabilityChanged事件的触发
4、AtomicLongFieldUpdater的使用
相关推荐
然后,通过分析RocketMQ的源码,观察它是如何利用Netty来实现网络通信的。可以尝试自己编写简单的Netty服务器和客户端,模拟RocketMQ的消息发布和消费过程,加深理解。 总的来说,Netty作为强大的网络通信框架,为...
Netty是一个高性能、异步事件驱动的网络应用框架,它为Java开发人员提供了构建服务器和客户端应用程序的强大...通过阅读和分析源码,我们可以深入理解Netty的事件驱动模型、NIO机制以及如何在实际项目中应用这些概念。
Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...
12. Netty源码分析 - 深入理解Netty的内部实现,如事件处理机制、线程模型等,有助于优化和定制化开发。 总之,Netty通过高效的NIO模型、丰富的编解码器、灵活的线程管理和强大的异常处理能力,为企业级应用提供了...
NIOEventLoopGroup源码分析 NIOEventLoopGroup是Netty中用于处理基于NIO的事件循环的类。它继承自`MultithreadEventLoopGroup`,该类负责管理一组`EventLoop`实例,每个`EventLoop`实例都包含了对特定`Channel`的I...
本文将深入探讨Netty的工作流程,通过分析其核心组件和机制,帮助理解如何利用Netty构建高效的网络应用。 Netty 的工作流程主要基于Reactor模式,它包含以下几个关键组件: 1. **BossGroup**:BossGroup 负责接收...
RocketMQ是中国阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于处理高并发、低延迟的消息传递。...此外,源码分析还有助于提升分布式系统设计和高并发处理的能力。
Apache Spark是一个快速的大数据处理框架,它提供了高性能、易用性和复杂分析的能力。Spark核心功能剖析涉及对Spark内部工作机制的深入理解,这通常包括对Spark运行时架构、内存管理、任务调度、数据处理以及与外部...
- **Netty write buffer is full**:调整Netty相关参数配置,优化缓冲区大小和溢出处理机制。 - **连接失败**:检查服务端地址、端口等配置信息是否正确,以及防火墙设置是否阻止了连接。 - **类型转换异常**:...
**四、源码分析** `mpps-master`这个文件名可能指的是项目的源码主分支或者主目录。在实际分析中,我们可以查看项目源码,了解其设计架构、类库选择、线程模型以及各种策略的具体实现,这对于学习和理解MPPS的工作...
- **Netty框架**:Netty是一个高性能、异步事件驱动的网络应用框架,常用于创建高效的服务器和客户端。 - **WebSockets**:OneLive可能使用WebSocket协议来实现双向通信,提供实时数据传输。 - **RMI(Remote Method...