- 浏览: 984384 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
引言:
上一篇文章我们看来netty的抽象通道初始化,先来回顾一下:
抽象通道AbstractChannel内部关联一个硬件底层操作类Unsafe,个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线程,默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe。
抽象通道构造初始化中,初始化底层操作类Unsafe,实际返回的是AbstractUnsafe,
今天我们看一下抽象Unsafe定义:
AbstractUnsafe为抽象通道的内部类
从上面可以看出,抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),
一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
下面几个方法,很容易,一看就明白,不多说了
//AbstractChannel
//AbstractChannel
来看注册通道到事件循环
//注册通道到事件循环
//AbstractChannel
来看实际注册工作:
在实际注册方法中我们有几点要看,
1.
2.
3.
4.
5.
6.
下面分别来看这几点:
1.
2.
//AbstractChannel
3.
4.
//Channel
//AbstractChannel
5.
//AbstractChannel
6.
从上面可以看出,通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead。这个过程中触发的事件,则传递给通道内部的Channel管道。
再来看绑定
//AbstractChannel
从上面可以看出,地址绑定方法委托给doBind,待子类实现。
再来看如果需要,则关闭通道的方法:
关闭方法我们有几点要关注:
1.
2.
3.
4.
我们分别来看这几点:
1.
2.
//AbstractChannel,待子类实现
3.
//ChannelOutboundBuffer
这个我们单列一篇来讲
4.
//AbstractChannel待子类实现
从上面可以看出,关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,
如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
再来通道反注册;
再来看发送数据:
//AbstractChannel
//ChannelOutboundBuffer
从上面可以看出,写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加写请求消息到OutBound Buf中。
再来看刷新写请求队列
刷新方法有以下几点要看:
1
简单看一下,在下面篇文章我们单讲
//ChannelOutboundBuffer
2.
//AbstractChannel
从上面可以看出,刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
再来看断开连接方法:
//AbstractChannel
再来看其他方法:
//包装异常
//AbstractChannel
我们分别来看上述方法中的几点:
1.
2.
//Throwable
总结:
抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。
地址绑定方法委托给doBind,待子类实现。
关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则
转换消息,估算消息大小,添加消息到OutBound Buf中。
刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
附:
//AbstractChannel
//Throwable
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
引言:
上一篇文章我们看来netty的抽象通道初始化,先来回顾一下:
抽象通道AbstractChannel内部关联一个硬件底层操作类Unsafe,个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线程,默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe。
抽象通道构造初始化中,初始化底层操作类Unsafe,实际返回的是AbstractUnsafe,
今天我们看一下抽象Unsafe定义:
AbstractUnsafe为抽象通道的内部类
/** * {@link Unsafe} implementation which sub-classes must extend and use. */ protected abstract class AbstractUnsafe implements Unsafe { //通道Outbound buf private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private RecvByteBufAllocator.Handle recvHandle;//接受字节数据分配器Hander private boolean inFlush0;//是否刷新写请求队列数据 /** true if the channel has never been registered, false otherwise */ private boolean neverRegistered = true;//通道是否注册到事件循环 //判断通道是否注册到事件循环 private void assertEventLoop() { assert !registered || eventLoop.inEventLoop(); } }
从上面可以看出,抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),
一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
下面几个方法,很容易,一看就明白,不多说了
//返回接收字节buf分配器Hander @Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) { //如果为空,则委托给通道的接收字节buf分配器,创建一个Handle recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
//获取通道Outbound缓冲区 @Override public final ChannelOutboundBuffer outboundBuffer() { return outboundBuffer; }
//获取本地socket地址 @Override public final SocketAddress localAddress() { return localAddress0(); }
//AbstractChannel
/** * Returns the {@link SocketAddress} which is bound locally. 待子类扩展 */ protected abstract SocketAddress localAddress0();
//获取远端socket地址 @Override public final SocketAddress remoteAddress() { return remoteAddress0(); }
//AbstractChannel
/** * Return the {@link SocketAddress} which the {@link Channel} is connected to. 待子类扩展 */ protected abstract SocketAddress remoteAddress0();
来看注册通道到事件循环
//注册通道到事件循环
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { //首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环 if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; //如果线程在当前事件循环,则委托给register0 if (eventLoop.inEventLoop()) { register0(promise); } else { //否则创建一个任务线程,完成通道注册事件循环实际工作,并将任务线程交由事件循环执行。 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
//AbstractChannel
/** * Return {@code true} if the given {@link EventLoop} is compatible with this instance. 通道是否兼容事件循环,待子类实现 */ protected abstract boolean isCompatible(EventLoop loop);
来看实际注册工作:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop //确保任务没取消,通道打开 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //完成实际注册 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. //确保在实际通知注册任务完成前,调用handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded(); //更新注册通道到事件循环成功 safeSetSuccess(promise); //触发通道已注册事件fireChannelRegistered pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { //触发通道已激活事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 //如果通道配置为自动读取,则读取数据 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. //异常,则强制关闭通道 closeForcibly(); closeFuture.setClosed();//更新异步关闭任务结果为已关闭 safeSetFailure(promise, t);//设置任务注册失败 } }
在实际注册方法中我们有几点要看,
1.
//确保任务没取消,通道打开 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } @Deprecated protected final boolean ensureOpen(ChannelPromise promise) { if (isOpen()) { return true; } safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION); return false; }
2.
//完成实际注册 doRegister();
3.
//更新注册通道到事件循环成功 safeSetSuccess(promise);
4.
if (isActive()) { if (firstRegistration) { //触发通道已激活事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 //如果通道配置为自动读取,则读取数据 beginRead(); } }
5.
// Close the channel directly to avoid FD leak. //异常,则强制关闭通道 closeForcibly(); closeFuture.setClosed();//更新异步关闭任务结果为已关闭
6.
safeSetFailure(promise, t);//设置任务注册失败
下面分别来看这几点:
1.
//确保任务没取消,通道打开 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }
@Deprecated protected final boolean ensureOpen(ChannelPromise promise) { if (isOpen()) { return true; } safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION); return false; }
2.
//完成实际注册 doRegister();
//AbstractChannel
/** * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process. * * Sub-classes may override this method 待子类实现 */ protected void doRegister() throws Exception { // NOOP }
3.
//更新注册通道到事件循环成功 safeSetSuccess(promise);
/** * Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message. */ protected final void safeSetSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } }
4.
if (isActive()) { if (firstRegistration) { //触发通道已激活事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 //如果通道配置为自动读取,则读取数据 beginRead(); } }
//Channel
/** * Return {@code true} if the {@link Channel} is active and so connected. */ boolean isActive();
//如果通道配置为自动读取,则读取数据 beginRead();
@Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { //实际读取方法 doBeginRead(); } catch (final Exception e) { //否则延后触发异常事件 invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
//AbstractChannel
/** * Schedule a read operation. */ protected abstract void doBeginRead() throws Exception;
//否则延后触发异常事件 private void invokeLater(Runnable task) { try { // This method is used by outbound operation implementations to trigger an inbound event later. // They do not trigger an inbound event immediately because an outbound operation might have been // triggered by another inbound event handler method. If fired immediately, the call stack // will look like this for example: // // handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection. // -> handlerA.ctx.close() // -> channel.unsafe.close() // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
5.
// Close the channel directly to avoid FD leak. //异常,则强制关闭通道 closeForcibly(); closeFuture.setClosed();//更新异步关闭任务结果为已关闭
//异常,则强制关闭通道 @Override public final void closeForcibly() { assertEventLoop(); try { doClose(); } catch (Exception e) { logger.warn("Failed to close a channel.", e); } }
//AbstractChannel
/** * Close the {@link Channel} */ protected abstract void doClose() throws Exception;
6.
safeSetFailure(promise, t);//设置任务注册失败
/** * Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message. */ protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }
从上面可以看出,通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead。这个过程中触发的事件,则传递给通道内部的Channel管道。
再来看绑定
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); //首先检查绑定任务是否取消,确保通道打开 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. //非root用户,不能接受一个广播消息 logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { //委托给doBind doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { //通道第一次激活,触发ChannelActive事件 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
//AbstractChannel
/** * Bind the {@link Channel} to the {@link SocketAddress},待子类实现 */ protected abstract void doBind(SocketAddress localAddress) throws Exception;
从上面可以看出,地址绑定方法委托给doBind,待子类实现。
再来看如果需要,则关闭通道的方法:
closeIfClosed();
protected final void closeIfClosed() { //通道打开,则直接返回,否则关闭 if (isOpen()) { return; } close(voidPromise()); }
@Override public final void close(final ChannelPromise promise) { assertEventLoop(); close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); } private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { //确保异步任务没有取消 if (!promise.setUncancellable()) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // Only needed if no VoidChannelPromise. //如果Outbound buf为空,则添加异步结果监听器 if (!(promise instanceof VoidChannelPromise)) { // This means close() was called before so we just register a listener and return closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.setSuccess(); } }); } return; } //如果通道关闭任务已完成,则更新异步任务结果 if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); return; } final boolean wasActive = isActive(); //到这里,已经不允许添加消息和刷新Outbound Buf this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. //获取关闭线程执行器 Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { //如果执行器不为空,则委托给关闭器,执行关闭任务线程 closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. //实际关闭工作 doClose0(promise); } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! invokeLater(new Runnable() { @Override public void run() { // Fail all the queued messages //最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); //触发ChannelInactive事件,并反注册 fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { //否则在当前事件循环中执行关闭任务 try { // Close the channel and fail the queued messages in all cases. doClose0(promise);//实际关闭工作 } finally { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } if (inFlush0) { //正在刷新,则延迟触发ChannelInactive事件、反注册 invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive); } }); } else { //否则,直接触发ChannelInactive事件、反注册 fireChannelInactiveAndDeregister(wasActive); } } }
关闭方法我们有几点要关注:
1.
//获取关闭线程执行器 Executor closeExecutor = prepareToClose()
2.
// Close the channel and fail the queued messages in all cases. doClose0(promise);//实际关闭工作
3.
//最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause);
4.
//触发ChannelInactive事件,并反注册 fireChannelInactiveAndDeregister(wasActive);
我们分别来看这几点:
1.
//获取关闭线程执行器 Executor closeExecutor = prepareToClose()
/** * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the * caller must call the {@link Executor#execute(Runnable)} method with a task that calls * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null}, * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop}) */ protected Executor prepareToClose() { return null; }
2.
// Close the channel and fail the queued messages in all cases. doClose0(promise);//实际关闭工作
private void doClose0(ChannelPromise promise) { try { doClose(); closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } }
//AbstractChannel,待子类实现
/** * Close the {@link Channel} */ protected abstract void doClose() throws Exception;
3.
//最后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause);
//ChannelOutboundBuffer
这个我们单列一篇来讲
4.
//触发ChannelInactive事件,并反注册 fireChannelInactiveAndDeregister(wasActive);
private void fireChannelInactiveAndDeregister(final boolean wasActive) { deregister(voidPromise(), wasActive && !isActive()); } private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { safeSetSuccess(promise); return; } // As a user may call deregister() from within any method while doing processing in the ChannelPipeline, // we need to ensure we do the actual deregister operation later. This is needed as for example, // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay, // the deregister operation this could lead to have a handler invoked by different EventLoop and so // threads. // // See: // https://github.com/netty/netty/issues/4435 invokeLater(new Runnable() { @Override public void run() { try { //实际反注册 doDeregister(); } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { //当前通道已失效,则触发ChannelInactive事件 if (fireChannelInactive) { pipeline.fireChannelInactive(); } // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, // close() calls deregister() again - no need to fire channelUnregistered, so check // if it was registered. if (registered) { registered = false; pipeline.fireChannelUnregistered(); } safeSetSuccess(promise); } } }); }
//AbstractChannel待子类实现
/** * Deregister the {@link Channel} from its {@link EventLoop}. * * Sub-classes may override this method */ protected void doDeregister() throws Exception { // NOOP }
从上面可以看出,关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,
如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
再来通道反注册;
@Override public final void deregister(final ChannelPromise promise) { assertEventLoop(); deregister(promise, false); }
再来看发送数据:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 //首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg);//释放消息 return; } int size; try { //转换消息 msg = filterOutboundMessage(msg); //估算消息大小 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //添加消息到outBound Buf outboundBuffer.addMessage(msg, size, promise); }
//AbstractChannel
/** * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer) 转换消息 */ protected Object filterOutboundMessage(Object msg) throws Exception { return msg; }
//ChannelOutboundBuffer
/** * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once * the message was written. */ public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false); }
从上面可以看出,写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加写请求消息到OutBound Buf中。
再来看刷新写请求队列
@Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } //将Outbound buf中写请求,添加到刷新队列中 outboundBuffer.addFlush(); //刷新Outbound buf flush0(); }
刷新方法有以下几点要看:
1
//将Outbound buf中写请求,添加到刷新队列中 outboundBuffer.addFlush();
简单看一下,在下面篇文章我们单讲
//ChannelOutboundBuffer
/** * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed * and so you will be able to handle them. */ public void addFlush() { // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
2.
//刷新Outbound buf flush0();
@SuppressWarnings("deprecation") protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { //实际刷新Outbound buf doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
//AbstractChannel
/** * Flush the content of the given buffer to the remote peer. */ protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
从上面可以看出,刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
再来看断开连接方法:
@Override public final void disconnect(final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable()) { return; } boolean wasActive = isActive(); try { //完成实际断开连接 doDisconnect(); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelInactive(); } }); } safeSetSuccess(promise); closeIfClosed(); // doDisconnect() might have closed the channel }
//AbstractChannel
/** * Disconnect this {@link Channel} from its remote peer 断开连接,待子类扩展 */ protected abstract void doDisconnect() throws Exception;
再来看其他方法:
@Override public final ChannelPromise voidPromise() { assertEventLoop(); return unsafeVoidPromise; }
//包装异常
/** * Appends the remote address to the message of the exceptions caused by connection attempt failure. */ protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) { if (cause instanceof ConnectException) {//连接异常 return new AnnotatedConnectException((ConnectException) cause, remoteAddress); } if (cause instanceof NoRouteToHostException) {//无路由异常 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress); } if (cause instanceof SocketException) {//socket异常 return new AnnotatedSocketException((SocketException) cause, remoteAddress); } return cause; }
//AbstractChannel
//连接异常 private static final class AnnotatedConnectException extends ConnectException { private static final long serialVersionUID = 3901958112696433556L; AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) { //将地址添加到异常信息中 super(exception.getMessage() + ": " + remoteAddress); initCause(exception);//初始化异常 //设置异常堆栈 setStackTrace(exception.getStackTrace()); } //填充异常堆栈 @Override public Throwable fillInStackTrace() { return this; } }
我们分别来看上述方法中的几点:
1.
initCause(exception);//初始化异常
//Throwable /** * Initializes the <i>cause</i> of this throwable to the specified value. * (The cause is the throwable that caused this throwable to get thrown.) * * <p>This method can be called at most once. It is generally called from * within the constructor, or immediately after creating the * throwable. If this throwable was created * with {@link #Throwable(Throwable)} or * {@link #Throwable(String,Throwable)}, this method cannot be called * even once. * * <p>An example of using this method on a legacy throwable type * without other support for setting the cause is: * * <pre> * try { * lowLevelOp(); * } catch (LowLevelException le) { * throw (HighLevelException) * new HighLevelException().initCause(le); // Legacy constructor * } * </pre> * * @param cause the cause (which is saved for later retrieval by the * {@link #getCause()} method). (A {@code null} value is * permitted, and indicates that the cause is nonexistent or * unknown.) * @return a reference to this {@code Throwable} instance. * @throws IllegalArgumentException if {@code cause} is this * throwable. (A throwable cannot be its own cause.) * @throws IllegalStateException if this throwable was * created with {@link #Throwable(Throwable)} or * {@link #Throwable(String,Throwable)}, or this method has already * been called on this throwable. * @since 1.4 */ public synchronized Throwable initCause(Throwable cause) { if (this.cause != this) throw new IllegalStateException("Can't overwrite cause"); if (cause == this) throw new IllegalArgumentException("Self-causation not permitted"); this.cause = cause; return this; }
2.
//填充异常堆栈 @Override public Throwable fillInStackTrace() { return this; }
//Throwable
/** * Fills in the execution stack trace. This method records within this * {@code Throwable} object information about the current state of * the stack frames for the current thread. * * <p>If the stack trace of this {@code Throwable} {@linkplain * Throwable#Throwable(String, Throwable, boolean, boolean) is not * writable}, calling this method has no effect. * * @return a reference to this {@code Throwable} instance. * @see java.lang.Throwable#printStackTrace() */ public synchronized Throwable fillInStackTrace() { if (stackTrace != null || backtrace != null /* Out of protocol state */ ) { fillInStackTrace(0); stackTrace = UNASSIGNED_STACK; } return this; } private native Throwable fillInStackTrace(int dummy);
总结:
抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。
地址绑定方法委托给doBind,待子类实现。
关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则
转换消息,估算消息大小,添加消息到OutBound Buf中。
刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
附:
//AbstractChannel
private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException { private static final long serialVersionUID = -6801433937592080623L; AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) { super(exception.getMessage() + ": " + remoteAddress); initCause(exception); setStackTrace(exception.getStackTrace()); } @Override public Throwable fillInStackTrace() { return this; } } private static final class AnnotatedSocketException extends SocketException { private static final long serialVersionUID = 3896743275010454039L; AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) { super(exception.getMessage() + ": " + remoteAddress); initCause(exception); setStackTrace(exception.getStackTrace()); } @Override public Throwable fillInStackTrace() { return this; } }
//Throwable
public class Throwable implements Serializable { /** use serialVersionUID from JDK 1.0.2 for interoperability */ private static final long serialVersionUID = -3042686055658047285L; /** * Native code saves some indication of the stack backtrace in this slot. */ private transient Object backtrace; /** * Specific details about the Throwable. For example, for * {@code FileNotFoundException}, this contains the name of * the file that could not be found. 异常消息 * * @serial */ private String detailMessage; /** * Holder class to defer initializing sentinel objects only used * for serialization. */ private static class SentinelHolder { /** * {@linkplain #setStackTrace(StackTraceElement[]) Setting the * stack trace} to a one-element array containing this sentinel * value indicates future attempts to set the stack trace will be * ignored. The sentinal is equal to the result of calling:<br> * {@code new StackTraceElement("", "", null, Integer.MIN_VALUE)} */ public static final StackTraceElement STACK_TRACE_ELEMENT_SENTINEL = new StackTraceElement("", "", null, Integer.MIN_VALUE); /** * Sentinel value used in the serial form to indicate an immutable * stack trace. */ public static final StackTraceElement[] STACK_TRACE_SENTINEL = new StackTraceElement[] {STACK_TRACE_ELEMENT_SENTINEL}; } /** * A shared value for an empty stack. 异常堆栈 */ private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0]; /* * To allow Throwable objects to be made immutable and safely * reused by the JVM, such as OutOfMemoryErrors, fields of * Throwable that are writable in response to user actions, cause, * stackTrace, and suppressedExceptions obey the following * protocol: * * 1) The fields are initialized to a non-null sentinel value * which indicates the value has logically not been set. * * 2) Writing a null to the field indicates further writes * are forbidden * * 3) The sentinel value may be replaced with another non-null * value. * * For example, implementations of the HotSpot JVM have * preallocated OutOfMemoryError objects to provide for better * diagnosability of that situation. These objects are created * without calling the constructor for that class and the fields * in question are initialized to null. To support this * capability, any new fields added to Throwable that require * being initialized to a non-null value require a coordinated JVM * change. */ /** * The throwable that caused this throwable to get thrown, or null if this * throwable was not caused by another throwable, or if the causative * throwable is unknown. If this field is equal to this throwable itself, * it indicates that the cause of this throwable has not yet been * initialized. *异常原因 * @serial * @since 1.4 */ private Throwable cause = this; /** * The stack trace, as returned by {@link #getStackTrace()}. * * The field is initialized to a zero-length array. A {@code * null} value of this field indicates subsequent calls to {@link * #setStackTrace(StackTraceElement[])} and {@link * #fillInStackTrace()} will be be no-ops. *异常堆栈 * @serial * @since 1.4 */ private StackTraceElement[] stackTrace = UNASSIGNED_STACK; // Setting this static field introduces an acceptable // initialization dependency on a few java.util classes. private static final List<Throwable> SUPPRESSED_SENTINEL = Collections.unmodifiableList(new ArrayList<Throwable>(0)); /** * The list of suppressed exceptions, as returned by {@link * #getSuppressed()}. The list is initialized to a zero-element * unmodifiable sentinel list. When a serialized Throwable is * read in, if the {@code suppressedExceptions} field points to a * zero-element list, the field is reset to the sentinel value. * * @serial * @since 1.7 */ private List<Throwable> suppressedExceptions = SUPPRESSED_SENTINEL; /** Message for trying to suppress a null exception. */ private static final String NULL_CAUSE_MESSAGE = "Cannot suppress a null exception."; /** Message for trying to suppress oneself. */ private static final String SELF_SUPPRESSION_MESSAGE = "Self-suppression not permitted"; /** Caption for labeling causative exception stack traces */ private static final String CAUSE_CAPTION = "Caused by: "; /** Caption for labeling suppressed exception stack traces */ private static final String SUPPRESSED_CAPTION = "Suppressed: "; /** * Constructs a new throwable with {@code null} as its detail message. * The cause is not initialized, and may subsequently be initialized by a * call to {@link #initCause}. * * <p>The {@link #fillInStackTrace()} method is called to initialize * the stack trace data in the newly created throwable. */ public Throwable() { fillInStackTrace(); } ... }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2062netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2452netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1318netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1599netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2839netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2185netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2041netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1080netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1878netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1220netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1204netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1858netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1883netty Inboudn/Outbound通道Invoker ...
相关推荐
对Netty中的Unsafe做了简单的总结,构建自己的知识网!!!
Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个连接都有一个自己的Pipeline,其中包含多个处理器(ChannelHandler),这些处理器按顺序处理进出的数据。 ...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)的连接。每个Channel都有自己的生命周期,可以进行读写操作,并且可以注册监听器以响应各种网络事件。 3. **...
在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...
- 使用 Netty 创建 WebSocket 服务器,首先需要定义一个 ChannelInitializer,配置处理 WebSocket 数据的 ChannelHandler。例如,WebSocketFrameDecoder 和 WebSocketFrameEncoder 用于解码和编码 WebSocket 帧。 ...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
- **Channel与Pipeline**:Channel是网络连接的抽象,负责读写数据;Pipeline则是一系列处理I/O事件的处理器链,每个处理器执行特定的处理任务。 - **EventLoop与EventLoopGroup**:EventLoop是执行I/O操作和处理...
《跟闪电侠学Netty:Netty即时聊天实战与底层原理》是一本深入浅出的Netty技术指南,旨在帮助读者掌握Netty框架,并利用它实现即时聊天应用,同时理解其底层工作原理。Netty是Java领域的一款高性能、异步事件驱动的...
《Netty实战》是针对Java开发者的一本技术指南,它深入介绍了如何利用Netty这个高性能、异步事件驱动的网络应用程序框架来构建高效且可扩展的网络应用。Netty不仅简化了网络编程的复杂性,还提供了丰富的特性和组件...
在 Netty 中,我们定义 ChannelHandler 来处理网络事件,如连接建立、数据读取和写入等。ChannelPipeline 是 ChannelHandler 的容器,它负责事件的顺序处理和传播。 "TimeServer" 示例展示了如何创建一个简单的 TCP...
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...
Netty通过定义`DiscardServerHandler`类,继承自`SimpleChannelHandler`,并在`messageReceived`方法中不执行任何操作,从而实现了抛弃协议。此外,`exceptionCaught`方法用于处理异常情况,保证了程序的健壮性。 #...
通道是Netty中的基本I/O抽象,它可以代表任何类型的连接,如TCP、UDP或者本地进程间通信。事件处理器则负责处理各种I/O事件,如连接建立、数据接收和发送等,通过非阻塞I/O(NIO)实现高效率的数据传输。 在"Netty-...
2. **Bean 注册**: 创建一个 Netty 服务器的配置类,定义 ServerBootstrap、ChannelInitializer 和 ChannelHandler 等核心组件,并将它们作为 Spring Bean 注册。这些 Bean 可以通过构造函数或 @Autowired 注解注入...
- **Decoder与Encoder**:Netty提供了一系列预定义的编解码器,如LineBasedFrameDecoder、LengthFieldBasedFrameDecoder等,用于处理不同格式的数据。开发者可以自定义编解码器,将特定的数据格式转换为可处理的...
**Netty 深度解析** Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种领域,如分布式系统、云计算、游戏服务器、大数据传输等。Netty 的设计...
ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。