`
情情说
  • 浏览: 39071 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Netty事件监听和处理(下)【有福利】

阅读更多

 

上一篇 介绍了事件监听、责任链模型、socket接口和IO模型、线程模型等基本概念,以及Netty的整体结构,这篇就来说下Netty三大核心模块之一:事件监听和处理。

前面提到,Netty是一个NIO框架,它将IO通道的建立、可读、可写等状态变化,抽象成事件,以责任链的方式进行传递,可以在处理链上插入自定义的Handler,对感兴趣的事件进行监听和处理。

通过介绍,你会了解到:

  • 事件监听和处理模型
  • 事件监听:EventLoop
  • 事件处理:ChannelPipeline和ChannelHandler
  • 使用Netty实现Websocket协议

文章末尾有福利 ~

事件监听和处理模型

进行网络编程时,一般的编写过程是这样的:

  • 创建服务端Socket,监听某个端口;
  • 当有客户端连接时,会创建一个新的客户端Socket,监听数据的可读、可写状态,每一个连接请求都会创建一个客户端Socket;
  • 读取和写入数据都会调用Socket提供的接口,接口列表在上一篇提到过;

传统的模型,每个客户端Socket会创建一个单独的线程监听socket事件,一方面系统可创建的线程数有限,限制了并发数,一方面线程过多,线程切换频繁,导致性能严重下降。

随着操作系统IO模型的发展,可以采用多路复用IO,一个线程监听多个Socket,另外,服务端处理客户端连接,与客户端Socket的监听,可以在不同的线程进行处理。

Netty就是采用多路复用IO进行事件监听,另外,使用不同的线程分别处理客户端的连接、数据读写。

整个处理结构如下图,简单说明下:

  • Boss EventLoopGroup主要处理客户端的connect事件,包含多个EventLoop,每个EventLoop一个线程;
  • Worker EventLoopGroup主要处理客户端Socket的数据read、write事件,包含多个EventLoop,每个EventLoop一个线程;
  • 无论是Boos还是Worker,事件的处理都是通过Channel Pipleline组织的,它是责任链模式的实现,包含一个或多个Handler;
  • 侦听一个端口,只会绑定到Boss EventLoopGroup中的一个Eventloop;
  • Worker EventLoopGroup中的一个Eventloop,可以监听多个客户端Socket;

事件监听和处理模型

EventLoop

一个EventLoop其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改。

EventLoop肩负着两种任务:

  • 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用select等待就绪的IO事件、读写数据与数据的处理等;
  • 第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的;

第一个任务比较好理解,主要解释下第二个:从socket数据到数据处理,再到写入响应数据,Netty都在一个线程中处理,主要是为了线程安全考虑,减少竞争和线程切换,通过任务队列的方式,可以在用户线程提交处理逻辑,在Eventloop中执行。

整个EventLoop干的事情就是select -> processIO -> runAllTask,processIO处理IO事件相关的逻辑,runAllTask处理任务队列中的任务,如果执行的任务过多,会影响IO事件的处理,所以会限制任务处理的时间,整个处理过程如下图:

EventLoop处理过程

EventLoop的run代码如下:

protected void run() {
     for (; ; ) {
         oldWakenUp = wakenUp.getAndSet(false);
         try {
             if (hasTasks()) { //如果有任务,快速返回
                 selectNow();
             } else {
                 select(); //如果没任务,等待事件返回
                 if (wakenUp.get()) {
                     selector.wakeup();
                 }
             }
             cancelledKeys = 0;
             final long ioStartTime = System.nanoTime();
             needsToSelectAgain = false;

             //处理IO事件
             if (selectedKeys != null) {
                 processSelectedKeysOptimized(selectedKeys.flip());
             } else {
                 processSelectedKeysPlain(selector.selectedKeys());
             }

             //计算IO处理时间
             final long ioTime = System.nanoTime() - ioStartTime;
             final int ioRatio = this.ioRatio; //默认为50

             //处理提交的任务
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

             if (isShuttingDown()) {
                 closeAll();
                 if (confirmShutdown()) {
                     break;
                 }
             }
         } catch (Throwable t) {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
             }
         }
     }
 }

ChannelPipeline和ChannelHandler

ChannelPipeline是一个接口,其有一个默认的实现类DefaultChannelPipeline,内部有两个属性:head和tail,
这两者都实现了ChannelHandler接口,对应处理链的头和尾。

 protected DefaultChannelPipeline(Channel channel) {
     this.channel = ObjectUtil.checkNotNull(channel, "channel");
     succeededFuture = new SucceededChannelFuture(channel, null);
     voidPromise =  new VoidChannelPromise(channel, true);

     tail = new TailContext(this);
     head = new HeadContext(this);

     head.next = tail;
     tail.prev = head;
}

每个Channel创建时,会创建一个ChannelPipeline对象,来处理channel的各种事件,可以在运行时动态进行动态修改其中的 ChannelHandler。

ChannelHandler承载业务处理逻辑的地方,我们接触最多的类,可以自定义Handler,加入处理链中,实现自定义逻辑。

ChannelHandler 可分为两大类:ChannelInboundHandler 和 ChannelOutboundHandle,这两接口分别对应入站和出站消息的处理,对应数据读取和数据写入。它提供了接口方法供我们实现,处理各种事件。

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}

自定义Handler时,一般继承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。

需要注意的是,不建议在 ChannelHandler 中直接实现耗时或阻塞的操作,因为这可能会阻塞 Netty 工作线程,导致 Netty 无法及时响应 IO 处理。

ChannelPipline

使用Netty实现Websocket协议

Websocket协议

不是本篇的重点,简单说明下:

  • 是一种长连接协议,大部分浏览器都支持,通过websocket,服务端可以主动发消息给客户端;
  • Websocket协议,在握手阶段使用HTTP协议,握手完成之后,走Websocket自己的协议;
  • Websocket是一种二进制协议;
初始化

Netty提供了ChannelInitializer类方便我们初始化,创建WebSocketServerInitializer类,继承ChannelInitializer类,用于添加ChannelHandler:

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Resource
    private CustomTextFrameHandler customTextFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("codec-http", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

        pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
        pipeline.addLast("custome-handler", customTextFrameHandler);
    }
}

分析下这几个Handler,都是Netty默认提供的:

  • HttpServerCodec:用于解析Http请求,主要在握手阶段进行处理;
  • HttpObjectAggregator:用于合并Http请求头和请求体,主要在握手阶段进行处理;
  • WebSocketServerProtocolHandler:处理Websocket协议;
  • CustomTextFrameHandler:自定义的Handler,用于添加自己的业务逻辑。

是不是很方便,经过WebSocketServerProtocolHandler处理后,读取出来的就是文本数据了,不用自己处理数据合包、拆包问题。

CustomTextFrameHandler

自定义的Handler,进行业务处理:

public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        final String content = frame.text();
        System.out.println("接收到数据:"+content);   

        // 回复数据
        TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的数据");
        if (ctx.channel().isWritable()) {
              ChannelFuture future = ctx.writeAndFlush(respFrame);
          }                    
    }
}

福利说明

最后,说下福利:小爱音箱F码。

准备了2份,主要为了感谢「微信公众号」和「掘金社区」的朋友,每一份包括1个小爱音箱F码和1个小爱音箱 mini F码。

小米手机F码源自于英文单词”Friend”,是小米公司提供给小米核心用户及为小米做出贡献的网友的优先购买权,如果您有小米F码的话无需等待即可直接利用小米F码购买相关产品!

简单来说,F码就是不用抢了,可以直接购买 ~

抽奖截止时间

4月9号中午12点

抽奖规则
掘金社区
  • 需要关注我的掘金账号才有效,个人主页

  • 使用微信抽奖助手随机抽取for掘金社区;
    掘金抽奖

微信公众号
  • 需要关注我的微信公众号才有效;
    情情说

  • 使用微信抽奖助手随机抽取for微信公众号;
    微信公众号抽奖

 

1
0
分享到:
评论

相关推荐

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...

    java应用netty服务端和客户端

    Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    总的来说,这个项目展示了如何利用Netty的强大功能来处理复杂网络环境下的通信需求。通过Netty,我们可以优雅地实现DTU协议的TCP服务器,同时支持多端口通信和多种协议解析,这对于IoT应用和远程监控系统等场景非常...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。它提供了丰富的API,支持多种传输类型,如TCP、UDP和HTTP等,使得开发者能够方便地构建网络应用程序。Netty在Java...

    Android使用Netty网络框架实践(客户端、服务端)

    Netty是一个异步事件驱动的网络应用程序框架,它为高性能、高可用性的网络服务器和客户端提供了一种简单易用的方式。本实践将详细介绍如何在Android环境中使用Netty进行客户端和服务端的通信。 首先,我们需要理解...

    使用netty进行rtsp服务端开发.zip

    Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何使用Netty进行RTSP(Real Time Streaming Protocol)服务端的开发,以及如何处理H264...

    netty搭建TCP、UDP服务

    使用Netty,你可以创建服务器Bootstrap,配置监听端口,然后绑定一个ChannelHandler来处理接收到的连接和数据。客户端则通过ClientBootstrap建立到服务器的连接,同样可以设置ChannelHandler来处理响应。 在UDP...

    Netty服务端和客户端调用demo

    Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的Java应用。这个demo包含了Netty服务器(NettyServer)和Netty客户端(NettyClient),下面我们将详细探讨这两个组件以及它们如何协同...

    安卓基于netty4.x心跳,断线重连,状态监听,数据发送

    安卓基于netty4.x心跳,断线重连,状态监听,数据发送

    Netty4事件处理传播机制,java高级开发工程师要求(csdn)————程序.pdf

    当一个事件(如连接建立、数据接收等)发生时,它会从头节点开始沿着链路向下传递,每个处理器有机会处理事件或将其传递给下一个处理器。这种模式允许灵活地组合和定制网络应用的逻辑,使得事件处理可以被分解到多个...

    Netty 使用Disruptor机制的处理源代码

    4. **启动 Disruptor**:启动 Disruptor,它将开始接收和处理事件。 5. **发布事件**:在 Netty 的 ChannelHandlerContext 中,使用 Disruptor 的发布方法将事件放入环形缓冲区。 文件名 "trapos-master" 可能是指...

    Netty5.0架构剖析和源码解读.pdf

    此外,Pipeline(管道)组件是Handler链的容器,负责事件的传播和处理,它使得Netty具有高度的可扩展性和灵活性。 另外,Netty的零拷贝特性也是其性能优秀的重要原因。通过利用Java NIO的FileChannel.transferTo()...

    netty案例,netty4.1中级拓展篇十《Netty接收发送多种协议消息类型的通信处理方案》源码

    netty案例,netty4.1中级拓展篇十《Netty接收发送多种协议消息类型的通信处理方案》源码 ...

    Netty 消息接收和线程处理模型

    在串行模式中,消息接收和处理都是在同一个线程中完成的,而在并行模式中,消息接收和处理是分离的,处理部分被投递到后台线程池中执行。 从性能角度来看,串行模式由于不存在线程上下文切换,因此处理效率更高。...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty与RocketMQ的关联之前,我们先来详细了解一下Netty的基本概念和核心特性。 Netty的核心设计...

    淘宝netty例子以及原理

    Bootstrap负责初始化和配置网络连接,Channel是网络I/O操作的抽象,EventLoop负责监听和处理事件,而Pipeline则是一系列处理器的链,每个处理器处理特定类型的事件或数据。 Netty的非阻塞I/O模型允许它在高并发环境...

    netty客户端和服务端通讯

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本项目中,"netty客户端和服务端通讯" 实现了基于 Netty 的通信机制,允许客户端与服务端进行高效的数据...

    Netty 文件上传获取进度条

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络服务。在处理文件上传时,为了提供良好的用户体验,通常需要实现文件上传的进度条功能,让用户能够实时了解文件传输的状态。在...

    netty-socketio api接口文档.7z

    5. **SocketIO事件**:文档会详细介绍SocketIO的各种事件,如连接建立、断开、消息接收和发送等,以及如何注册监听器来处理这些事件。 6. **数据编码与解码**:Netty-SocketIO提供了编码器和解码器来处理SocketIO的...

    Netty一个端口支持tcp和websocket

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 实现一个端口同时支持 TCP 和 WebSocket,从而减少资源消耗并简化...

Global site tag (gtag.js) - Google Analytics