`
Donald_Draper
  • 浏览: 980947 次
社区版块
存档分类
最新评论

netty 抽象Unsafe定义

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
引言:
上一篇文章我们看来netty的抽象通道初始化,先来回顾一下:
    抽象通道AbstractChannel内部关联一个硬件底层操作类Unsafe,个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线程,默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe。

抽象通道构造初始化中,初始化底层操作类Unsafe,实际返回的是AbstractUnsafe,
今天我们看一下抽象Unsafe定义:
AbstractUnsafe为抽象通道的内部类
/**
 * {@link Unsafe} implementation which sub-classes must extend and use.
 */
protected abstract class AbstractUnsafe implements Unsafe {
    //通道Outbound buf
    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    private RecvByteBufAllocator.Handle recvHandle;//接受字节数据分配器Hander
    private boolean inFlush0;//是否刷新写请求队列数据
    /** true if the channel has never been registered, false otherwise */
    private boolean neverRegistered = true;//通道是否注册到事件循环
    //判断通道是否注册到事件循环
    private void assertEventLoop() {
        assert !registered || eventLoop.inEventLoop();
    }
}

从上面可以看出,抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),
一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
下面几个方法,很容易,一看就明白,不多说了
//返回接收字节buf分配器Hander
 @Override
 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
     if (recvHandle == null) {
         //如果为空,则委托给通道的接收字节buf分配器,创建一个Handle
         recvHandle = config().getRecvByteBufAllocator().newHandle();
     }
     return recvHandle;
 }


//获取通道Outbound缓冲区
@Override
public final ChannelOutboundBuffer outboundBuffer() {
    return outboundBuffer;
}

//获取本地socket地址
@Override
public final SocketAddress localAddress() {
    return localAddress0();
}

//AbstractChannel
/**
 * Returns the {@link SocketAddress} which is bound locally.
 待子类扩展
 */
protected abstract SocketAddress localAddress0();


//获取远端socket地址
@Override
public final SocketAddress remoteAddress() {
    return remoteAddress0();
}

//AbstractChannel
/**
 * Return the {@link SocketAddress} which the {@link Channel} is connected to.
  待子类扩展
 */
protected abstract SocketAddress remoteAddress0();

来看注册通道到事件循环
//注册通道到事件循环
 @Override
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     //首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环
     if (eventLoop == null) {
         throw new NullPointerException("eventLoop");
     }
     if (isRegistered()) {
         promise.setFailure(new IllegalStateException("registered to an event loop already"));
         return;
     }
     if (!isCompatible(eventLoop)) {
         promise.setFailure(
                 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
         return;
     }

     AbstractChannel.this.eventLoop = eventLoop;
     //如果线程在当前事件循环,则委托给register0
     if (eventLoop.inEventLoop()) {
         register0(promise);
     } else {
         //否则创建一个任务线程,完成通道注册事件循环实际工作,并将任务线程交由事件循环执行。
         try {
             eventLoop.execute(new Runnable() {
                 @Override
                 public void run() {
                     register0(promise);
                 }
             });
         } catch (Throwable t) {
             logger.warn(
                     "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                     AbstractChannel.this, t);
             closeForcibly();
             closeFuture.setClosed();
             safeSetFailure(promise, t);
         }
     }
 }

//AbstractChannel
/**
 * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
 通道是否兼容事件循环,待子类实现
 */
protected abstract boolean isCompatible(EventLoop loop);


来看实际注册工作:

 private void register0(ChannelPromise promise) {
     try {
         // check if the channel is still open as it could be closed in the mean time when the register
         // call was outside of the eventLoop
	 //确保任务没取消,通道打开
         if (!promise.setUncancellable() || !ensureOpen(promise)) {
             return;
         }
         boolean firstRegistration = neverRegistered;
	 //完成实际注册
         doRegister();
         neverRegistered = false;
         registered = true;

         // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
         // user may already fire events through the pipeline in the ChannelFutureListener.
	 //确保在实际通知注册任务完成前,调用handlerAdded事件
         pipeline.invokeHandlerAddedIfNeeded();
         //更新注册通道到事件循环成功
         safeSetSuccess(promise);
	 //触发通道已注册事件fireChannelRegistered
         pipeline.fireChannelRegistered();
         // Only fire a channelActive if the channel has never been registered. This prevents firing
         // multiple channel actives if the channel is deregistered and re-registered.
         if (isActive()) {
             if (firstRegistration) {
	         //触发通道已激活事件
                 pipeline.fireChannelActive();
             } else if (config().isAutoRead()) {
                 // This channel was registered before and autoRead() is set. This means we need to begin read
                 // again so that we process inbound data.
                 //
                 // See https://github.com/netty/netty/issues/4805
		 //如果通道配置为自动读取,则读取数据
                 beginRead();
             }
         }
     } catch (Throwable t) {
         // Close the channel directly to avoid FD leak.
	 //异常,则强制关闭通道
         closeForcibly();
         closeFuture.setClosed();//更新异步关闭任务结果为已关闭
         safeSetFailure(promise, t);//设置任务注册失败
     }
}

在实际注册方法中我们有几点要看,
1.
//确保任务没取消,通道打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
}

 @Deprecated
 protected final boolean ensureOpen(ChannelPromise promise) {
     if (isOpen()) {
         return true;
     }

     safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
     return false;
 }

2.
//完成实际注册
doRegister();


3.
//更新注册通道到事件循环成功
safeSetSuccess(promise);


4.
if (isActive()) {
     if (firstRegistration) {
        //触发通道已激活事件
         pipeline.fireChannelActive();
     } else if (config().isAutoRead()) {
         // This channel was registered before and autoRead() is set. This means we need to begin read
         // again so that we process inbound data.
         //
         // See https://github.com/netty/netty/issues/4805
	 //如果通道配置为自动读取,则读取数据
         beginRead();
     }
 }

5.
 // Close the channel directly to avoid FD leak.
//异常,则强制关闭通道
closeForcibly();
closeFuture.setClosed();//更新异步关闭任务结果为已关闭


6.
safeSetFailure(promise, t);//设置任务注册失败




下面分别来看这几点:

1.
//确保任务没取消,通道打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
}


@Deprecated
protected final boolean ensureOpen(ChannelPromise promise) {
    if (isOpen()) {
        return true;
    }

    safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
    return false;
}


2.
//完成实际注册
doRegister();


//AbstractChannel
/**
 * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
 *
 * Sub-classes may override this method
 待子类实现
 */
protected void doRegister() throws Exception {
    // NOOP
}


3.
//更新注册通道到事件循环成功
safeSetSuccess(promise);



/**
 * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
 */
protected final void safeSetSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
        logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    }
}



4.
if (isActive()) {
     if (firstRegistration) {
        //触发通道已激活事件
         pipeline.fireChannelActive();
     } else if (config().isAutoRead()) {
         // This channel was registered before and autoRead() is set. This means we need to begin read
         // again so that we process inbound data.
         //
         // See https://github.com/netty/netty/issues/4805
	 //如果通道配置为自动读取,则读取数据
         beginRead();
     }
}


//Channel
/**
 * Return {@code true} if the {@link Channel} is active and so connected.
 */
boolean isActive();



//如果通道配置为自动读取,则读取数据
beginRead();


@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        //实际读取方法
        doBeginRead();
    } catch (final Exception e) {
        //否则延后触发异常事件
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

//AbstractChannel
/**
 * Schedule a read operation.
 */
protected abstract void doBeginRead() throws Exception;


//否则延后触发异常事件
private void invokeLater(Runnable task) {
    try {
        // This method is used by outbound operation implementations to trigger an inbound event later.
        // They do not trigger an inbound event immediately because an outbound operation might have been
        // triggered by another inbound event handler method.  If fired immediately, the call stack
        // will look like this for example:
        //
        //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
        //   -> handlerA.ctx.close()
        //      -> channel.unsafe.close()
        //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
        //
        // which means the execution of two inbound handler methods of the same handler overlap undesirably.
        eventLoop().execute(task);
    } catch (RejectedExecutionException e) {
        logger.warn("Can't invoke task later as EventLoop rejected it", e);
    }
}


5.
 // Close the channel directly to avoid FD leak.
//异常,则强制关闭通道
closeForcibly();
closeFuture.setClosed();//更新异步关闭任务结果为已关闭



//异常,则强制关闭通道
@Override
public final void closeForcibly() {
    assertEventLoop();

    try {
        doClose();
    } catch (Exception e) {
        logger.warn("Failed to close a channel.", e);
    }
}


//AbstractChannel
 /**
 * Close the {@link Channel}
 */
protected abstract void doClose() throws Exception;


6.
safeSetFailure(promise, t);//设置任务注册失败


/**
 * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
 */
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
    if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
        logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
    }
}


从上面可以看出,通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead。这个过程中触发的事件,则传递给通道内部的Channel管道。

再来看绑定


@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
     //首先检查绑定任务是否取消,确保通道打开
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
	//非root用户,不能接受一个广播消息
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        //委托给doBind
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        //通道第一次激活,触发ChannelActive事件
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}


//AbstractChannel
/**
  * Bind the {@link Channel} to the {@link SocketAddress},待子类实现
  */
 protected abstract void doBind(SocketAddress localAddress) throws Exception;


从上面可以看出,地址绑定方法委托给doBind,待子类实现。

再来看如果需要,则关闭通道的方法:
closeIfClosed();



protected final void closeIfClosed() {
    //通道打开,则直接返回,否则关闭
    if (isOpen()) {
        return;
    }
    close(voidPromise());
}

@Override
public final void close(final ChannelPromise promise) {
    assertEventLoop();
    close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}

private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
   //确保异步任务没有取消
    if (!promise.setUncancellable()) {
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // Only needed if no VoidChannelPromise.
	//如果Outbound buf为空,则添加异步结果监听器
        if (!(promise instanceof VoidChannelPromise)) {
            // This means close() was called before so we just register a listener and return
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    promise.setSuccess();
                }
            });
        }
        return;
    }
    //如果通道关闭任务已完成,则更新异步任务结果
    if (closeFuture.isDone()) {
        // Closed already.
        safeSetSuccess(promise);
        return;
    }

    final boolean wasActive = isActive();
    //到这里,已经不允许添加消息和刷新Outbound Buf
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
    //获取关闭线程执行器
    Executor closeExecutor = prepareToClose();
    if (closeExecutor != null) {
        //如果执行器不为空,则委托给关闭器,执行关闭任务线程
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Execute the close.
		    //实际关闭工作
                    doClose0(promise);
                } finally {
                    // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            // Fail all the queued messages
			    //最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf
                            outboundBuffer.failFlushed(cause, notify);
                            outboundBuffer.close(closeCause);
			    //触发ChannelInactive事件,并反注册
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                }
            }
        });
    } else {
       //否则在当前事件循环中执行关闭任务
        try {
            // Close the channel and fail the queued messages in all cases.
            doClose0(promise);//实际关闭工作
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify);
            outboundBuffer.close(closeCause);
        }
        if (inFlush0) {
	    //正在刷新,则延迟触发ChannelInactive事件、反注册
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            });
        } else {
	    //否则,直接触发ChannelInactive事件、反注册
            fireChannelInactiveAndDeregister(wasActive);
        }
    }
}

关闭方法我们有几点要关注:
1.
 //获取关闭线程执行器
 Executor closeExecutor = prepareToClose()



2.
// Close the channel and fail the queued messages in all cases.
doClose0(promise);//实际关闭工作


3.
 //最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);


4.
//触发ChannelInactive事件,并反注册
fireChannelInactiveAndDeregister(wasActive);


我们分别来看这几点:

1.
 //获取关闭线程执行器
 Executor closeExecutor = prepareToClose()


/**
  * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
  * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
  * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
  * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
  */
 protected Executor prepareToClose() {
     return null;
 }

2.
// Close the channel and fail the queued messages in all cases.
doClose0(promise);//实际关闭工作


 private void doClose0(ChannelPromise promise) {
    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}


//AbstractChannel,待子类实现
/**
 * Close the {@link Channel}
 */
protected abstract void doClose() throws Exception;



3.
 //最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);



//ChannelOutboundBuffer

这个我们单列一篇来讲


4.
//触发ChannelInactive事件,并反注册
fireChannelInactiveAndDeregister(wasActive);


private void fireChannelInactiveAndDeregister(final boolean wasActive) {
    deregister(voidPromise(), wasActive && !isActive());
}

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (!registered) {
        safeSetSuccess(promise);
        return;
    }

    // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
    // we need to ensure we do the actual deregister operation later. This is needed as for example,
    // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
    // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
    // the deregister operation this could lead to have a handler invoked by different EventLoop and so
    // threads.
    //
    // See:
    // https://github.com/netty/netty/issues/4435
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
	        //实际反注册
                doDeregister();
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
	        //当前通道已失效,则触发ChannelInactive事件
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();
                }
                // Some transports like local and AIO does not allow the deregistration of
                // an open channel.  Their doDeregister() calls close(). Consequently,
                // close() calls deregister() again - no need to fire channelUnregistered, so check
                // if it was registered.
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered();
                }
                safeSetSuccess(promise);
            }
        }
    });
}


//AbstractChannel待子类实现
/**
 * Deregister the {@link Channel} from its {@link EventLoop}.
 *
 * Sub-classes may override this method
 */
protected void doDeregister() throws Exception {
    // NOOP
}


从上面可以看出,关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,
如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。

再来通道反注册;

@Override
 public final void deregister(final ChannelPromise promise) {
     assertEventLoop();

     deregister(promise, false);
 }


再来看发送数据:


@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
	//首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);//释放消息
        return;
    }
    int size;
    try {
       //转换消息
        msg = filterOutboundMessage(msg);
	//估算消息大小
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //添加消息到outBound Buf
    outboundBuffer.addMessage(msg, size, promise);
}

//AbstractChannel
/**
 * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
 * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
 转换消息
 */
protected Object filterOutboundMessage(Object msg) throws Exception {
    return msg;
}


//ChannelOutboundBuffer
/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

从上面可以看出,写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加写请求消息到OutBound Buf中。

再来看刷新写请求队列
@Override
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    //将Outbound buf中写请求,添加到刷新队列中
    outboundBuffer.addFlush();
    //刷新Outbound buf
    flush0();
}

刷新方法有以下几点要看:
1
//将Outbound buf中写请求,添加到刷新队列中
outboundBuffer.addFlush();


简单看一下,在下面篇文章我们单讲
//ChannelOutboundBuffer
/**
 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

2.
//刷新Outbound buf
flush0();



@SuppressWarnings("deprecation")
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        //实际刷新Outbound buf
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

//AbstractChannel
/**
 * Flush the content of the given buffer to the remote peer.
 */
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;


从上面可以看出,刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。

再来看断开连接方法:

@Override
public final void disconnect(final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable()) {
        return;
    }
    boolean wasActive = isActive();
    try {
        //完成实际断开连接
        doDisconnect();
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (wasActive && !isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelInactive();
            }
        });
    }
    safeSetSuccess(promise);
    closeIfClosed(); // doDisconnect() might have closed the channel
}

//AbstractChannel
/**
 * Disconnect this {@link Channel} from its remote peer
 断开连接,待子类扩展
 */
protected abstract void doDisconnect() throws Exception;



再来看其他方法:

 @Override
 public final ChannelPromise voidPromise() {
     assertEventLoop();
     return unsafeVoidPromise;
 }


//包装异常
/**
  * Appends the remote address to the message of the exceptions caused by connection attempt failure.
  */
 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
     if (cause instanceof ConnectException) {//连接异常
         return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
     }
     if (cause instanceof NoRouteToHostException) {//无路由异常
         return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
     }
     if (cause instanceof SocketException) {//socket异常
         return new AnnotatedSocketException((SocketException) cause, remoteAddress);
     }

     return cause;
 }

//AbstractChannel
//连接异常
private static final class AnnotatedConnectException extends ConnectException {

    private static final long serialVersionUID = 3901958112696433556L;

    AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
        //将地址添加到异常信息中
        super(exception.getMessage() + ": " + remoteAddress);
        initCause(exception);//初始化异常
	//设置异常堆栈
        setStackTrace(exception.getStackTrace());
    }
    //填充异常堆栈
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}



我们分别来看上述方法中的几点:
1.
initCause(exception);//初始化异常



//Throwable
/**
 * Initializes the <i>cause</i> of this throwable to the specified value.
 * (The cause is the throwable that caused this throwable to get thrown.)
 *
 * <p>This method can be called at most once.  It is generally called from
 * within the constructor, or immediately after creating the
 * throwable.  If this throwable was created
 * with {@link #Throwable(Throwable)} or
 * {@link #Throwable(String,Throwable)}, this method cannot be called
 * even once.
 *
 * <p>An example of using this method on a legacy throwable type
 * without other support for setting the cause is:
 *
 * <pre>
 * try {
 *     lowLevelOp();
 * } catch (LowLevelException le) {
 *     throw (HighLevelException)
 *           new HighLevelException().initCause(le); // Legacy constructor
 * }
 * </pre>
 *
 * @param  cause the cause (which is saved for later retrieval by the
 *         {@link #getCause()} method).  (A {@code null} value is
 *         permitted, and indicates that the cause is nonexistent or
 *         unknown.)
 * @return  a reference to this {@code Throwable} instance.
 * @throws IllegalArgumentException if {@code cause} is this
 *         throwable.  (A throwable cannot be its own cause.)
 * @throws IllegalStateException if this throwable was
 *         created with {@link #Throwable(Throwable)} or
 *         {@link #Throwable(String,Throwable)}, or this method has already
 *         been called on this throwable.
 * @since  1.4
 */
public synchronized Throwable initCause(Throwable cause) {
    if (this.cause != this)
        throw new IllegalStateException("Can't overwrite cause");
    if (cause == this)
        throw new IllegalArgumentException("Self-causation not permitted");
    this.cause = cause;
    return this;
}


2.
//填充异常堆栈
@Override
public Throwable fillInStackTrace() {
    return this;
}


//Throwable
/**
 * Fills in the execution stack trace. This method records within this
 * {@code Throwable} object information about the current state of
 * the stack frames for the current thread.
 *
 * <p>If the stack trace of this {@code Throwable} {@linkplain
 * Throwable#Throwable(String, Throwable, boolean, boolean) is not
 * writable}, calling this method has no effect.
 *
 * @return  a reference to this {@code Throwable} instance.
 * @see     java.lang.Throwable#printStackTrace()
 */
public synchronized Throwable fillInStackTrace() {
    if (stackTrace != null ||
        backtrace != null /* Out of protocol state */ ) {
        fillInStackTrace(0);
        stackTrace = UNASSIGNED_STACK;
    }
    return this;
}
private native Throwable fillInStackTrace(int dummy);




总结:

      抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
      通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。
地址绑定方法委托给doBind,待子类实现。
      关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
     写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则
转换消息,估算消息大小,添加消息到OutBound Buf中。
     刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。


附:

//AbstractChannel
private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {

    private static final long serialVersionUID = -6801433937592080623L;

    AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
        super(exception.getMessage() + ": " + remoteAddress);
        initCause(exception);
        setStackTrace(exception.getStackTrace());
    }

    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}

private static final class AnnotatedSocketException extends SocketException {

    private static final long serialVersionUID = 3896743275010454039L;

    AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
        super(exception.getMessage() + ": " + remoteAddress);
        initCause(exception);
        setStackTrace(exception.getStackTrace());
    }

    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}


//Throwable
public class Throwable implements Serializable {
    /** use serialVersionUID from JDK 1.0.2 for interoperability */
    private static final long serialVersionUID = -3042686055658047285L;

    /**
     * Native code saves some indication of the stack backtrace in this slot.
     */
    private transient Object backtrace;

    /**
     * Specific details about the Throwable.  For example, for
     * {@code FileNotFoundException}, this contains the name of
     * the file that could not be found. 异常消息
     *
     * @serial
     */
    private String detailMessage;


    /**
     * Holder class to defer initializing sentinel objects only used
     * for serialization.
     */
    private static class SentinelHolder {
        /**
         * {@linkplain #setStackTrace(StackTraceElement[]) Setting the
         * stack trace} to a one-element array containing this sentinel
         * value indicates future attempts to set the stack trace will be
         * ignored.  The sentinal is equal to the result of calling:<br>
         * {@code new StackTraceElement("", "", null, Integer.MIN_VALUE)}
         */
        public static final StackTraceElement STACK_TRACE_ELEMENT_SENTINEL =
            new StackTraceElement("", "", null, Integer.MIN_VALUE);

        /**
         * Sentinel value used in the serial form to indicate an immutable
         * stack trace.
         */
        public static final StackTraceElement[] STACK_TRACE_SENTINEL =
            new StackTraceElement[] {STACK_TRACE_ELEMENT_SENTINEL};
    }

    /**
     * A shared value for an empty stack. 异常堆栈
     */
    private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0];

    /*
     * To allow Throwable objects to be made immutable and safely
     * reused by the JVM, such as OutOfMemoryErrors, fields of
     * Throwable that are writable in response to user actions, cause,
     * stackTrace, and suppressedExceptions obey the following
     * protocol:
     *
     * 1) The fields are initialized to a non-null sentinel value
     * which indicates the value has logically not been set.
     *
     * 2) Writing a null to the field indicates further writes
     * are forbidden
     *
     * 3) The sentinel value may be replaced with another non-null
     * value.
     *
     * For example, implementations of the HotSpot JVM have
     * preallocated OutOfMemoryError objects to provide for better
     * diagnosability of that situation.  These objects are created
     * without calling the constructor for that class and the fields
     * in question are initialized to null.  To support this
     * capability, any new fields added to Throwable that require
     * being initialized to a non-null value require a coordinated JVM
     * change.
     */

    /**
     * The throwable that caused this throwable to get thrown, or null if this
     * throwable was not caused by another throwable, or if the causative
     * throwable is unknown.  If this field is equal to this throwable itself,
     * it indicates that the cause of this throwable has not yet been
     * initialized.
     *异常原因
     * @serial
     * @since 1.4
     */
    private Throwable cause = this;

    /**
     * The stack trace, as returned by {@link #getStackTrace()}.
     *
     * The field is initialized to a zero-length array.  A {@code
     * null} value of this field indicates subsequent calls to {@link
     * #setStackTrace(StackTraceElement[])} and {@link
     * #fillInStackTrace()} will be be no-ops.
     *异常堆栈
     * @serial
     * @since 1.4
     */
    private StackTraceElement[] stackTrace = UNASSIGNED_STACK;

    // Setting this static field introduces an acceptable
    // initialization dependency on a few java.util classes.
    private static final List<Throwable> SUPPRESSED_SENTINEL =
        Collections.unmodifiableList(new ArrayList<Throwable>(0));

    /**
     * The list of suppressed exceptions, as returned by {@link
     * #getSuppressed()}.  The list is initialized to a zero-element
     * unmodifiable sentinel list.  When a serialized Throwable is
     * read in, if the {@code suppressedExceptions} field points to a
     * zero-element list, the field is reset to the sentinel value.
     *
     * @serial
     * @since 1.7
     */
    private List<Throwable> suppressedExceptions = SUPPRESSED_SENTINEL;

    /** Message for trying to suppress a null exception. */
    private static final String NULL_CAUSE_MESSAGE = "Cannot suppress a null exception.";

    /** Message for trying to suppress oneself. */
    private static final String SELF_SUPPRESSION_MESSAGE = "Self-suppression not permitted";

    /** Caption  for labeling causative exception stack traces */
    private static final String CAUSE_CAPTION = "Caused by: ";

    /** Caption for labeling suppressed exception stack traces */
    private static final String SUPPRESSED_CAPTION = "Suppressed: ";

    /**
     * Constructs a new throwable with {@code null} as its detail message.
     * The cause is not initialized, and may subsequently be initialized by a
     * call to {@link #initCause}.
     *
     * <p>The {@link #fillInStackTrace()} method is called to initialize
     * the stack trace data in the newly created throwable.
     */
    public Throwable() {
        fillInStackTrace();
    }
    ...
}

0
0
分享到:
评论

相关推荐

    netty-unsafe总结

    对Netty中的Unsafe做了简单的总结,构建自己的知识网!!!

    深入浅出Netty_netty_

    Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个连接都有一个自己的Pipeline,其中包含多个处理器(ChannelHandler),这些处理器按顺序处理进出的数据。 ...

    Netty实战.epub_netty实战epub_netty实战epub_netty_

    《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...

    netty4.0源码,netty例子,netty api文档

    2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)的连接。每个Channel都有自己的生命周期,可以进行读写操作,并且可以注册监听器以响应各种网络事件。 3. **...

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    整合netty实时通讯

    - 使用 Netty 创建 WebSocket 服务器,首先需要定义一个 ChannelInitializer,配置处理 WebSocket 数据的 ChannelHandler。例如,WebSocketFrameDecoder 和 WebSocketFrameEncoder 用于解码和编码 WebSocket 帧。 ...

    netty官网学习手册中文版

    - **Channel与Pipeline**:Channel是网络连接的抽象,负责读写数据;Pipeline则是一系列处理I/O事件的处理器链,每个处理器执行特定的处理任务。 - **EventLoop与EventLoopGroup**:EventLoop是执行I/O操作和处理...

    Netty实战 电子版.pdf_java_netty_服务器_

    《Netty实战》是针对Java开发者的一本技术指南,它深入介绍了如何利用Netty这个高性能、异步事件驱动的网络应用程序框架来构建高效且可扩展的网络应用。Netty不仅简化了网络编程的复杂性,还提供了丰富的特性和组件...

    跟闪电侠学Netty:Netty即时聊天实战与底层原理-book-netty.zip

    《跟闪电侠学Netty:Netty即时聊天实战与底层原理》是一本深入浅出的Netty技术指南,旨在帮助读者掌握Netty框架,并利用它实现即时聊天应用,同时理解其底层工作原理。Netty是Java领域的一款高性能、异步事件驱动的...

    Netty-入门Netty编码

    在 Netty 中,我们定义 ChannelHandler 来处理网络事件,如连接建立、数据读取和写入等。ChannelPipeline 是 ChannelHandler 的容器,它负责事件的顺序处理和传播。 "TimeServer" 示例展示了如何创建一个简单的 TCP...

    netty框架 jar包

    在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...

    Netty进阶之路-跟着案例学Netty

    《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...

    netty-4.1.17.Final.jar

    通道是Netty中的基本I/O抽象,它可以代表任何类型的连接,如TCP、UDP或者本地进程间通信。事件处理器则负责处理各种I/O事件,如连接建立、数据接收和发送等,通过非阻塞I/O(NIO)实现高效率的数据传输。 在"Netty-...

    netty-netty-4.1.96.Final.tar.gz 官网最新版Netty Project

    它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...

    netty4与spring集成

    2. **Bean 注册**: 创建一个 Netty 服务器的配置类,定义 ServerBootstrap、ChannelInitializer 和 ChannelHandler 等核心组件,并将它们作为 Spring Bean 注册。这些 Bean 可以通过构造函数或 @Autowired 注解注入...

    netty5.0官方自带的demo

    - **Decoder与Encoder**:Netty提供了一系列预定义的编解码器,如LineBasedFrameDecoder、LengthFieldBasedFrameDecoder等,用于处理不同格式的数据。开发者可以自定义编解码器,将特定的数据格式转换为可处理的...

    Netty 教程 Netty权威指南

    **Netty 深度解析** Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种领域,如分布式系统、云计算、游戏服务器、大数据传输等。Netty 的设计...

    netty4-netty5.rar

    ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。

Global site tag (gtag.js) - Google Analytics