`
iwinit
  • 浏览: 456159 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

深入浅出Netty之四 Client请求处理

阅读更多

前2篇分析了echo server端的运行机制,本篇同样以echo client为例,分析netty的nio客户端的运行机制。

总体来说client端和server端的处理是类似的,NioWorker是重用的,也就意味着client和server的读写机制是一样的,都是通过worker线程来管理的。所不同的是Boss线程,server端的boss线程一个bind端口起一个,主要负责接收新请求,而client端的boss线程是一个可配置的数组,一个connect端口分配一个,主要负责connect过程,如果connect成功则将channle注册到worker线程中处理。在Client同样有PipelineSink,叫做NioClientSocketPipelineSink,也是负责底层IO和pipeline之间的交互。

EchoClient代码:

 

        // 初始化Bootstrap和NioClientSocketChannelFactory,这一步将启动nioWorker线程,并初始化NioClientSocketPipelineSink,并将Boss线程创建
          ClientBootstrap bootstrap = new ClientBootstrap(
                  new NioClientSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          Executors.newCachedThreadPool()));
  
          // 用户自定义的pipeline工厂
          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
              public ChannelPipeline getPipeline() throws Exception {
                  return Channels.pipeline(
                          new EchoClientHandler(firstMessageSize));
              }
          });
  
          // 异步创建连接
          ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
  
          //等待连接关闭
          future.getChannel().getCloseFuture().awaitUninterruptibly();
  
          // 关闭资源,线程池等
          bootstrap.releaseExternalResources();

具体connect过程:

一.创建client的channel

 

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
	......
	//拿用户自定义的pipeline
        ChannelPipeline pipeline;
        try {
            pipeline = getPipelineFactory().getPipeline();
        } catch (Exception e) {
            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
        }

        // 从ChannelFactory中,创建Channel,对于client来说factory是NioClientSocketChannelFactory,Channel是NioClientSocketChannel
        Channel ch = getFactory().newChannel(pipeline);

        // 通过channel连接
        return ch.connect(remoteAddress);
    }

 二.创建channel时,分配worker

 

 public SocketChannel newChannel(ChannelPipeline pipeline) {
        return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
    }

 三.创建内部的SocketChannel,触发ChannelOpen事件,不过echo client的handler没有对channelOpen事件做处理

 

NioClientSocketChannel(
            ChannelFactory factory, ChannelPipeline pipeline,
            ChannelSink sink, NioWorker worker) {
	//创建socketChannel,并配置成异步模式
        super(null, factory, pipeline, sink, newSocket(), worker);
        //触发open事件
        fireChannelOpen(this);
    }

 四.创建socketChannel过程

 

private static SocketChannel newSocket() {
        SocketChannel socket;
	//创建SocketChannel
        try {
            socket = SocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }

        boolean success = false;
	//使用异步模式
        try {
            socket.configureBlocking(false);
            success = true;
        } catch (IOException e) {
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        } finally {
           ......
        }

        return socket;
    }

 五.通过Channel进行connect

 

 public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
	//是否连接成功的future
        ChannelFuture future = future(channel, true);
	//触发connected的downstream事件,对于echo client来说,由于没有downstream handler,所以直接被PipelineSink处理了
        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
                channel, future, ChannelState.CONNECTED, remoteAddress));
        return future;
    }

 六.NioClientSocketPipelineSink拿到CONNECTED的downstream事件,处理connect

 

case CONNECTED:
                if (value != null) {
                    connect(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
private void connect(
            final NioClientSocketChannel channel, final ChannelFuture cf,
            SocketAddress remoteAddress) {
        try {
	    //尝试连接,如果成功,则直接将channel注册到worker线程中
            if (channel.channel.connect(remoteAddress)) {
                channel.worker.register(channel, cf);
            } 
	    //尝试连接失败,则将channel注册到某个boss线程中处理,该boss线程会接管该channel的connect过程
	    else {
                channel.getCloseFuture().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture f)
                            throws Exception {
                        if (!cf.isDone()) {
                            cf.setFailure(new ClosedChannelException());
                        }
                    }
                });
                cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                channel.connectFuture = cf;
                nextBoss().register(channel);
            }

        } catch (Throwable t) {
            cf.setFailure(t);
            fireExceptionCaught(channel, t);
            channel.worker.close(channel, succeededFuture(channel));
        }
    }

七.具体register过程

void register(NioClientSocketChannel channel) {
	    //client的注册任务,让boss线程监听connect事件
            Runnable registerTask = new RegisterTask(this, channel);
            Selector selector;

            synchronized (startStopLock) {
	        //初始化boss线程,并启动之
                if (!started) {
                    // 
                    try {
			//创建一个Selector
                        this.selector = selector =  Selector.open();
                    } catch (Throwable t) {
                        throw new ChannelException(
                                "Failed to create a selector.", t);
                    }

                    // 启动boss线程
                    boolean success = false;
                    try {
                        DeadLockProofWorker.start(bossExecutor,
                                new ThreadRenamingRunnable(this,
                                        "New I/O client boss #" + id + '-' + subId));

                        success = true;
                    } finally {
		......

                started = true;
		//异步提交注册任务
                boolean offered = registerTaskQueue.offer(registerTask);
                assert offered;
            }
	    //根据超时时间,启动一个超时提醒任务
            int timeout = channel.getConfig().getConnectTimeoutMillis();
            if (timeout > 0) {
                if (!channel.isConnected()) {
                    channel.timoutTimer = timer.newTimeout(wakeupTask,
                            timeout, TimeUnit.MILLISECONDS);
                }
            }

        }

八.register之后,主线程就返回了,Boss线程异步执行

 

    public void run() {
            boolean shutdown = false;
            int selectReturnsImmediately = 0;

            Selector selector = this.selector;

            // use 80% of the timeout for measure
            final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
            boolean wakenupFromLoop = false;
            for (;;) {
                wakenUp.set(false);

                try {
                    long beforeSelect = System.nanoTime();
		    //select,500ms超时
                    int selected = SelectorUtil.select(selector);
		    .......
		    //处理注册任务,将channel注册到自己的selector范围中
                    processRegisterTaskQueue();
		    //处理IO就绪事件,这里是connect事件
                    processSelectedKeys(selector.selectedKeys());

                    // 处理超时,如果超时,则设置future失败
                    long currentTimeNanos = System.nanoTime();
                    processConnectTimeout(selector.keys(), currentTimeNanos);
                   ......

        }

 

 九.注册任务执行

 

public void run() {
            try {
		//注册一个connect key到boss的selector上
                channel.channel.register(
                        boss.selector, SelectionKey.OP_CONNECT, channel);
            } catch (ClosedChannelException e) {
                channel.worker.close(channel, succeededFuture(channel));
            }
	    //设置超时点
            int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
            if (connectTimeout > 0) {
                channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
            }
        }

 

 十.boss处理IO,client是connect就绪

 

private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
           ......
            for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
                SelectionKey k = i.next();
                i.remove();

                if (!k.isValid()) {
                    close(k);
                    continue;
                }

                try {
		    //如果connect就位,判断connect是否确实完成
                    if (k.isConnectable()) {
                        connect(k);
                    }
                } catch (Throwable t) {
                   ......
                }
            }
        }
private void connect(SelectionKey k) throws IOException {
            NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
	    //如果连接成功,则将channel注册上worker线程中处理读写
            if (ch.channel.finishConnect()) {
                k.cancel();
                if (ch.timoutTimer != null) {
                    ch.timoutTimer.cancel();
                }
                ch.worker.register(ch, ch.connectFuture);
            }
        }

 

 十一.注册过程

 

public void run() {
           ......
		//默认监听read事件
                synchronized (channel.interestOpsLock) {
                    channel.channel.register(
                            selector, channel.getRawInterestOps(), channel);
                }
		//通知等待线程成功
                if (future != null) {
                    channel.setConnected();
                    future.setSuccess();
                }
                ......
		//触发ChannelConnected事件
                fireChannelConnected(channel, remoteAddress);
            ......
            }

        }

 十二.echo client handler处理channelConnected事件,开始发送数据

 

 public void channelConnected(
              ChannelHandlerContext ctx, ChannelStateEvent e) {
          // Send the first message.  Server will not send anything here
          // because the firstMessage's capacity is 0.
          e.getChannel().write(firstMessage);
      }

 十三.write之后的过程client端和server是一样的,可以参考前一篇server端的读写

 

 

分享到:
评论

相关推荐

    uniapp实战商城类app和小程序源码​​​​​​.rar

    uniapp实战商城类app和小程序源码,包含后端API源码和交互完整源码。

    PHP进阶系列之Swoole入门精讲(课程视频)

    本课程是 PHP 进阶系列之 Swoole 入门精讲,系统讲解 Swoole 在 PHP 高性能开发中的应用,涵盖 协程、异步编程、WebSocket、TCP/UDP 通信、任务投递、定时器等核心功能。通过理论解析和实战案例相结合,帮助开发者掌握 Swoole 的基本使用方法及其在高并发场景下的应用。 适用人群: 适合 有一定 PHP 基础的开发者、希望提升后端性能优化能力的工程师,以及 对高并发、异步编程感兴趣的学习者。 能学到什么: 掌握 Swoole 基础——理解 Swoole 的核心概念,如协程、异步编程、事件驱动等。 高并发处理——学习如何使用 Swoole 构建高并发的 Web 服务器、TCP/UDP 服务器。 实战项目经验——通过案例实践,掌握 Swoole 在 WebSocket、消息队列、微服务等场景的应用。 阅读建议: 建议先掌握 PHP 基础,了解 HTTP 服务器和并发处理相关概念。学习过程中,结合 官方文档和实际项目 进行实践,加深理解,逐步提升 Swoole 开发能力。

    matlab齿轮-轴-轴承系统含间隙非线性动力学 基于matlab的齿轮-轴-轴承系统的含间隙非线性动力学模型,根据牛顿第二定律,建立齿轮系统啮合的非线性动力学方程,同时也主要应用修正Capone模

    matlab齿轮-轴-轴承系统含间隙非线性动力学 基于matlab的齿轮-轴-轴承系统的含间隙非线性动力学模型,根据牛顿第二定律,建立齿轮系统啮合的非线性动力学方程,同时也主要应用修正Capone模型的滑动轴承无量纲化雷诺方程,利用这些方程推到公式建模;用MATLAB求解画出位移-速度图像,从而得到系统在不同转速下的混沌特性,分析齿轮-滑动轴承系统的动态特性 程序已调通,可直接运行 ,关键词:Matlab;齿轮-轴-轴承系统;含间隙非线性动力学;牛顿第二定律;动力学方程;修正Capone模型;无量纲化雷诺方程;位移-速度图像;混沌特性;动态特性。,基于Matlab的齿轮-轴-轴承系统非线性动力学建模与混沌特性分析

    2024年移动应用隐私安全观测报告.pdf

    2024年移动应用隐私安全观测报告.pdf

    基于Springboot框架的电影评论网站系统设计与实现(Java项目编程实战+完整源码+毕设文档+sql文件+学习练手好项目).zip

    本电影评论网站管理员和用户。管理员功能有个人中心,用户管理,电影类别管理,电影信息管理,留言板管理,论坛交流,系统管理等。用户可以对电影进行评论。因而具有一定的实用性。本站是一个B/S模式系统,采用SSM框架,MYSQL数据库设计开发,充分保证系统的稳定性。系统具有界面清晰、操作简单,功能齐全的特点,使得电影评论网站管理工作系统化、规范化。 本系统的使用使管理人员从繁重的工作中解脱出来,实现无纸化办公,能够有效的提高电影评论网站管理效率。 关键词:电影评论网站;SSM框架;MYSQL数据库 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术 2 2.1 MYSQL数据库 2 2.2 B/S结构 3 2.3 Spring Boot框架简介 4 3系统分析 4 3.1可行性分析 4 3.1.1技术可行性 4 3.1.2经济可行性 5 3.1.3操作可行性 5 3.2系统性能分析 5 3.2.1 系统安全性 5 3.2.2 数据完整性 6 3.3系统界面分析 6 3.4系统流程和逻辑 7 4系统概要设计 8 4.1概述 8 4.2系统结构 9 4.

    2023-04-06-项目笔记 - 第四百三十六阶段 - 4.4.2.434全局变量的作用域-434 -2025.03.13

    2023-04-06-项目笔记-第四百三十六阶段-课前小分享_小分享1.坚持提交gitee 小分享2.作业中提交代码 小分享3.写代码注意代码风格 4.3.1变量的使用 4.4变量的作用域与生命周期 4.4.1局部变量的作用域 4.4.2全局变量的作用域 4.4.2.1全局变量的作用域_1 4.4.2.434局变量的作用域_434- 2025-03-13

    基于STM32的流量计智能流速流量监测、水泵报警系统(泵启动 1100027-基于STM32的流量计智能流速流量监测、水泵报警系统(泵启动、阈值设置、LCD1602、超阈值报警、proteus) 功

    基于STM32的流量计智能流速流量监测、水泵报警系统(泵启动 1100027-基于STM32的流量计智能流速流量监测、水泵报警系统(泵启动、阈值设置、LCD1602、超阈值报警、proteus) 功能描述: 基于STM32F103C8单片机实现的智能流速、流量,流量计设计 实现的功能是通过信号发生器模拟齿轮传感器,检测流量的大小,同时计算流过液体的总容量 可以设置最大流过的总容量,当超过设定值后通过蜂鸣器与LED灯指示 当没有超过则启动水泵控制电路带动液体流动 1、流速检测 2、流量统计 3、阈值显示与设置(通过按键实现阈值的调节或清零) 4、水泵启动 5、超阈值报警 有哪些资料: 1、仿真工程文件 2、PCB工程文件 3、原理图工程文件 4、源代码 ,核心关键词: 基于STM32的流量计; 智能流速流量监测; 水泵报警系统; 阈值设置; LCD1602; 超阈值报警; Proteus仿真; STM32F103C8单片机; 齿轮传感器; 信号发生器; 流量统计; 蜂鸣器与LED灯指示; 水泵控制电路。,基于STM32的智能流量监测与报警系统(阈值可调、流速与流量监

    (灰度场景下的平面、海底、船、受害者)图像分类数据集【已标注,约1100张数据】

    (灰度场景下的平面、海底、船、受害者)图像分类数据集【已标注,约1100张数据】 数据经过预处理,可以直接作为分类网络输入使用 分类个数【4】:平面、海底、船、受害者【具体查看json文件】 划分了训练集、测试集。存放各自的同一类数据图片。如果想可视化数据集,可以运行资源中的show脚本。 图像分类、分割网络改进:https://blog.csdn.net/qq_44886601/category_12858320.html 计算机视觉完整项目:https://blog.csdn.net/qq_44886601/category_12816068.html

    arkime无geo下的oui文件

    arkime无geo下的oui文件

    图像处理_人脸识别_数据库连接_教学与部署_1741771164.zip

    人脸识别项目实战

    机器人视觉导航_OrbSlam2_语义地图_智能交互与探索_1741771547.zip

    人脸识别项目实战

    CAD 2025 二次开发dll

    CAD 2025 二次开发dll

    人脸识别_Facenet_人脸向量数据库存储_用户识别与匹配_1741777703.zip

    人脸识别项目源码实战

    数据结构_C语言_代码实现_学习复习用途_1741859029.zip

    c语言学习

    基于扩张状态观测器eso扰动补偿和权重因子调节的电流预测控制,相比传统方法,增加了参数鲁棒性 降低电流脉动,和误差 基于扩张状态观测器eso补偿的三矢量模型预测控制 ,基于扩张状态观测器; 扰动补

    基于扩张状态观测器eso扰动补偿和权重因子调节的电流预测控制,相比传统方法,增加了参数鲁棒性 降低电流脉动,和误差 基于扩张状态观测器eso补偿的三矢量模型预测控制 ,基于扩张状态观测器; 扰动补偿; 权重因子调节; 电流预测控制; 参数鲁棒性; 电流脉动降低; 误差降低; 三矢量模型预测控制,基于鲁棒性增强和扰动补偿的电流预测控制方法

    计算机科学_C语言_数据结构_航班信息管理系统_1741863572.zip

    c语言学习

    UE开发教程与学习方法记录.zip

    UE开发教程与学习方法记录.zip

    智慧农批园区综合整体建设方案PPT(63页).pptx

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    VSCodeUserSetup-x64-1.98.0.rar

    VSCodeUserSetup-x64-1.98.0.rar vscode是一种简化且高效的代码编辑器,同时支持诸如调试,任务执行和版本管理之类的开发操作。它的目标是提供一种快速的编码编译调试工具。然后将其余部分留给IDE。vscode集成了所有一款现代编辑器所应该具备的特性,包括语法高亮、可定制的热键绑定、括号匹配、以及代码片段收集等。 Visual Studio Code(简称VSCode)是Microsoft开发的代码编辑器,它支持Windows,Linux和macOS等操作系统以及开源代码。它支持测试,并具有内置的Git版本控制功能以及开发环境功能,例如代码完成(类似于IntelliSense),代码段和代码重构等。编辑器支持用户定制的配置,例如仍在编辑器中时,可以更改各种属性和参数,例如主题颜色,键盘快捷键等,内置的扩展程序管理功能。

Global site tag (gtag.js) - Google Analytics