NioSocketChannel之服务端视角
上篇文章我们对NioServerSocketChannel进行了分析,了解了它是如何接收客户端连接的。本篇我们将详细的了解接收到的连接的生命周期。需要说明的是由于采用的是服务端视角,因此一个连接的生命周期主要包括:
- 创建
- 读取数据
- 写数据
- 关闭
创建
创建过程主要是做一些基础属性的初始化(废话):
public NioSocketChannel(Channel parent, SocketChannel socket) {
// parent自然就是NioServerSocketChannel, socket是accept()到的连接
super(parent, socket);
//创建一份配置,从前一篇NioServerSocketChannel的介绍中我们知道稍后会用
//Bootstrap/ServerBootstrap的childOptions对这个config进行具体的设置
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
//一开始只关注OP_READ
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 配置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 分配一个全局唯一的id,默认为MAC+进程id+自增的序列号+时间戳相关的数值+随机值
id = DefaultChannelId.newInstance();
// 初始化Unsafe, NioSocketChannel对应的是NioSocketChannelUnsafe
unsafe = newUnsafe();
// 初始化pipeline,在后面的阶段里pipeline被用户定义的ChannelInitializer改掉
pipeline = new DefaultChannelPipeline(this);
}
创建过程很简单,这里不再做过多分析,直接进入读取数据的环节。
读取数据
前一篇文章介绍了在接收到连接时会触发fireChannelActive事件,该事件会使channel被注册到EventLoop的Selector中,此EventLoop后面会循环的从Selector中获取准备好的连接,并调用unsafe.read(); NioSocketChannel对应的unsafe为NioSocketChannelUnsafe,我们来看看它的read()方法是如何实现的:
// read()方法在其父类NioByteUnsafe中
public final void read() {
final ChannelConfig config = config();
// 如果autoRead为false且读未被挂起,则移除对OP_READ的关注,即暂时不接收数据
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取到配置的allocator,allocator是用来分配ByteBuf的
final ByteBufAllocator allocator = config.getAllocator();
// 从一个channel中读取消息的最大次数
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
// allocHandle主要用于预估本次ByteBuf的初始大小,避免分配太多导致浪费或者分配过小放不下单次读取的数据而需要多次读取
RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
// 分配一个ByteBuf
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
// 读取数据到ByteBuf中(最大为writable)
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// 未读取到数据则直接释放该ByteBuf,如果返回-1表示读取出错,后面会关闭该连接
byteBuf.release();
byteBuf = null;
close = localReadAmount < 0;
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
// 通过pipleline通知有数据到达,channelRead属于inbound事件,
// 数据会从第一个handler开始处理并往后传递(是否需要传递由handler自已定)
pipeline.fireChannelRead(byteBuf);
// 这里直接赋null,释放逻辑需要handler实现
byteBuf = null;
// 本次read方法已经接收的数据超过Integer.MAX_VALUE,避免溢出,不再读取数据,
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
// 如果autoRead为false则停止读取,默认为true,在流量控制的时候可能会被设置为false
if (!config.isAutoRead()) {
break;
}
// 本次读取的数据长度小于分配的ByteBuf的大小,说明准备好的数据都已经读完了
if (localReadAmount < writable) {
break;
}
} while (++ messages < maxMessagesPerRead);
// 本轮数据读取完毕
pipeline.fireChannelReadComplete();
// 记录本次读取到的数据长度(用于计算下次分配ByteBuf时的初始化大小)
allocHandle.record(totalReadAmount);
// 如果读取的时候发生错误则关闭连接
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
// 读取异常
handleReadException(pipeline, byteBuf, t, close);
} finally {
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}
private void handleReadException(ChannelPipeline pipeline,
ByteBuf byteBuf, Throwable cause, boolean close) {
// 如果读取到了数据则继续处理,否则释放ByteBuf
if (byteBuf != null) {
if (byteBuf.isReadable()) {
setReadPending(false);
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
// 触发读取完成事件,触发异常捕获事件。需要的时候关闭连接
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
可以看到,读取的过程貌似不是很复杂,循环读取数据,每读到一次数据就调用pipeline.fireChannelRead来处理,本次循环结束后调用pipeline.fireChannelReadComplete。这中间有一个细节需要理解:ByteBuf的分配。分配代码:
ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
byteBuf = allocHandle.allocate(allocator);
...
allocHandle.record(totalReadAmount);
默认的allocator为PooledByteBufAllocator.DEFAULT,是一个全局的PooledByteBufAllocator实例,它维护了一个内存池,通过它从内存池中分配ByteBuf(可以通过-Dio.netty.allocator.type=unpooled来使用非内存池模式的allocator)。
默认的allocHandle为AdaptiveRecvByteBufAllocator.HandleImpl。该类的作用是通过历史分配的ByteBuf size来计算下次分配的size。allocHandle.allocate方法主要作用就是提供一个nextReceiveBufferSize,其他的就由allocator(连接池管理)处理:
public ByteBuf allocate(ByteBufAllocator alloc) {
// 由于是读取网络io, 此时使用的是ioBuffer, 即directBuffer(如果支持的话)
return alloc.ioBuffer(nextReceiveBufferSize);
}
下面看看nextReceiveBufferSize的计算方式,由于是根据历史size来计算的,所以必须有个记录历史size的入口,就是前面出现过的allocHandle.record(totalReadAmount)。其中totalReadAmout是接收到的一批数据的总大小。 了解record方法需要了解整个AdaptiveRecvByteBufAllocator的运行机制。
// 先了解SIZE_TABLE
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
// sizeTable中记录[16,512)的所有16倍数的值
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
// 增加[512, N), 512及以上则按照前一个值*2进行存储
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
// 最终得到的SIZE_TABLE->{16,32,48,...512,1024,2048,...}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
这个SIZE_TABLE记录了所有可能分配的size,是AdaptiveRecvByteBufAllocator计算size的一个重要依赖。
// index增加的步长
private static final int INDEX_INCREMENT = 4;
// index减小的步长
private static final int INDEX_DECREMENT = 1;
private AdaptiveRecvByteBufAllocator() {
// 三个参数依次为最小分配大小(64),初始化大小(1024),最大大小(65536)
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
...去掉无用的几行代码...
// 以下几段代码为根据size找到其在SIZE_TABLE中的index
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
// 通过size找到size对应在SIZE_TABLE的index,使用二分查找的方式,具体不细说。如果没有找到对应值则会返回小于size的最大值的index,如size=1024,返回32;size=512,返回31;若size=1000,由于SIZE_TABLE上没有该值,此时返回size=512的index,即返回31
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
再来看看AdaptiveRecvByteBufAllocator.HandleImpl
// 构造方法的三个参数与AdaptiveRecvByteBufAllocator的三个字段值一样
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
// 初始index为initial在SIZE_TABLE的位置,1024对应index=32
index = getSizeTableIndex(initial);
// index所在位置的实际值,默认为1024
nextReceiveBufferSize = SIZE_TABLE[index];
}
// actualReadBytes为一次真实的数据读取读到的字节数
public void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
// 读取到的字节数小于index-2上的数值
if (decreaseNow) {
// 如果是减少模式,则继续减小,并改为增加模式
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
// 如果是增加模式,则改为减小模式
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
//如果读取到的字节数大于nextReceiveBufferSize,则index调整为index+4, 此时进入增长模式
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
// 如果小于nextReceiveBufferSize且大于index-2上的值,则下次分配的大小不变
}
这段代码看起来有点乱,总结下就是:
本次读取到的size比之前大,则增大后面分配的size;如果本次读取到的size比之前小但差异不大(两个index之内),则后面分配的size不变;如果连续读到两次很小的值(两个index之外),则减小后面分配的size。
这种动态分配大小的方式,能够减少ByteBuf多次分配的情况,也能减少因分配过大而浪费的情况。
handler支持的协议很多,实现也有易有难,我们这里先不展开。
写数据
写数据是在handler或者用户自定义的逻辑中,调用方式为ctx.writeAndFlush(msg)、ctx.channel().writeAndFlush(msg)或者ctx.pipeline().writeAndFlush(msg)。其中第二、三两种方法等价,与第一种的差异是:第一种从当前context往前查找对应的handler进行处理,而后两种是从TailContext开始处理。
这节比较长,看之前请先准备好零食…
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
// 想吐槽下这段代码,两次next计算是相同的结果,
// 有点像是从write+flush两个方法copy+paste来的,
// 当然这只是alpha版本,netty4中的新版已经改了-,-
AbstractChannelHandlerContext next;
next = findContextOutbound();
// touch的主要作用是记录msg被next处理过,
// 可以通过此记录进行内存泄露的跟踪等
ReferenceCountUtil.touch(msg, next);
// 依次调用invoker的invokeWrite和invokeFlush
next.invoker().invokeWrite(next, msg, promise);
next = findContextOutbound();
next.invoker().invokeFlush(next);
return promise;
}
可以看到writeAndFlush就是write和flush两个步骤的合并,由invoker发起调用,具体的实现类为DefaultChannelHandlerInvoker, 先看看invokeWrite方法:
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(ctx, promise, true)) {
// promise cancelled
ReferenceCountUtil.release(msg);
return;
}
if (executor.inEventLoop()) {
// 如果在io线程中则直接触发
invokeWriteNow(ctx, msg, promise);
} else {
AbstractChannel channel = (AbstractChannel) ctx.channel();
// 评估消息的大小,注意此大小是指占用的堆内存大小,
// msg为FileRegion就返回0,因为它使用的是DirectMemory
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
// 待flush的字节数加size
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
// 将WriteTask添加到任务队列中,WriteTask的作用是将待写出的字节数减size然后调用invokeWriteNow
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
}
}
上面的方法有一个重要的点:buffer.incrementPendingOutboundBytes,在了解这个方法之前需要先介绍两个概念,ChannelConfig中的writeBufferHighWaterMark和writeBufferLowWaterMark。这两个值分别表示write buffer的高水位线及低水位线,当write buffer的size大于高水位线(默认64k,可通过ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK修改)时,channel.isWritable()将返回false,直到write buffer的size小于低水位线(默认32k, 可通过ChannelOption.WRITE_BUFFER_LOW_WATER_MARK修改)时,channel.isWritable()才返回true。每当isWritable值发生变化,都会调用fireChannelWritabilityChanged来通知handler处理。
void incrementPendingOutboundBytes(long size) {
incrementPendingOutboundBytes(size, true);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 待加入write buffer和已加入write buffer的msg的size之和,
// 如果该值超过高水位线,channel.isWritable()改为false。
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
void decrementPendingOutboundBytes(long size) {
decrementPendingOutboundBytes(size, true);
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 如果size和低于低水位线,则channel.isWritable()恢复为true
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
高低水位线存在的意义是什么?isWritable的true/false有什么影响?
我们知道netty写数据分成两个步骤write和flush, write将数据加到buffer中,而flush则是真正的将数据发送到底层的缓冲区。在某些情况下,如网络拥塞或者对端接收数据过慢,或者发送方发送过快,将导致write buffer中积累的数据越来越多,如果继续写数据可能导致OOM。有了高低水位线后,当发现write buffer中的数据过多时,将isWritable设为false,并且通过fireChannelWritabilityChanged通知handler处理。handler可以针对这种情况自行决定如何处理,如发现isWritable为false时,丢弃信息或者存入其他存储等为true时继续发送,这样就可以保证应用不会发生OOM。需要说明的是netty只是提供了一个接口让应用有机会处理这种状况,如果应用不去处理,且发送一直很慢,最终还是会OOM。
invokeWriteNow在经过一些列handler处理后,最终调用unsafe.write(msg, promise)。
// AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// 如果outboundBuffer为null说明channel被关闭了,直接失败
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
// 释放message防止内存泄露
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// 对消息进行转换,如果不是direct buffer则转换为direct的buffer(FileRegion除外)
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 将消息加入到outboundBuffer中,这里面也会有高低水位线的处理
outboundBuffer.addMessage(msg, size, promise);
}
可以看到write方法的调用并不会真正的写出数据,而是将msg写到outbound buffer中。ChannelOutboundBuffer主要包含两个队列,一个是unflushed的缓冲队列,一个是flushed队列(马上要flush的队列),write时如果之前的flushed队列还未写完,新消息会加到flushed队列中,否则将消息加入到unflushed队列。
再来看看invokeFlush,最终调用的是unsafe.flush():
// AbstractUnsafe
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// unflushed队列转为为flushed,供后面flush
outboundBuffer.addFlush();
// 实际flush
flush0();
}
protected void flush0() {
if (inFlush0) {
// 已经在flush则不再调用后面的逻辑
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// 如果连接无效则所有flush请求都标记失败,flushed队列将被移除
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
} else {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
}
} finally {
inFlush0 = false;
}
return;
}
// 实际往外写数据,如果失败所有flush请求都标记失败,flushed队列将被移除
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
outboundBuffer.failFlushed(t);
} finally {
inFlush0 = false;
}
}
//outboundBuffer.addFlush();
public void addFlush() {
// unflushedEntry为空说明没有(新)消息添加过,则不用处理
Entry entry = unflushedEntry;
if (entry != null) {
// 如果flushedEntry为空则将unflushedEntry转为flushedEntry
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
// 统计需要flush的总数
flushed ++;
if (!entry.promise.setUncancellable()) {
// 处理被取消的消息
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false);
}
entry = entry.next;
} while (entry != null);
// unflushedEntry置为空
unflushedEntry = null;
}
}
完成unflushedEntry到flushedEntry的转换后,开始正式的写:
// 终于又回到NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
// size返回的是前面统计的flushed,为0表示没有需要写的数据
int size = in.size();
if (size == 0) {
// 无数据则清除对OP_WRITE的关注
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// 把flushed队列转换为ByteBuffer数组,注意flushed队列中可能存在非ByteBuffer的数据结构,因此nioBufferCnt=0不代表没有数据需要写
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
switch (nioBufferCnt) {
case 0:
// 为0时也可能存在其他需要写的数据,此时会调用非ByteBuffer的写.
super.doWrite(in);
return;
case 1:
// 只有一个ByteBuffer时使用非聚集的写
ByteBuffer nioBuffer = nioBuffers[0];
// 默认尝试16次写(不能保证一次写完,所以需要循环的尝试)
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
// 没有write任何数据,可能是网络拥塞之类的。
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
// 说明所有数据都写出了
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default://与1相似,只是此处写的是数组
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}
// 释放已经flush完的ByteBuf, 并更新未flush完的ByteBuf的索引,
// (上面的write操作的是ByteBuffer,无法更新ByteBuf索引,需要netty自己再更新一次);
// 更新totalPendingSize的值,减去flush成功的字节数(该操作可能使一个channel的数据从高水位线恢复到低水位线,从而使isWritable()从false变为true;
// 注意该释放是从flushed队列的头开始根据writtenBytes挨个释放,
// 因此一个队列里不能有不同类型的数据(如部分ByteBuffer,部分FileRegion),
// 当然我相信正常的人类应该不会这样去做.
in.removeBytes(writtenBytes);
if (!done) {
// 没写完时调用此方法.
incompleteWrite(setOpWrite);
break;
}
}
}
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
// 之前因为拥塞未flush成功,则添加对OP_WRITE的关注,
// 这样当socket可写的时候,EventLoop中会调用ch.unsafe().forceFlush()再次flush
setOpWrite();
} else {
// 如果只是未flush完则添加一个继续flush的任务
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
上面可以看到,NioSocketChannel本身只处理ByteBuffer的写,其他类型的数据则交给父类来完成,所以我们需要继续看父类doWrite方法:
//AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
for (;;) {
Object msg = in.current();
if (msg == null) {
// 没有数据需要写则清除OP_WRITE并返回
clearOpWrite();
break;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transfered() >= region.count();
boolean setOpWrite = false;
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
}
上面的flush与ByteBuffer的flush流程差不多,区别是上面的代码flush完直接remove msg,不用再更新ByteBuf的index, 实际的flush代码:
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
protected long doWriteFileRegion(FileRegion region) throws Exception {
final long position = region.transfered();
return region.transferTo(javaChannel(), position);
}
到这里写数据的实现基本上分析完,相比读来说,写会复杂很多。我们来总结下整个写的过程:
写被分成两部分:
1、数据写入到缓冲队列
1.1 将msg转换为direct;
1.2 根据当前flush状态,msg可能被加入unflushed队列(之前未flush或已经flush完)或者flushed队列(之前flush且未完成);
1.3 根据msg的size更新totalPendingSize, 并判断高低水位线,根据状态的变更修改isWritable的值,并将变更事件交给handler处理; handler根据isWritable的值可采取忽略、继续write、丢弃数据或者其他措施。
2、数据flush到底层
2.1 将unflushed队列切换为flushed队列;
2.2 由NioSocketChannel的doWrite(ChannelOutboundBuffer in)处理in中的ByteBuffer数据;由AbstractNioByteChannel的doWrite(ChannelOutboundBuffer in)处理FileRegion和普通的ByteBuf;
2.2.1 可能会遇到网络拥塞导致数据无法flush,此时需要注册OP_WRITE事件;这样如果数据可写时,eventloop中会调用ch.unsafe().forceFlush()再次执行写操作;
2.2.2 可能数据在本次操作中没有flush完,此时清理掉已flush完的msg,并根据情况更新未flush完的msg的索引(目前只有ByteBuffer需要更新);同时添加一个任务,该任务完成剩余数据的flush;
2.2.3 所有数据写完,则此时flushed队列为null,unflushed队列也为null
看了这一段,希望你不要再犯以下两个错误:1、write了数据未flush; 2、write了一个非ByteBuf非FileRegion的msg(如果要write这样的数据,记得在加入write buffer之前用handler将其转换为ByteBuf或者FileRegion)。
同时你可以:1、明白网络拥塞可能导致的问题,并根据情况实现handler的fireChannelWritabilityChanged方法,使你的应用更健壮;2、将以上分析中不足的地方发给我。
这一段的最后,我想再补充下,water mark是针对单个连接的,因此如果是短连接且写的数据很小,虽然发生了拥塞,也不一定会触发相应方法。如果想对全局流量进行控制,请看io.netty.handler.traffic(后面会分析)。
关闭
最后讲下关闭这块,关闭包括应用主动关闭、检测到连接断开后关闭(其实调用的方法都一样),对应的关闭方法:
public ChannelFuture close() {
return pipeline.close();
}
close属于outbound事件,从TailContext往前调用,最终调用HeadContext中的close(ChannelHandlerContext ctx, ChannelPromise promise)方法,该方法调用:unsafe.close(promise);
// AbstractUnsafe
public final void close(final ChannelPromise promise) {
if (!promise.setUncancellable()) {
return;
}
// 如果正在写数据则等写完再调用
if (inFlush0) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
close(promise);
}
});
return;
}
if (outboundBuffer == null) {
// 为空说明close()已经调用过了,只需要添加listener来改变promise状态即可
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
return;
}
if (closeFuture.isDone()) {
// 已经关闭则直接返回
safeSetSuccess(promise);
return;
}
// 判断是否active,ch.isOpen() && ch.isConnected(),注意这两个状态的区别
final boolean wasActive = isActive();
final ChannelOutboundBuffer buffer = outboundBuffer;
outboundBuffer = null; // 设为null则无法再添加或刷消息
Executor closeExecutor = closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
Throwable cause = null;
try {
doClose();
} catch (Throwable t) {
cause = t;
}
final Throwable error = cause;
// invokeLater保证closeAndDeregister在EventLoop中执行
invokeLater(new OneTimeTask() {
@Override
public void run() {
closeAndDeregister(buffer, wasActive, promise, error);
}
});
}
});
} else {
Throwable error = null;
try {
doClose();
} catch (Throwable t) {
error = t;
}
closeAndDeregister(buffer, wasActive, promise, error);
}
}
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive,
ChannelPromise promise, Throwable error) {
try {
//移除flushed队列中的msg,并将对应promise设置异常
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
//移除unflushed队列中的msg,并将对应promise设置异常
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
// 从active变为非active,需要触发channelInactive方法,然后取消注册
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
// 调用doDeregister, 然后触发fireChannelUnregistered
deregister(voidPromise());
}
});
} else {
invokeLater(new OneTimeTask() {
@Override
public void run() {
deregister(voidPromise());
}
});
}
// closeFuture设置, promise设置.
closeFuture.setClosed();
if (error != null) {
safeSetFailure(promise, error);
} else {
safeSetSuccess(promise);
}
}
}
// NioSocketChannel
protected void doDeregister() throws Exception {
// 最终调用key.cancel(),取消后连接不再在EventLoop中
((NioEventLoop) eventLoop().unwrap()).cancel(selectionKey());
}
protected void doClose() throws Exception {
// doClose关闭物理连接
javaChannel().close();
}
close的过程也比较简单,总结下就是:1、如果正在flush则等待当前的flush完成; 2、关闭连接;3、触发channelInactive; 4、从EventLoop中取消key; 5、触发fireChannelUnregistered。
到这里服务端这边的流程除了handler处理外都讲完了, 后面我们将切到客户端的角度继续分析。
<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>
相关推荐
赠送jar包:netty-codec-mqtt-4.1.73.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-http-4.1.27.Final.jar; 赠送原API文档:netty-codec-http-4.1.27.Final-javadoc.jar; 赠送源代码:netty-codec-http-4.1.27.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
赠送jar包:netty-transport-classes-epoll-4.1.74.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.74.Final-sources.jar;...
赠送jar包:netty-codec-http2-4.1.74.Final.jar; 赠送原API文档:netty-codec-http2-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-transport-rxtx-4.1.74.Final.jar; 赠送原API文档:netty-transport-rxtx-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-rxtx-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-codec-haproxy-4.1.73.Final.jar; 赠送原API文档:netty-codec-haproxy-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-haproxy-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-http2-4.1.73.Final.jar; 赠送原API文档:netty-codec-http2-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-codec-redis-4.1.73.Final.jar; 赠送原API文档:netty-codec-redis-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-redis-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-resolver-dns-4.1.73.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.73.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-mqtt-4.1.74.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-tcnative-classes-2.0.46.Final.jar; 赠送原API文档:netty-tcnative-classes-2.0.46.Final-javadoc.jar; 赠送源代码:netty-tcnative-classes-2.0.46.Final-sources.jar; 赠送Maven依赖信息...
赠送jar包:netty-codec-http-4.1.68.Final.jar; 赠送原API文档:netty-codec-http-4.1.68.Final-javadoc.jar; 赠送源代码:netty-codec-http-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...