/** * Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in * its {@link ChannelPipeline}. * 处理IO事件、拦截IO操作或跳转到下一个ChannelHandler处理 * <h3>子类型</h3> * ChannelHandler 本身未提供IO操作的方法,你通常需要实现它的两个子接口: * 1.ChannelInboundHandler 处理inbound IO事件 * 2.ChannelOutboundHandler 处理outbound IO操作。 * 或者使用方使的Adapter class: * 1.ChannelInboundHandlerAdapter 处理inbound IO事件 * 2.ChannelOutboundHandlerAdapter 处理outbound IO操作。 * 3.ChannelDuplexHandler 处理inbound IO事件 和 outbound IO事件 * * <h3>The context object</h3> * 每一个ChannelHandler 都对应一个独立的ChannelHandlerContext(ChannelHandlerContext持有ChannelHandler) * ChannelHandlerContext 是ChannelPipeline与ChannelHandler 的粘合剂。 * 事件来源于ChannelPipeline-》ChannelHandlerContext-》 ChannelHandler。io方法 * 上一个(或下一个)ChannelHandlerContext-》 ChannelHandler。io方法 * 上一个(或下一个)ChannelHandlerContext-》 ChannelHandler。io方法 * ChannelHandlerContext的attr(AttributeKey)可以保存状态信息(线程安全) * <h3>使用AttributeKey 保存状态信息</h3> * * * <h4>Using {@link AttributeKey}s</h4> * * public class DataServerHandler extends SimpleChannelInboundHandler<Message> { * private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth")}; * public void channelRead(ChannelHandlerContext ctx, Message message) { * Attribute<Boolean> attr = ctx.attr(auth); * Channel ch = ctx.channel(); * if (message instanceof LoginMessage) { * authenticate((LoginMessage) o); * attr.set(true); * } else (message instanceof GetDataMessage) { * if (Boolean.TRUE.equals(attr.get())) { * ch.write(fetchSecret((GetDataMessage) o)); * } else { * fail(); * } * } * } * ... * } * * <h4>The {@code @Sharable} annotation</h4> * 如果一个ChannelHandler被标记@Sharable,该ChannelHandler可以被添加到一个或多个ChannelPipeline多次。 * 该ChannelHandler必须是线程安全的。 * 如果一个ChannelHandler未标记@Sharable,每添加到ChannelPipeline之前,需要创建一个新的实例,再添到ChannelPipeline中 * */ public interface ChannelHandler { /** * 当ChannelHandler被添加到ChannelPipeline,并准备好处理IO事件时(register完成后),触发。 * 被调用时机: * 1. Bootstrap b;b.handler(new ChannelInitializer<SocketChannel>() {//ignore} 当regisger操作(在selector上注 * 册channel)完成后触发 * 2.channel.pipeline.addxxxx(ChannelHandler)方法时,如果已注册的,则立即触发否则等注册后触发 **/ void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * 当ChannelHandler被从ChannelPipeline移除时(不处理IO事件) */ void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /** * 如果一个ChannelHandler被标记@Sharable,该ChannelHandler可以被添加到一个或多个ChannelPipeline多次。 * 该ChannelHandler必须是线程安全的。 * 如果一个ChannelHandler未标记@Sharable,每添加到ChannelPipeline之前,需要创建一个新的实例,再添到ChannelPipeline中 * **/ @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } } /** * {@link ChannelHandler} which will get notified for IO-outbound-operations. */ public interface ChannelOutboundHandler extends ChannelHandler { /** * * 绑定到本地端口 */ void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * 连接到远程地址 * */ void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * * 网络连接断开 */ void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * 关闭网络连接 * */ void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * 从EventLoop上注销Channel */ void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * 读取数据 */ void read(ChannelHandlerContext ctx) throws Exception; /** * 写入数据至缓冲区,需要调用Flush才真正写入channel **/ void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; /** * * 将写入缓冲区数据发送到对端网络 */ void flush(ChannelHandlerContext ctx) throws Exception; } /** * * 一个类实现了ChannelOutboundHandler的所有方法, * 实现仅仅是把事件转发给下一个ChannelHandler,即本类不处理任务事件,可以在子类中 * 覆盖方法实现逻辑。目的:子类可以覆盖需要的方法,其它方法采用默认实现--简单。 * </p> */ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } /** * * 对象--》byte 编码器骨架类,扩展了ChannelOutboundHandlerAdapter ,覆盖了write方法, * 只处理指定类型的对象: * 1.对于指定类型:对象转换byte写入ByteBuf,再交由后续ChannelOutboundHandler处理 * 2.对于非指定类型:直接交由后续ChannelOutboundHandler处理 * * 其子类需要实现 encode(ChannelHandlerContext ctx, I msg, ByteBuf out) 方法,将I msg 对象 * 转换为byte 写入ByteBuf out * * 以下为整型转换为ByteBuf的示例(Integer为指定类型): * * public class IntegerEncoder extends MessageToByteEncoder<Integer> { * * public void encode(ChannelHandlerContextctx, Integer msg, ByteBuf out) * throws Exception { * out.writeInt(msg); * } * } * */ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//是否为指定类型 I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf);//对象转换为byte写入buf } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) {//如果已转换为ByteBuf(buf可读取),则ByteBuf交由后续ChannelOutboundHandler处理 ctx.write(buf, promise); } else {//如果对象未写入ByteBuf(buf不可读取),则空的ByteBuf交由后续ChannelOutboundHandler处理 buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else {//非指定类型交由后续ChannelOutboundHandler处理 ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } } /** * 将I msg 对象转换为byte 写入ByteBuf out */ protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; } /** * 对象--》对象 编码器骨架抽像类,扩展了ChannelOutboundHandlerAdapter ,覆盖了write方法, * 只处理指定类型的对象: * 1.对于指定类型:对象转换一个或多个其它类型,再交由后续ChannelOutboundHandler处理 * 2.对于非指定类型:直接交由后续ChannelOutboundHandler处理 * * 目的:可以用于复杂对象拆分为更小粒度的简单对象。之后再对简单对象写入byte-->网络流 * * 其子类需要实现 encode(ChannelHandlerContext ctx, I msg, List<Object> out) 方法,将I msg 对象 * 转换一个或多个对象 写入List<Object> out * * 以下为Integer转换为String的编码器示例(Integer为指定类型) : * public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer>{ * * public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, List<Object> out) * throws {@link Exception} { * out.add(message.toString()); * } * } * */ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { CodecOutputList out = null; try { if (acceptOutboundMessage(msg)) {//是否为指定类型 out = CodecOutputList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { encode(ctx, cast, out);//对象转换为另一类型写入CodecOutputList out } finally { ReferenceCountUtil.release(cast); } if (out.isEmpty()) {//如果未写入CodecOutputList out ,则抛出异常 out.recycle(); out = null; throw new EncoderException( StringUtil.simpleClassName(this) + " must produce at least one message."); } } else {//非指定类型交由后续ChannelOutboundHandler处理 ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { if (out != null) {//对解码后的每一个对象,调用后续ChannOutboundHandler处理 final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { ctx.write(out.get(0), promise); } else if (sizeMinusOne > 0) { // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure // See https://github.com/netty/netty/issues/2525 ChannelPromise voidPromise = ctx.voidPromise(); boolean isVoidPromise = promise == voidPromise; for (int i = 0; i < sizeMinusOne; i ++) { ChannelPromise p; if (isVoidPromise) { p = voidPromise; } else { p = ctx.newPromise(); } ctx.write(out.getUnsafe(i), p); } ctx.write(out.getUnsafe(sizeMinusOne), promise); } out.recycle(); } } } /** * 将I msg 对象转换一个或多个对象 写入List<Object> out */ protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception; } /** * 字符串编码器 * 字符串转换为ByteBuf * * {@link ChannelPipeline} pipeline = ...; * * // Decoders * pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80)); * pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); * * // Encoder * pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); * * 设置了以上ChannelOutboundHandler,ctx.write方法就可以直接使用字符串了。 * 由StringDecoder将其转换为ByteBuf * <pre> * void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) { * ch.write("Did you say '" + msg + "'?\n"); * } */ @Sharable public class StringEncoder extends MessageToMessageEncoder<CharSequence> { @Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception { if (msg.length() == 0) { return; } out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset)); } } /** * 字节数组编码器 * 字节数组转换为ByteBuf * * * // Decoders * pipeline.addLast("frameDecoder", * new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); * pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); * * // Encoder * pipeline.addLast("frameEncoder", new {@link LengthFieldPrepender}(4)); * pipeline.addLast("bytesEncoder", new {@link ByteArrayEncoder}()); * </pre> * 设置了以上ChannelOutboundHandler,ctx.write方法就可以直接使用字符串了。 * 由StringDecoder将其转换为ByteBuf * <pre> * void channelRead({@link ChannelHandlerContext} ctx, byte[] bytes) { * ctx.write(new byte []{0x41,0x42}); * } * </pre> */ @Sharable public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> { @Override protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception { out.add(Unpooled.wrappedBuffer(msg)); } } /** * 编码器 :ByteBuf转换为Base64编码格式的ByteBuf * // Decoders * pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.nulDelimiter() )); * pipeline.addLast("base64Decoder", new Base64Decoder ()); * * // Encoder * pipeline.addLast("base64Encoder", new Base64Encoder ()); * </pre> */ @Sharable public class Base64Encoder extends MessageToMessageEncoder<ByteBuf> { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { out.add(Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect)); } } /** * 编码器 ,在报文头增加长度字段,长度字段值为报文长度。 * 目的:用于解码时,能分清完整报文 * * 下面使用 new LengthFieldPrepender (2) 编码器对下面12个字节进行编码 * * +----------------+ * | "HELLO, WORLD" | * +----------------+ * </pre> * 编码后: * <pre> * +--------+----------------+ * + 0x000C | "HELLO, WORLD" | * +--------+----------------+ * * 解码时使用LengthFieldBasedFrameDecoder 就可以解出完整报文 */ @Sharable public class LengthFieldPrepender extends MessageToMessageEncoder<ByteBuf> { /** * 构建器 * * @param lengthFieldLength 长度字段占用字节数,只允许 1,2,3,4,8 * * @param lengthAdjustment lengthFieldLength值=实际报文长度基础上或+或-多少字节 ,0 为不调整 * @param lengthIncludesLengthFieldLength lengthFieldLength值 是否包括长度字段本身的字节数 true or false * * @throws IllegalArgumentException * if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8 */ public LengthFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) { this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength); } } /** * 分割符编码器 * * 使用基个分割符做为报文间分融标记,默认使用换行符做为分割符 * 发送报文时,在报文结尾处,增加分割符来标记报文结束 * // Decoders * pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80)); * pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8)); * * // Encoder * pipeline.addLast("lineEncoder", new LineEncoder(LineSeparator.UNIX, CharsetUtil.UTF_8)); * </pre> * <pre> * void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) { * ch.write("Did you say '" + msg + "'?"); * } * </pre> */ @Sharable public class LineEncoder extends MessageToMessageEncoder<CharSequence> { /** * Creates a new instance with the specified line separator and character set. */ public LineEncoder(LineSeparator lineSeparator, Charset charset) { this.charset = ObjectUtil.checkNotNull(charset, "charset"); this.lineSeparator = ObjectUtil.checkNotNull(lineSeparator, "lineSeparator").value().getBytes(charset); } @Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception { ByteBuf buffer = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset, lineSeparator.length); buffer.writeBytes(lineSeparator); out.add(buffer); } }
相关推荐
Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................
这个“netty4.1源码”压缩包包含的是Netty框架4.1版本的源代码,对于深入理解Netty的工作原理、性能优化以及自定义功能扩展非常有帮助。 Netty的核心特性包括: 1. **异步非阻塞I/O**:Netty基于Java NIO(非阻塞I...
这个"Netty-4.1 源码包"包含了Netty框架的源代码,允许开发者深入理解其内部工作原理,优化自定义实现,或者排查问题。 在Netty 4.1 版本中,主要包含以下关键知识点: 1. **NIO (Non-blocking I/O)**: Netty 使用...
在本文中,我们将深入探讨 Netty 4.1 的中文API帮助文档和用户指南,以及如何利用这些资源来提升你的网络编程技能。 首先,Netty 4.1 中文API帮助文档是理解 Netty 内部机制的关键工具。它包含了详细的类、接口、...
这个“netty 4.1 中文.CHM”文件是一个压缩包,包含的是Netty 4.1版本的中文版帮助文档,对于开发者来说是一个非常宝贵的资源,特别是对于那些中文为母语的开发者,它提供了方便的理解和学习Netty的途径。...
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect
这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
Netty-4.1.97.Final源码提供了对Netty内部机制的深度洞察,对于Java程序员尤其是希望提升网络编程能力或进行定制化开发的人来说,是一份极其宝贵的资料。 首先,让我们从整体上了解Netty的架构设计。Netty采用了...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
标题 "netty-netty-4.1.32.final-remark.zip" 提到了 Netty 的版本号 4.1.32.Final,这表明这是一个关于 Netty 4.1.32.Final 版本的资料包。"final" 表示这是该版本的最终发布,通常意味着经过了充分测试和稳定。...
这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码 ...
netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724796&idx=1&sn=ce5dc3c913d464b0e2e4e429a17bb01e&scene=19#wechat_redirect
netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect
netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码 ...
netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724778&idx=1&sn=72e4b1ea5323475b16e99c6720c7069d&scene=19#wechat_redirect
赠送jar包:netty-common-4.1.68.Final.jar; 赠送原API文档:netty-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-common-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.68.Final....