- 浏览: 17417 次
- 性别:
- 来自: 杭州
最新评论
本文的主要内容是对netty框架中的Channel、ChannelEvent、ChannelFuture、ChannelHandler、ChannelPipeline、ChannelSink、SelectionKey、Selector对象进行一个概念的理解,信息都来自源码中的英文解释,算一个翻译文档。
先来说说channel
channel是一个网络套接字的纽带或者说是一个组件,它可以有这样的i/o操作:读写、连接以及绑定。以下是它的一些特点:
1、netty中全部的i/o操作都是异步的,这就意味着任何i/o操作将会立即返回而不保证操作是否完成,返回的是ChannelFuture实例,在i/o操作成功、失败或者被删除后会获得一个通知。
2、Channels是有层次的,比如SocketChannel是被ServerSocketChannel接收后产生的,就可以说SocketChannel的父节点是ServerSocketChannel,可以通过getParent()方法获得。但是这种分层结构的语义依赖传输方式的具体实现,比如在udp中这种层次语义就没有这么强烈。
3、Channel中有个叫InterestOps的属性,它由四个值:OP_READ、OP_WRITE、OP_READ_WRITE、OP_NONE。下面解释一下:
OP_READ:如果被设置表示可以立即读取远程客户端发来的信息,否则就不接收直到标记再次被设置;
OP_WRITE:如果设置,写请求将不会被发送到远程客户端直到标记被清除而且写请求将在队列中挂起,如果没有设 置,写请求将尽早从队列中发送出去;
OP_READ_WRITE:这是OP_READ和OP_WRITE的组合,意思是只有写请求会被挂起;
OP_NONE:意思是只有读操作会被挂起。
值得一提的是,OP_READ标记可以通过setReadable(boolean)直接修改,而OP_WRITE是只读的,其实OP_WRITE标记只是想告诉你不能发布太多的写操作,可能会导致内存溢出,比如在nio的socket中OP_WRITE标记在NioSocketChannelConfig中配置。
ChannelEvent
一个i/o事件关联一个Channel,ChannelEvent在ChannelPipeline中被一系列的ChannelHandler处理。
每个事件要么是upstream event(上游事件),要么是downstream event(下游事件),如果一个事件从管道中的第一个处理器游向最后一个处理器,我们叫它上游事件;如果一个事件从管道中的最后一个事件游向第一个事件,我们叫它下游事件,看ChannelPipeline的图就明白多了。具体点说,服务器接收一个信息,这个接收信息的事件叫上游事件;服务器发送一个信息,这个发送信息的事件叫下游事件。这个规则同样适用于客户端,在客户端接收信息的事件叫上游事件,发送信息的事件叫下游事件。以下是事件举例:
上游事件:messageReceived、exceptionCaught、channelOpen、channelClosed、channelBound、channelUnbound、channelConnected、writeComplete、channelDisconnected、channelInterestChanged,还有两个额外的事件只用于父通道(可以有子通道的):childChannelOpen、childChannelClosed;
下游事件:write、bind、unbind、connect、disconnect、close;
值得一提的是,Channel中没有open事件,这是因为Channel在被创建后总是开着的。
ChannelFuture
ChannelFuture表示异步i/o操作的结果
netty中的全部的i/o操作都是异步的,这就意味着任何i/o调用将会立即返回,而不保证i/o操作是否完成。取而代之的是,获得一个ChannelFuture实例,其中包含着i/o操作的结果或状态。
ChannelFuture要么是完成的要么是没有完成的。当一个i/o操作开始,一个新的future对象将被创建。新的future初始化为没有完成,它既不是成功、失败也不是删除,因为它还没有完成。如果i/o操作是完成的,要么就是成功、失败,要么就是删除,future被标记为completed会有更多信息,比如失败的原因,值得注意的是失败和删除都属于完成。
* <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = <b>true</b> | * +--------------------------+ | | isSuccess() = <b>true</b> | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = <b>false</b> | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = <b>true</b> | * | isCancelled() = false | | | getCause() = <b>non-null</b> | * | getCause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = <b>true</b> | * | isCancelled() = <b>true</b> | * +---------------------------+ * </pre>
建议使用addListener(ChannelFutureListener)而不是await()来获得通知,addListener(ChannelFutureListener) 是非阻塞的,给ChannelFuture添加监听器是简便的,i/o线程将会通知监听器当i/o操作关联的future完成了,ChannelFutureListener有利于性能的提升和资源的利用因为它不是阻塞的,但是如果不用事件驱动模型,实现顺序逻辑就相当棘手。
对比一下await(),它是阻塞的,一旦调用,线程将阻塞直到操作完成。它实现顺序逻辑是容易的。但是调用线程阻塞直到操作完成是不必要的,并且线程间的唤醒是昂贵的,而且在特定的环境中容易死锁。如下描述
1、不要在ChannelHandler里面调用await()
* <pre> * // BAD - NEVER DO THIS * {@code @Override} * public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) { * if (e.getMessage() instanceof GoodByeMessage) { * {@link ChannelFuture} future = e.getChannel().close(); * future.awaitUninterruptibly(); * // Perform post-closure operation * // ... * } * } * * // GOOD * {@code @Override} * public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) { * if (e.getMessage() instanceof GoodByeMessage) { * {@link ChannelFuture} future = e.getChannel().close(); * future.addListener(new {@link ChannelFutureListener}() { * public void operationComplete({@link ChannelFuture} future) { * // Perform post-closure operation * // ... * } * }); * } * } * </pre>
不要弄混了i/o 延时和等待延时,他们之间没有一点关系,如果一个i/o操作延时,future会被标记为'completed with failure'。连接延时应该通过指定的传输操作配置,如下
* <pre> * // BAD - NEVER DO THIS * {@link ClientBootstrap} b = ...; * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(10, TimeUnit.SECONDS); * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * // You might get a NullPointerException here because the future * // might not be completed yet. * f.getCause().printStackTrace(); * } else { * // Connection established successfully * } * * // GOOD * {@link ClientBootstrap} b = ...; * // Configure the connect timeout option. * <b>b.setOption("connectTimeoutMillis", 10000);</b> * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(); * * // Now we are sure the future is completed. * assert f.isDone(); * * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * f.getCause().printStackTrace(); * } else { * // Connection established successfully * } * </pre>
ChannelHandler
ChannelHandler有两种类型:ChannelUpstreamHandler和ChannelDownstreamHandler,分别处理、拦截上游事件和下游事件,处理器由ChannelHandlerContext提供,通过这个上下文对象,处理器可以获得上游事件或者下游事件,动态修改管道,存储一些有用的信息。
一个ChannelHandler常常需要存储一些有用的信息,最简单的办法就是用成员变量,由于处理器实例有连接专用的状态变量,不得不为每个新的channel新建一个处理器实例以避免竞态条件(未认证的客户不能获取机密信息)。
当然还是有一些方法可以不用成员变量来存储的,比如ChannelHandlerContext提供的attachment,使得同一个处理器实例用于不同的管道中。
在ChannelHandler中有这么一个注解@Sharable,意味着这个处理器实例可以被添加到一个或多个ChannelPipeline对象中多次,而不用考虑竞态条件,如果不指定这个注解,每次给管道添加处理器时不得不新创建一个实例对象,因为它有不共享的变量。
ChannelPipeline
处理器处理Channel中的事件,ChannelPipeline提供了一个很好的模式去有效控制一个事件怎么被处理以及管道中的处理器怎么相互作用。
对于每个新的channel(渠道),一个新的pipeline(管道)必须被创建并且连接上这个channel。一旦连接,他们之间的耦合就是永久的,channel不能连接其他的pipeline也不能分离当前的pipeline。
一个事件是怎么流入管道的呢?先看下图
* <pre> * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * +----------------------------------------+---------------+ * | ChannelPipeline | | * | \|/ | * | +----------------------+ +-----------+------------+ | * | | Upstream Handler N | | Downstream Handler 1 | | * | +----------+-----------+ +-----------+------------+ | * | /|\ | | * | | \|/ | * | +----------+-----------+ +-----------+------------+ | * | | Upstream Handler N-1 | | Downstream Handler 2 | | * | +----------+-----------+ +-----------+------------+ | * | /|\ . | * | . . | * | [ sendUpstream() ] [ sendDownstream() ] | * | [ + INBOUND data ] [ + OUTBOUND data ] | * | . . | * | . \|/ | * | +----------+-----------+ +-----------+------------+ | * | | Upstream Handler 2 | | Downstream Handler M-1 | | * | +----------+-----------+ +-----------+------------+ | * | /|\ | | * | | \|/ | * | +----------+-----------+ +-----------+------------+ | * | | Upstream Handler 1 | | Downstream Handler M | | * | +----------+-----------+ +-----------+------------+ | * | /|\ | | * +-------------+--------------------------+---------------+ * | \|/ * +-------------+--------------------------+---------------+ * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * +--------------------------------------------------------+ * </pre>
在这里举例来说明这个图的意思
ChannelPipeline p=Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());
处理器的顺序是1、2、3、4、5,
如果一个上游事件流入管道,处理器顺序是1、2、5;如果一个下游事件流入管道,处理器的顺序是4、3;如果5既实现了ChannelUpstreamHandler也实现了ChannelDownstreamHandler,上下游事件的处理器顺序分别是1、2、5和5、4、3。
一般来说管道中会注册多个处理器,比如通常的服务器会这样定义:Protocol Decoder、Protocol Encoder、Business Logic Handler。处理器能够被添加和删除在任何时刻因为ChannelPipeline是线程安全的,比如有机密信息要交换时你可以插入SslHandler,交换后可以删除。但是需要注意的一个陷阱就是,在删除一个处理器的时候应该确保在其后管道中至少有两个处理器或者一个都没有。比如以下代码不起作用
public class FirstHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { // Remove this handler from the pipeline, ctx.getPipeline().remove(this); // And let SecondHandler handle the current event. ctx.getPipeline().addLast("2nd", new SecondHandler()); ctx.sendUpstream(e); } }
ChannelSink
ChannelSink接收和处理终端的下游事件,ChannelSink是一个内部组件,应该由传输协议提供者实现,大部分的用户在他们的代码中不会看到这个类。
SelectionKey
SelectionKey是SelectableChannel在selector中注册的一个标志。
key在每次channel注册到selector时被创建,key保持有用的直到它被删除、channel被关闭或者selector被关闭,删除一个key不意味着它被立即从selector中删除,而是被添加到cancelled-key集合中。一个key是否可用可以通过isValid来测试。
Selector
Selector是channel的一个多路复用器,selector有两种创建方式:
1、open方法,用系统默认的java.nio.channels.spi.SelectorProvider实例创建一个新的selector对象;
2、java.nio.channels.spi.SelectorProvider的openSelector方法;
selector的创建都是通过底层操作实现的。selector会保持打开状态知道调用了close方法。
一个selectable channel注册selector意味着会有一个SelectionKey对象,一个selector有三个这样的键集合:
key set代表在selector中注册的channel的key集合;selected-key set代表准备好i/o操作的key集合,它总是key set的一个子集;cancelled-key表示key已经被删除了但对应的channel还没有在selector中被撤销注册的key集合。
在新创建selector的时候,这三个集合都是空的。
一个key被添加到selector的key集合中是channel注册到selector中的一个附加的作用,Cancelled keys是在selection操作过程中被删除的,key集合自己不会被直接修改。
当channel关闭或者调用SelectionKey的删除方法后(表示key被删除),key将会被添加到selector的cancelled-key集合中。删除key将导致他的channel在下次selection操作中被撤销注册,同时这个key将会从selector的key集合中被删除。
key被添加到selected-key 集合中是通过selection操作完成的,一个key可以被直接删除,通过这两种方式:java.util.Set#remove和java.util.Iterator#remove();除此之外没有其他方法可以删除了,特别是,他们不会像添加一样作为一种附加的作用而被删除,key不会被直接添加到selected-key集合中。
select操作有三个:selectNow()、select(long timeout)和select()。下面是他们的一些特点: selectNow()是非阻塞的,如果没有channel是可选择的,它将立即返回0;select(long timeout)是阻塞的,唤醒方式:至少一个channel被选择了、调用了wakeup方法、当前线程被中断、超时,无论哪种先发生。select()与select(long timeout)相比就是无限定时间等待了。
wakeup()方法的作用在上面三个操作中有提到,在这里再详细的描述一下:如果其他线程正在阻塞中(调用select()或select(long timeout)),wakeup()后他们将立即返回,如果当时没有selection操作在阻塞,那么在下次调用阻塞的selection操作后将立即返回,除非同时调用了selectNow()(它会清除先前调用的wakeup方法的效果),所以说wakeup()调用后不会空手而回的,后面的阻塞操作将像平常一样阻塞。值得一提的是:wakeup()方法在两个成功的selection操作之间调用多次将视作一次。整个机制用跟线程的中断机制有点类似。
相关推荐
- **概念**: `ChannelHandler` 是 Netty 中处理 I/O 事件的核心接口。 - **用途**: 实现各种事件处理方法,如读事件、写事件、异常事件等。 - **分类**: `ChannelInboundHandler` 处理入站事件,`...
1. **语言无障碍**: 对于不擅长英语的开发者,中文文档降低了理解门槛,能更快地掌握Netty的核心概念和使用方法。 2. **结构化内容**: CHM格式是一种常见的离线帮助文档格式,内容组织有序,方便查阅和学习。 3. **...
Channel 是 Netty 中的基础概念,代表一个打开的连接;EventLoop 负责处理 I/O 事件;ByteBuf 是高效的数据缓冲区;Pipeline 是消息处理的链式结构;Handler 是事件处理的实体。理解这些概念及其交互方式,是构建...
这个中文教程全面覆盖了Netty的核心概念和技术,旨在帮助开发者深入理解和高效利用Netty。以下是对教程中可能涉及的知识点的详细解释: 1. **基础功能**: - **NIO(非阻塞I/O)**:Netty基于Java NIO API构建,...
这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,多线程的应用是其处理高并发、高...理解并掌握这些概念,有助于构建更健壮、更高效的网络应用。
这部分内容将帮助理解Netty是如何组织各个组件以实现高效、可扩展的服务。 - **实践案例**:除了理论学习之外,实践也是非常重要的。尝试自己动手实现一些小项目,比如一个简单的聊天服务器或文件传输服务,可以...
这个“netty 4.1 中文.CHM”文件是一个压缩包,包含的是Netty 4.1版本的中文版帮助文档,对于开发者来说是一个非常宝贵的资源,特别是对于那些中文为母语的开发者,它提供了方便的理解和学习Netty的途径。...
在本文中,我们将深入探讨 Netty 4.1 的官方中文 API 文档,了解它如何帮助开发者在大数据高并发场景下构建网络应用。 首先,让我们了解一下 Netty 的核心概念: 1. **NIO (非阻塞I/O)**: Netty 基于 Java NIO ...
这本书通过详实的代码案例,帮助读者理解和掌握Netty的核心概念和技术。 在Netty中,最重要的概念之一是“Boss线程”和“Worker线程”的模型。Boss线程负责接收新的连接请求,而Worker线程则处理这些连接后的读写...
2. **Channel**: Channel 是 Netty 中的核心概念,它代表了一个网络连接,可以进行读写操作。每个 Channel 都有其对应的 ChannelHandler 处理进来的数据。 3. **ChannelHandlerContext**: ChannelHandlerContext 是...
这个“netty简单的demo很好理解”的例子,很可能是为了展示Netty的基本用法,帮助初学者理解其核心概念。我们将通过以下几个方面来深入探讨这个Demo: 1. **异步编程模型**: Netty 使用了Java NIO(非阻塞I/O)...
在开始使用 Netty 之前,需要理解它的核心概念,如 Channel、EventLoop 和 ChannelHandler。Channel 是网络连接的抽象,它负责实际的读写操作。EventLoop 是 Netty 中用于处理 I/O 事件的线程,而 ChannelHandler 则...
在Netty的学习过程中,首先需要理解的是它的基础概念。Netty基于NIO(非阻塞I/O)模型,提供了高效、易用的API,使得开发者可以方便地构建网络服务。它包含了服务器端和客户端的Bootstrap类,用于启动和配置网络应用...
在实际学习过程中,可以先从阅读Netty的官方文档开始,理解基本概念和API。然后,通过分析RocketMQ的源码,观察它是如何利用Netty来实现网络通信的。可以尝试自己编写简单的Netty服务器和客户端,模拟RocketMQ的消息...
在深入探讨Netty之前,我们先理解一下IO流的基础概念。 IO流是Java中处理输入输出的基本方式,分为字节流(处理字节数据)和字符流(处理字符数据)。BIO( Blocking IO)是最传统的IO模型,它的特点是阻塞式读写,...
首先,让我们深入理解Netty的核心概念。Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环...
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
通过阅读《Netty权威指南》中文版,读者不仅可以掌握Netty的基本用法,还能深入理解其背后的原理和技术细节,从而在实际项目中更好地利用Netty实现高效、可靠的网络服务。无论是初学者还是经验丰富的开发者,都能...
- **Netty的意义**:本节简要介绍了Netty的目的及其在现代网络应用开发中的重要性。 #### 1. 问题 - **常规协议局限性**:讨论了通用协议(如HTTP)在面对大规模文件传输、邮件、金融数据等特殊应用场景时存在的...