- 浏览: 987639 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 网络通信示例一 :http://donald-draper.iteye.com/blog/2383326
netty 网络通信示例二:http://donald-draper.iteye.com/blog/2383328
netty 网络通信示例三:http://donald-draper.iteye.com/blog/2383392
netty 网络通信示例四:http://donald-draper.iteye.com/blog/2383472
Netty 构建HTTP服务器示例:http://donald-draper.iteye.com/blog/2383527
Netty UDT网络通信示例:http://donald-draper.iteye.com/blog/2383529
前我们用几篇文章简单看了一下Netty的网络通信,从示例中可以看出,实际的数据处理都是交给
通道处理器ChannelHandler去处理,包括上层消息对象转换底层字节流和字节流转换为上层消息对象。在简单的ECHO示例中服务端和客户端的IO操作Handler都是基于ChannelInboundHandlerAdapter,今天我们来看一下通道处理器。
先看一下Inbound通道处理器的继承树结构:
下面我们来看一下通道处理器接口ChannelHandler的定义
小节:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,
handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;
handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。
通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,
通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。
通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的
通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到
通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新
创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从
上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。这些我们在后面再讲。
再来看一下通道处理器适配器:
从通道处理器适配的定义来看,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
我们来单独看一下,判断通道处理器是否为注解:
来看这一句:
//InternalThreadLocalMap
从上面来看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量
获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则
将探测结果添加到缓存中。
总结:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。
通道处理器适配器ChannelHandlerAdapter的设计模式为适配器,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则
将探测结果添加到缓存中。
附:
//UnpaddedInternalThreadLocalMap
netty 网络通信示例二:http://donald-draper.iteye.com/blog/2383328
netty 网络通信示例三:http://donald-draper.iteye.com/blog/2383392
netty 网络通信示例四:http://donald-draper.iteye.com/blog/2383472
Netty 构建HTTP服务器示例:http://donald-draper.iteye.com/blog/2383527
Netty UDT网络通信示例:http://donald-draper.iteye.com/blog/2383529
前我们用几篇文章简单看了一下Netty的网络通信,从示例中可以看出,实际的数据处理都是交给
通道处理器ChannelHandler去处理,包括上层消息对象转换底层字节流和字节流转换为上层消息对象。在简单的ECHO示例中服务端和客户端的IO操作Handler都是基于ChannelInboundHandlerAdapter,今天我们来看一下通道处理器。
先看一下Inbound通道处理器的继承树结构:
//ChannelInboundHandlerAdapter public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
//ChannelInboundHandler public interface ChannelInboundHandler extends ChannelHandler {
//ChannelHandlerAdapter public abstract class ChannelHandlerAdapter implements ChannelHandler {
下面我们来看一下通道处理器接口ChannelHandler的定义
package io.netty.channel; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in * its {@link ChannelPipeline}. *处理IO事件或拦截IO操作,并转发事件到处理器管道的下一个通道处理器 * <h3>Sub-types</h3> * * {@link ChannelHandler} itself does not provide many methods, but you usually have to implement one of its subtypes: 通道处理器本身没有提供太多的方法,但是你可以实现它的子类型如下 * [list] * [*]{@link ChannelInboundHandler} to handle inbound I/O events, and 输入流通道处理器,处理流入的IO事件 * [*]{@link ChannelOutboundHandler} to handle outbound I/O operations. 输出流通道处理器,处理流出的IO操作 * [/list] * * * Alternatively, the following adapter classes are provided for your convenience: 另外还提供了如下结果适配器 * [list] * [*]{@link ChannelInboundHandlerAdapter} to handle inbound I/O events, 输入流处理器适配器,处理流入的IO事件 * [*]{@link ChannelOutboundHandlerAdapter} to handle outbound I/O operations, and 输出流通道适配器,处理流出的IO操作 * [*]{@link ChannelDuplexHandler} to handle both inbound and outbound events 多路复用适配器,可以处理流入和流出的IO事件 * [/list] * * * For more information, please refer to the documentation of each subtype. * * * <h3>The context object</h3> 上下文对象 * <p> * A {@link ChannelHandler} is provided with a {@link ChannelHandlerContext} * object. A {@link ChannelHandler} is supposed to interact with the * {@link ChannelPipeline} it belongs to via a context object. Using the * context object, the {@link ChannelHandler} can pass events upstream or * downstream, modify the pipeline dynamically, or store the information * (using {@link AttributeKey}s) which is specific to the handler. *一个通道处理器关联一个通道处理器上下文。通道处理器通过一个上下文对象,与它所属的 通道管道线交互。通道上一下对象,通道处理器上行或下行传递事件,动态修改管道,或通过 AttributeKey存储特殊的信息。 * <h3>State management</h3> *状态管理 * A {@link ChannelHandler} often needs to store some stateful information. * The simplest and recommended approach is to use member variables: 通道处理器器通道通常需要存储一些状态信息,简单有效,并强烈的方法是用成员变量: * <pre> * public interface Message { * // your methods here * } * * public class DataServerHandler extends {@link SimpleChannelInboundHandler}<Message> { * * <b>private boolean loggedIn;</b> * * {@code @Override} * public void channelRead0({@link ChannelHandlerContext} ctx, Message message) { * {@link Channel} ch = e.getChannel(); * if (message instanceof LoginMessage) { * authenticate((LoginMessage) message); * <b>loggedIn = true;</b> * } else (message instanceof GetDataMessage) { * if (<b>loggedIn</b>) { * ch.write(fetchSecret((GetDataMessage) message)); * } else { * fail(); * } * } * } * ... * } * </pre> * Because the handler instance has a state variable which is dedicated to * one connection, you have to create a new handler instance for each new * channel to avoid a race condition where a unauthenticated client can get * the confidential information: 因为通道处理器实例有一个用于表示连接的状态变量,为了避免没有认证的客户端获取机密的信息, 这一个竞争条件,你不得不为每个新建的通道创建一个处理器实例。 * <pre> * // Create a new handler instance per channel. 每个通道创建一个处理器 * // See {@link ChannelInitializer#initChannel(Channel)}. * public class DataServerInitializer extends {@link ChannelInitializer}<{@link Channel}> { * {@code @Override} * public void initChannel({@link Channel} channel) { * channel.pipeline().addLast("handler", <b>new DataServerHandler()</b>); * } * } * * </pre> * * <h4>Using {@link AttributeKey}s</h4> *使用AttributeKey * Although it's recommended to use member variables to store the state of a * handler, for some reason you might not want to create many handler instances. * In such a case, you can use {@link AttributeKey}s which is provided by * {@link ChannelHandlerContext}: 尽管强烈建议使用成员变量来存储处理器状态,由于一些原因,你也许不想创建许多处理器实例。 在这种情况,你可以使用通道处理器上下文提供的属性键AttributeKey。 * <pre> * public interface Message { * // your methods here * } * * {@code @Sharable}//共享通道处理器 * public class DataServerHandler extends {@link SimpleChannelInboundHandler}<Message> { * private final {@link AttributeKey}<{@link Boolean}> auth = * {@link AttributeKey#valueOf(String) AttributeKey.valueOf("auth")}; * * {@code @Override} * public void channelRead({@link ChannelHandlerContext} ctx, Message message) { * {@link Attribute}<{@link Boolean}> attr = ctx.attr(auth); * {@link Channel} ch = ctx.channel(); * if (message instanceof LoginMessage) { * authenticate((LoginMessage) o); * <b>attr.set(true)</b>; * } else (message instanceof GetDataMessage) { * if (<b>Boolean.TRUE.equals(attr.get())</b>) { * ch.write(fetchSecret((GetDataMessage) o)); * } else { * fail(); * } * } * } * ... * } * </pre> * Now that the state of the handler is attached to the {@link ChannelHandlerContext}, you can add the * same handler instance to different pipelines: 这种情况下,处理器附加在通道处理器上下文上,你可以添加相同的处理器实例到不同的管道上。 * <pre> * public class DataServerInitializer extends {@link ChannelInitializer}<{@link Channel}> { * * private static final DataServerHandler <b>SHARED</b> = new DataServerHandler(); * * {@code @Override} * public void initChannel({@link Channel} channel) { * channel.pipeline().addLast("handler", <b>SHARED</b>); * } * } * </pre> * * * <h4>The {@code @Sharable} annotation</h4> * <p>@Sharable注解 * In the example above which used an {@link AttributeKey}, * you might have noticed the {@code @Sharable} annotation. 在上面属性键的实例中,你也许已经注意到共享注解的使用。 * <p> * If a {@link ChannelHandler} is annotated with the {@code @Sharable} * annotation, it means you can create an instance of the handler just once and * add it to one or more {@link ChannelPipeline}s multiple times without * a race condition. 被@Sharable注解的通道处理器,意味着,你可以一次性创建一个通道处理器实例,在没有竞争 条件的情况下,可以一次或多次到通道管道线。 * <p> * If this annotation is not specified, you have to create a new handler * instance every time you add it to a pipeline because it has unshared state * such as member variables. 如果通道处理器没有被@Sharable注解,由于通道处理器状态不共享,比如成员变量, 你不得不在每次添加通道处理器到通道管道线时,创建一个新的通道处理器实例。 * <p> * This annotation is provided for documentation purpose, just like * [url=http://www.javaconcurrencyinpractice.com/annotations/doc/]the JCIP annotations[/url]. *此注解的相关使用文档,参看如下链接。 * <h3>Additional resources worth reading</h3> * <p>附加可读资源。 * Please refer to the {@link ChannelHandler}, and * {@link ChannelPipeline} to find out more about inbound and outbound operations, * what fundamental differences they have, how they flow in a pipeline, and how to handle * the operation in your application. 参见通道处理器和通道管道线,获取inbound和outbound更多的操作信息,两种操作的本质不同,在管道中的流向, 以及如果在应用中处理操作 */ public interface ChannelHandler { /** * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events. 在通道处理器添加到实际上下文后调用,准备处理IO事件 */ void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events * anymore. 在通道处理器从实际上下文中移除后调用,不再处理IO事件 */ void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. *到处理IO事件,异常抛出时调用,已丢弃 * @deprecated is part of {@link ChannelInboundHandler} */ @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; /** * Indicates that the same instance of the annotated {@link ChannelHandler} * can be added to one or more {@link ChannelPipeline}s multiple times * without a race condition. Sharable注解表示一个被Sharable注解的通道处理器可以在没有竞争条件的情况下,一次或多次 添加到通道管道线上。 * <p> * If this annotation is not specified, you have to create a new handler * instance every time you add it to a pipeline because it has unshared * state such as member variables. 如果通道处理器,没有被Sharable注解通道处理器,由于通道处理器成员变量为不共享状态,每次添加通道处理器到管道时, 必须创建一个新的处理器实例 * <p> * This annotation is provided for documentation purpose, just like * [url=http://www.javaconcurrencyinpractice.com/annotations/doc/]the JCIP annotations[/url]. */ @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }
小节:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,
handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;
handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。
通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,
通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。
通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的
通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到
通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新
创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从
上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。这些我们在后面再讲。
再来看一下通道处理器适配器:
package io.netty.channel; import io.netty.util.internal.InternalThreadLocalMap; import java.util.Map; /** * Skeleton implementation of a {@link ChannelHandler}. 通道处理基本实现 */ public abstract class ChannelHandlerAdapter implements ChannelHandler { // Not using volatile because it's used only for a sanity check. //没有使用volatile,因为此变量仅仅用于 boolean added; /** * Throws {@link IllegalStateException} if {@link ChannelHandlerAdapter#isSharable()} returns {@code true} 判断通道处理器是否开启共享 */ protected void ensureNotSharable() { if (isSharable()) { throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared"); } } /** * Return {@code true} if the implementation is {@link Sharable} and so can be added * to different {@link ChannelPipeline}s. 如果通道处理器被Sharable注解,则返回true,你可以添加到不同的通道管道线。 */ public boolean isSharable() { /** * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of * {@link Thread}s are quite limited anyway. 缓存通道处理器共享注解探测结果。我们用一个ThreadLocal和WeakHashMap来剔除可见的读写操作。 每个线程一个WeakHashMap实例,同时线程数量是有限的 * * See [url=https://github.com/netty/netty/issues/2289]#2289[/url]. */ Class<?> clazz = getClass(); //获取线程共享注解通道处理器缓存 Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { //判断通道处理器是否被Sharable注解 sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; } /** * Do nothing by default, sub-classes may override this method. handlerAdded事件默认处理器,不做任何事情 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } /** * Do nothing by default, sub-classes may override this method. handlerRemoved事件默认处理器,不做任何事情 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } /** * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. *当IO操作出现异常时,调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给 通道管道线的下一个通道处理器 * Sub-classes may override this method to change behavior. */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
从通道处理器适配的定义来看,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
我们来单独看一下,判断通道处理器是否为注解:
public boolean isSharable() { /** * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of * {@link Thread}s are quite limited anyway. 缓存通道处理器共享注解探测结果。我们用一个ThreadLocal和WeakHashMap来剔除可见的读写操作。 每个线程一个WeakHashMap实例,同时线程数量是有限的 * * See [url=https://github.com/netty/netty/issues/2289]#2289[/url]. */ Class<?> clazz = getClass(); //获取线程共享注解通道处理器缓存 Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { //判断通道处理器是否被Sharable注解 sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; }
来看这一句:
//获取线程共享注解通道处理器缓存 Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
//InternalThreadLocalMap
/** * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} * unless you know what you are doing. InternalThreadLocalMap为存储Netty线程本地变量和FastThreadLocal的内部数据结构。注意此类仅内部使用,可能随时改变。 除非你知道你在做什么,不然的话用FastThreadLocal */ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { //获取线程本地变量InternalThreadLocalMap public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; } private static InternalThreadLocalMap slowGet() { ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; } //获取线程本地共享注解通道处理器探测结果缓存Map<Class<?>, Boolean> public Map<Class<?>, Boolean> handlerSharableCache() { Map<Class<?>, Boolean> cache = handlerSharableCache; if (cache == null) { // Start with small capacity to keep memory overhead as low as possible. handlerSharableCache = cache = new WeakHashMap<Class<?>, Boolean>(4); } return cache; } }
从上面来看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量
获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则
将探测结果添加到缓存中。
总结:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。
通道处理器适配器ChannelHandlerAdapter的设计模式为适配器,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则
将探测结果添加到缓存中。
附:
//UnpaddedInternalThreadLocalMap
/** * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} * unless you know what you are doing. UnpaddedInternalThreadLocalMap为存储Netty线程本地变量和FastThreadLocal的内部数据结构。注意此类仅内部使用,可能随时改变。 除非你知道你在做什么,不然的话用FastThreadLocal */ class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); /** Used by {@link FastThreadLocal} */ Object[] indexedVariables; // Core thread-locals int futureListenerStackDepth; int localChannelReaderStackDepth; Map<Class<?>, Boolean> handlerSharableCache; IntegerHolder counterHashCode; ThreadLocalRandom random; Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache; Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache; // String-related thread-locals StringBuilder stringBuilder; Map<Charset, CharsetEncoder> charsetEncoderCache; Map<Charset, CharsetDecoder> charsetDecoderCache; // ArrayList-related thread-locals ArrayList<Object> arrayList; UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { this.indexedVariables = indexedVariables; } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1328netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2072netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2462netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1324netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1325netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1604netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1851netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1407netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2848netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2193netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2046netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1089netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1884netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1225netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1212netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 964netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1318netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2197netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1091netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1866netty 管道线定义-ChannelPipeline:htt ...
相关推荐
3. **创建 Netty 通道处理器**:在 Netty 中,我们需要创建一个自定义的 `ChannelInboundHandler` 或 `ChannelOutboundHandler` 来处理 Protobuf 消息的编码和解码。可以使用 ` ProtobufVarint32FrameDecoder` 和 `...
ChannelHandler 是 Netty 中处理事件的核心接口,分为两种类型:InboundHandler(入站操作,如接收数据、连接建立等)和 OutboundHandler(出站操作,如发送数据、关闭连接等)。通过组合不同的 Handler,可以实现...
ChannelHandler则是定义了入站和出站事件的处理逻辑,是开发者最关心的部分;EventLoop用于处理异步操作,负责执行事件处理器中的代码;BootStrap则负责Netty应用的启动和初始化。 Netty的高性能主要得益于其零拷贝...
在处理网络通信时,数据的编码和解码是必不可少的环节,Netty 提供了一系列预置的 `ChannelHandler` 和编解码器,使得开发者能够方便地处理各种复杂的网络通信场景。 首先,`IdleStateHandler` 是 Netty 中用于检测...
这个压缩包“掘金-Netty 入门与实战-netty-demo.zip”包含了关于 Netty 的入门教程和实战示例,特别是通过 "netty-demo-pipeline-channelHandler" 这个文件名我们可以推测,它可能包含了一个展示 Netty ...
Netty提供了丰富的ChannelHandler(通道处理器)和ByteBuf(字节缓冲区)等组件,使得开发人员可以方便地处理各种网络协议,包括HTTP和FTP等涉及文件传输的协议。在文件上传场景下,服务器端通常需要创建一个...
3. **易用性**:Netty提供了丰富的API和预定义的ChannelHandler,如ByteToMessageDecoder、MessageToByteEncoder等,使得编码过程更为简洁。 4. **可扩展性**:Netty的管道(Pipeline)机制允许开发者方便地插入...
而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 Netty 4.1.6版本引入了许多改进和优化。例如,性能提升、内存管理优化、错误处理的增强以及对新...
为了实现通信,服务端和客户端都需要定义自己的`model对象`,这些对象通常包含了数据传输的结构和协议解析的相关逻辑。在本示例中,"model对象目录必须一致"意味着服务端和客户端需要使用相同的类来序列化和反序列化...
通过定义处理器(ChannelHandler)来处理 MQTT 的各种消息类型,如 CONNECT、SUBSCRIBE、PUBLISH 等。Netty 的非阻塞 I/O 模型使得服务器能够高效地处理大量并发连接。 **5. MQTT 服务器端开发步骤** - 创建 MQTT ...
- **ChannelHandler**:这是Netty中的处理器接口,用于处理I/O事件和拦截I/O操作。开发者可以通过实现ChannelInboundHandler或ChannelOutboundHandler接口来处理入站和出站事件。 - **ChannelHandlerContext**:它...
在Netty中,处理器ChannelHandler是处理I/O事件的组件,它负责数据的读取、业务逻辑处理和异常处理等。编写处理器是开发Netty应用程序的关键步骤之一。在Netty5用户指南中,通过实现一个简单的DISCARD协议来说明如何...
在SpringBoot中,我们可以通过实现WebFlux的`HandlerFunction`或者`WebFilter`来定义Netty的路由和处理逻辑。Netty Server端通常会包含一个`ServerBootstrap`,用于设置服务器的各种属性,如事件循环组、通道处理器...
ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,实现数据的双向流动。 Netty 4.1.19.Final 版本包含了多个优化和改进,例如更好的内存管理、性能提升以及对新协议的...
在 Netty 中,Disruptor 可以作为事件处理器链的一部分,优化事件的发布和消费。通常,Netty 的 ChannelHandler 链会处理进来的网络事件,如读写操作。通过引入 Disruptor,这些事件可以更高效地在处理器之间传递。 ...
2. **定义 ChannelHandler**:在 Netty 中,ChannelHandler 是处理网络事件的核心组件。我们需要创建自定义的 ChannelInboundHandler 或 ChannelOutboundHandler,以处理接收和发送数据的逻辑。这些 Handler 可以...
1. **事件驱动模型**:Netty采用的是一种反应式编程模型,通过EventLoop(事件循环)和ChannelHandler(通道处理器)处理网络事件,如连接建立、数据读写、异常发生等。 2. **NIO基础**:Netty基于Java NIO(非阻塞...
4. **ChannelHandler**:ChannelHandler 是 Netty 中的核心组件,负责处理 I/O 事件和数据传输。它们可以被串联起来形成一个 ChannelPipeline,每个 ChannelHandler 只需关注自己处理的部分,简化了逻辑。 5. **...
1. **创建 ChannelHandler**:定义自定义的 ChannelInboundHandler 和 ChannelOutboundHandler,用于处理接收到的数据和发送数据。例如,我们可以创建一个 `MessageDecoder` 来解码接收到的字节流为消息对象,以及一...