- 浏览: 10297 次
- 性别:
- 来自: 成都
最新评论
netty中的管道--ChannelPipeline的事件传输机制
在AbstractChannelHandlerContext对象中有inbound和outbound两个boolean变量,用于标识 Context所对应的handler的类型。
inbound=ture,表示对应的ChannelHandler实现了 ChannelInboundHandler方法.
outbound=ture,表示对应的ChannelHandler实现了 ChannelOutboundHandler方法.
在netty中支持两种类型的事件: Inbound和Outbound事件。
看看以下的图:
inbound事件的流向是从下至上的,outbound事件的流向是从上至下的,inbound事件的传递方式是通过调用相应的ChannelHandlerContext.fire_EVT()方法。outbound事件的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext.fireChannelRegistered()的调用会发送一个ChannelRegistered的 inbound事件给下一个ChannelHandlerContext。ChannelHandlerContext.bind调用会发送一个 bind的outbound的事件给下一个ChannelHandlerContext。
Inbound事件传播方法有:
Oubound事件传输方法有:
这些方法都在ChannelHandlerContext接口中定义。
Oubound:
Outbound事件都是请求事件(request event), 即请求某件事情的发生,然后通过Outbound事件进行通知。Outbound事件的传播方向是tail-->customContext-->head
接下来我们以connect事件,分析一下Outbound事件的传播机制。
在EchoClient实例中调用connect方法:
调用链是:Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect
在AbstractChannel.connect中其实就是调用DefaultChannelPipeline.connect的方法:
看看DefaultChannelPipeline的connect方法:
当connect事件(outbound事件)到达Pipeline后,就是以tail为起点开始传播的。tail.connect调用到的是:AbstractChannelHandlerContext.connect方法:
findContextOutbound这段代码:
它的作用是以当前Context为起点(tail),向Pipeline中的双向链表的前端寻找第一个outbound 属性为true的Context。当我们找到了一个outbound的Context时,就调用它的invokeConnect方法。
方法里面会调用与之关联的handler的connect方法,就是handler()获取的handler,如果我们自己实现的handle没有重写ChannelHandler的connect方法,那么会调用 ChannelOutboundHandlerAdapter所实现的connect方法:
这个方法又回到了AbstractChannelHandlerContext的connect方法了。又会找到从此context开始的第一个outbound为true的context。
循环链为:
Context.connect-->Connect.findContextOutbound-->next.invokeConnect--> handler.connect-->Context.connect。
这样的循环会把事件传递到pipeline双向链表中的头节点(head),head实现了ChannelOutboundHandler接口,outbound的属性为true。head本身是一个ChannelHandlerContext也是一个ChannelOutboundHandler,因此在connect事件到达head后,因为HeadContext中覆盖了handler()方法,因此在invokeConnect方法中的handler()返回的就是HeadContext,这样就会调用的HeadContext.connect的方法:
这样的一个outbound事件就完成了。其他的outbound事件和connect事件都是遵循一个的传播规则。
Inbound:
Inbound事件是一个通知事件,即某件事情已经发生了,然后通过Inbound事件进行通知。Inbound事件通常发生在Channel的状态改变或IO事件就绪。
Inbound事件的的传播方向是:head-->customContext-->tail。
既然我们分析了Connect这个Outbound事件,那么接着分析Connect事件后会发生什么Inbound 事件,并最终找到Outbound和Inbound事件之间的联系。
上面的unsafe.connect调用的是AbstractNioUnsafe.connect:
doConnect就是进行实际的Socket连接。在NioSocketChannel中
在此方法后,调用了
这个方法是在connect方法后调用的,它会调用pipeline的fireChannelActive方法,这个方法实际是把socket连接成功的事件发送出去,这个方法就是Inbound事件的起点。调用fireChannelActive方法后就产生了一个ChannelActive Inbound 事件。
看看fireChannelActive方法:
注意看看在这个方法中传入的对象是head对象,这就展示了Inbound事件在Pipeline中传输的起点是从head开始的。
AbstractChannelHandlerContext的invokeChannelActive:
调用head的invokeChannelActive方法:
因为HeadContext即使Context又是Handler,所以在HeadContext重写了handler()方法,因为这个的handler()返回的就是HeadContext。
看看HeadContext的channelActive
ctx.fireChannelActive()这个方法很明显了,就是把ChannelActive的Inbound事件在Pipeline中传下去。这样会调用到AbstractChannelHandlerContext里面
看到findContextInbound方法没!这里就是寻找第一个属性inbound为true的Context,
然后调用invokeChannelActive方法,这样又会调到
在((ChannelInboundHandler) handler()).channelActive(this);中,如果用户没有重写 channelActive方法, 那么会调用ChannelInboundHandlerAdapter的channelActive方法。
这里面也是仅仅调用了ctx.fireChannelActive方法。这样就是一个循环了。
Context.fireChannelActive-->Connect.findContextInbound--> nextContext.invokeChannelActive-->nextHandler.channelActive--> nextContext.fireChannelActive
在这里循环的最后到达了tail,它实现了ChannelInboundHandler和ChannelHandlerContext 口,当channelActive消息传递到tail后,handler()返回的就是tail本身,来看看tail中的channelActive方法:
它什么都没有做。
如果是Inbound事件当用户没有实现自定义的处理器时,那么最后默认是不处理的。
总结:
Outbound:
Outbound事件是请求事件。
Outbound事件的发起者是Channel。
Outbound事件的处理者是unsafe。
Outbound事件在Pipeline中的传输方向是tail-->head。
Inbound:
Inbound事件是通知事件。
Inbound事件发起者是unsafe。
Inbound事件的处理者是Channel。
Inbound事件在Pipeline中传输方向是head-->tail。
在AbstractChannelHandlerContext对象中有inbound和outbound两个boolean变量,用于标识 Context所对应的handler的类型。
inbound=ture,表示对应的ChannelHandler实现了 ChannelInboundHandler方法.
outbound=ture,表示对应的ChannelHandler实现了 ChannelOutboundHandler方法.
在netty中支持两种类型的事件: Inbound和Outbound事件。
看看以下的图:
inbound事件的流向是从下至上的,outbound事件的流向是从上至下的,inbound事件的传递方式是通过调用相应的ChannelHandlerContext.fire_EVT()方法。outbound事件的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext.fireChannelRegistered()的调用会发送一个ChannelRegistered的 inbound事件给下一个ChannelHandlerContext。ChannelHandlerContext.bind调用会发送一个 bind的outbound的事件给下一个ChannelHandlerContext。
Inbound事件传播方法有:
ChannelHandlerContext.fireChannelRegistered() ChannelHandlerContext.fireChannelActive() ChannelHandlerContext.fireChannelRead(Object) ChannelHandlerContext.fireChannelReadComplete() ChannelHandlerContext.fireExceptionCaught(Throwable) ChannelHandlerContext.fireUserEventTriggered(Object) ChannelHandlerContext.fireChannelWritabilityChanged() ChannelHandlerContext.fireChannelInactive() ChannelHandlerContext.fireChannelUnregistered()
Oubound事件传输方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise) ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext.write(Object, ChannelPromise) ChannelHandlerContext.flush() ChannelHandlerContext.read() ChannelHandlerContext.disconnect(ChannelPromise) ChannelHandlerContext.close(ChannelPromise)
这些方法都在ChannelHandlerContext接口中定义。
Oubound:
Outbound事件都是请求事件(request event), 即请求某件事情的发生,然后通过Outbound事件进行通知。Outbound事件的传播方向是tail-->customContext-->head
接下来我们以connect事件,分析一下Outbound事件的传播机制。
在EchoClient实例中调用connect方法:
ChannelFuture f = b.connect(HOST, PORT).sync();
调用链是:Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect
在AbstractChannel.connect中其实就是调用DefaultChannelPipeline.connect的方法:
@Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
看看DefaultChannelPipeline的connect方法:
@Override public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
当connect事件(outbound事件)到达Pipeline后,就是以tail为起点开始传播的。tail.connect调用到的是:AbstractChannelHandlerContext.connect方法:
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
findContextOutbound这段代码:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
它的作用是以当前Context为起点(tail),向Pipeline中的双向链表的前端寻找第一个outbound 属性为true的Context。当我们找到了一个outbound的Context时,就调用它的invokeConnect方法。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }
方法里面会调用与之关联的handler的connect方法,就是handler()获取的handler,如果我们自己实现的handle没有重写ChannelHandler的connect方法,那么会调用 ChannelOutboundHandlerAdapter所实现的connect方法:
@Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); }
这个方法又回到了AbstractChannelHandlerContext的connect方法了。又会找到从此context开始的第一个outbound为true的context。
循环链为:
Context.connect-->Connect.findContextOutbound-->next.invokeConnect--> handler.connect-->Context.connect。
这样的循环会把事件传递到pipeline双向链表中的头节点(head),head实现了ChannelOutboundHandler接口,outbound的属性为true。head本身是一个ChannelHandlerContext也是一个ChannelOutboundHandler,因此在connect事件到达head后,因为HeadContext中覆盖了handler()方法,因此在invokeConnect方法中的handler()返回的就是HeadContext,这样就会调用的HeadContext.connect的方法:
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
这样的一个outbound事件就完成了。其他的outbound事件和connect事件都是遵循一个的传播规则。
Inbound:
Inbound事件是一个通知事件,即某件事情已经发生了,然后通过Inbound事件进行通知。Inbound事件通常发生在Channel的状态改变或IO事件就绪。
Inbound事件的的传播方向是:head-->customContext-->tail。
既然我们分析了Connect这个Outbound事件,那么接着分析Connect事件后会发生什么Inbound 事件,并最终找到Outbound和Inbound事件之间的联系。
上面的unsafe.connect调用的是AbstractNioUnsafe.connect:
............... if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } ..............
doConnect就是进行实际的Socket连接。在NioSocketChannel中
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
在此方法后,调用了
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { return; } boolean active = isActive(); boolean promiseSet = promise.trySuccess(); if (!wasActive && active) { pipeline().fireChannelActive(); } if (!promiseSet) { close(voidPromise()); } }
这个方法是在connect方法后调用的,它会调用pipeline的fireChannelActive方法,这个方法实际是把socket连接成功的事件发送出去,这个方法就是Inbound事件的起点。调用fireChannelActive方法后就产生了一个ChannelActive Inbound 事件。
看看fireChannelActive方法:
@Override public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; }
注意看看在这个方法中传入的对象是head对象,这就展示了Inbound事件在Pipeline中传输的起点是从head开始的。
AbstractChannelHandlerContext的invokeChannelActive:
static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } }
调用head的invokeChannelActive方法:
private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); }
因为HeadContext即使Context又是Handler,所以在HeadContext重写了handler()方法,因为这个的handler()返回的就是HeadContext。
看看HeadContext的channelActive
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }
ctx.fireChannelActive()这个方法很明显了,就是把ChannelActive的Inbound事件在Pipeline中传下去。这样会调用到AbstractChannelHandlerContext里面
public ChannelHandlerContext fireChannelActive() { invokeChannelActive(findContextInbound()); return this; }
看到findContextInbound方法没!这里就是寻找第一个属性inbound为true的Context,
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
然后调用invokeChannelActive方法,这样又会调到
static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } }
在((ChannelInboundHandler) handler()).channelActive(this);中,如果用户没有重写 channelActive方法, 那么会调用ChannelInboundHandlerAdapter的channelActive方法。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
这里面也是仅仅调用了ctx.fireChannelActive方法。这样就是一个循环了。
Context.fireChannelActive-->Connect.findContextInbound--> nextContext.invokeChannelActive-->nextHandler.channelActive--> nextContext.fireChannelActive
在这里循环的最后到达了tail,它实现了ChannelInboundHandler和ChannelHandlerContext 口,当channelActive消息传递到tail后,handler()返回的就是tail本身,来看看tail中的channelActive方法:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { }
它什么都没有做。
如果是Inbound事件当用户没有实现自定义的处理器时,那么最后默认是不处理的。
总结:
Outbound:
Outbound事件是请求事件。
Outbound事件的发起者是Channel。
Outbound事件的处理者是unsafe。
Outbound事件在Pipeline中的传输方向是tail-->head。
Inbound:
Inbound事件是通知事件。
Inbound事件发起者是unsafe。
Inbound事件的处理者是Channel。
Inbound事件在Pipeline中传输方向是head-->tail。
发表评论
-
netty探索之旅八
2017-03-09 13:27 359我们继续EventLoop。走起! 在前一节我们谈到了一个e ... -
netty探索之旅七
2017-03-06 16:28 361前面我们分析了Pipeline,还有一个东西值得我们去研究研究 ... -
netty探索之旅五
2017-03-02 11:09 391netty中的管道--ChannelPipeline机制 前 ... -
netty探索之旅三
2017-02-17 16:00 496下面就开始我们的探索 ... -
netty探索之旅二
2017-02-13 13:30 315上一篇只是简单的介绍了一下NIO中的Selector。 这里我 ... -
netty探索之旅四
2017-02-27 16:19 356上一篇我们研究了netty的客户端代码,这一篇我们研究一下服务 ... -
netty探索之旅一
2017-02-12 16:38 335其实一直都在关注NETTY,前面也花了点时间去看过,但是 ...
相关推荐
Java进阶技术-netty进阶之路
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty
《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...
精选自1000多个一线业务实际案例,从原理到实践全景式讲解Netty项目实践,快速领悟Netty专家花大量时间积累的经验,提高编程水平及分析解决问题的能力,《Netty木又威指南》作者力作,众专家力荐 Netty将Java NIO...
Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...
读书笔记:Netty权威指南学习之旅
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty的基本概念,通过“Hello World”范例来理解其工作原理。 首先,让我们理解...
在《Netty进阶之路:跟着案例学Netty》中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
6. **性能优化**:Netty的性能优化策略,如内存管理、线程模型调整、缓冲区复用等,以提高系统的吞吐量和响应速度。 7. **Netty底层原理**:探讨Netty如何利用Java NIO实现异步非阻塞I/O,以及零拷贝技术的工作机制...
6. **线程模型**:Netty提供了BossGroup和WorkerGroup的线程模型,BossGroup负责接收新的连接,WorkerGroup负责处理I/O事件和业务逻辑。 7. **心跳机制**:Netty提供了心跳包处理,用于检测连接的活跃状态,防止...
6. **安全支持**:Netty 内置了 SSL/TLS 支持,可以轻松实现安全的网络通信。 **三、Netty 在实际项目中的应用** 1. **分布式系统**:Netty 的高性能和低延迟特性使其成为分布式系统间的通信首选。 2. **游戏...
ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。
在Netty中,最重要的概念之一是“Boss线程”和“Worker线程”的模型。Boss线程负责接收新的连接请求,而Worker线程则处理这些连接后的读写操作,这种模型能够有效提高系统的并发能力。Netty的NIO(非阻塞I/O)模型...
Netty是Java领域的一款高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能协议服务器和客户端。在本精讲中,我们将深入探讨Netty的核心概念、设计模式以及实际应用场景,帮助你全面理解并掌握...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 实时通讯的原理与应用,以及如何利用它构建 WebSocket 服务。 WebSocket 是...
6. **易用性与灵活性**:Netty提供了许多便捷的API和工具,使得开发者可以快速构建网络应用,同时它的设计允许灵活地调整和扩展。 接下来,我们关注下压缩包中的具体文件: - **javadoc** 文件夹包含了Netty 4.0.0...
springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...