`

netty 源码阅读 及 future promise学习

 
阅读更多

 

netty文档说明netty的网络操作都是async的, 在源码上大量使用了future, promise这种类,自己在js框架中也看到了很多future的使用,以前不太明白,这次好好学学。

 

 

wiki里面写到  a future is a read-only placeholder view of a variable, while a promise is a writable

 

在netty里面也是这样定义的, 

future接口定义了isSuccess(),isCancellable(),cause(),这些判断异步执行状态的方法。(read-only)

Promise接口在extneds future的基础上增加了setSuccess(), setFailure()这些方法。(writable)

 

future接口就是用来封装异步操作的执行状态的,在执行异步操作时,可以立马返回一个future,future可以sync来等待执行结果,如果有些操作要等到future代表的异步操作完了才能执行,可以通过future.addListener()的方式来在之前的异步操作完成的时候执行新的操作。

 

外界看到的是future,内部其实是promise,异步的时候一般是初始线程代码里封装一个runnable,new一个promise,  把runnable放到其他线程池去执行,执行的时候把promise对象传进去(通过final关键字),在run()方法里结束时去改变promise的状态或设置result value。初始线程在设置异步操作时可以立马返回future(其实是promise),可以根据情况选择future.sync()来同步等待结果或者future.addListener()来设置异步操作。 

 

 

下面结合netty的源码来具体看一个列子, netty version 4.0.27

 

	public void start() throws Exception {
		EventLoopGroup group = new NioEventLoopGroup(); // 3
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(group) // 4
					.channel(NioServerSocketChannel.class) // 5
					.localAddress(new InetSocketAddress(port)) // 6
					.childHandler(new ChannelInitializer<SocketChannel>() {// 7
								@Override
								public void initChannel(SocketChannel ch)
										throws Exception {
									ch.pipeline().addLast(
											new EchoServerHandler());
								}
							});
			ChannelFuture f = b.bind().sync(); // 8
			f.channel().closeFuture().sync(); // 9
		} finally {
			group.shutdownGracefully().sync(); // 10
		}
	}

	这段代码用过netty的应该很熟悉了,服务器启动的代码, 其中注释8的地方返回了一个future对象。来看看里面发生了什么:
	几番跳转来到了AbstractBootstrap的doBind方法。


    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();  //1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {         //2
            return regFuture;
        }

        if (regFuture.isDone()) {         //3
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();  //5
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);  //5
            regFuture.addListener(new ChannelFutureListener() {    //4
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);  //1
                }
            });
            return promise;
        }
    }

    上面这段代码里
    1.开始会去注册initAndRegister(), 后面才会去bind  doBind0()。
    2.最开始拿到注册操作的future时就做了一次失败的判断,因为假如注册失败后面就没必要做了。
    3.这里检查了下future有没有做完,假如做完了这里就可以顺序执行,不用异步了。
    4.在没有做完,必须异步的情况下,因为之前的future就是异步的,没做完的,这时接下来的bind操作也只能是异步的,采用Listener的方式,最后也是返回个future(promise)
    5.同步执行和异步执行的情况下都返回的是promise,  promise只是个result的placeholder,所以同步的情况下可以用。


    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }


    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {  //1
            register0(promise);
        } else {
            try {
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    上面这段
    1. 判断eventloop是不是当前运行线程,是就直接做。不是就放到队列里异步做。



    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (firstRegistration && isActive()) {
                pipeline.fireChannelActive();
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    上面的代码就是真正做Register操作的代码了,这里全是同步的代码,做完了后设置promise的状态(safeSetSuccess(promise))。

注册操作 和 bind操作 都可能放到eventloop里面去异步执行, 但他们之间是有顺序的, 因为他们做为task是放到同一个eventloop的queue里去,是一个一个fifo的执行的,注册先进去先执行。
这样异步模型就简单了,不用考虑竞争,锁啊,变成串行执行。

 

 

 

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    部门绩效考核评价表excel.xls

    部门绩效考核评价表excel

    全面的公司行政费用统计表.xls

    全面的公司行政费用统计表

    视觉跟踪算法综述.pdf

    视觉跟踪算法综述.pdf

    CMD 命令行高级教程精选合编

    CMD 命令行高级教程精选合编

    apr-devel-1.4.8-7.el7.x64-86.rpm.tar.gz

    1、文件内容:apr-devel-1.4.8-7.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/apr-devel-1.4.8-7.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、安装指导:私信博主,全程指导安装

    10-4-生产主管绩效考核表(自动计算、等级评价).xlsx

    10-4-生产主管绩效考核表(自动计算、等级评价)

    深度学习python基础(第三节) 函数、列表

    深度学习python基础(第三节) 函数、列表

    岗位绩效考核评定表excel表格模板.xlsx

    岗位绩效考核评定表excel表格模板

    成品库仓管员绩效考核表.xls

    成品库仓管员绩效考核表

    环卫业务 基础知识培训(小步创想)PPT(133页).pptx

    一、智慧环卫管理平台的建设背景与目标 智慧环卫管理平台的建设源于对环卫管理全面升级的需求。当前,城管局已拥有139辆配备车载GPS系统、摄像头和油耗传感器的环卫车辆,但环卫人员尚未配备智能移动终端,公厕也缺乏信息化系统和智能终端设备。为了提升环卫作业效率、实现精细化管理并节省开支,智慧环卫管理平台应运而生。该平台旨在通过信息化技术和软硬件设备,如车载智能终端和环卫手机App,实时了解环卫人员、车辆的工作状态、信息和历史记录,使环卫作业管理透明化、精细化。同时,平台还期望通过数据模型搭建和数据研读,实现更合理的环卫动态资源配置,为环卫工作的科学、健康、持续发展提供决策支持。 二、智慧环卫管理平台的建设内容与功能 智慧环卫管理平台的建设内容包括运行机制体制建设、业务流程设计、智慧公厕系统建设、网络建设、主机和储存平台需求、平台运维管理体系、硬件标准规范体系以及考核评价体系等多个方面。其中,智慧公厕系统建设尤为关键,它能实时监控公厕运行状态,保障公厕的清洁和正常运行。平台建设还充分利用了现有的电子政务网络资源,并考虑了有线和无线网络的需求。在功能上,平台通过普查、整合等手段全面收集环卫车辆、企业、人员、设施、设备等数据,建立智慧环卫基础数据库。利用智能传感、卫星定位等技术实现环卫作业的在线监管和远程监控,实现对道路、公共场所等的作业状况和卫生状况的全面监管。此外,平台还建立了环卫作业网格化管理责任机制,实现从作业过程到结果的全面监管,科学评价区域、部门、单位和人员的作业效果。 三、智慧环卫管理平台的效益与风险规避 智慧环卫管理平台的建设将带来显著的环境、经济和管理效益。环境方面,它将有力推进环境卫生监管服务工作,改善环境卫生状况,为人民群众创造更加清洁、卫生的工作和生活环境。经济方面,通过智慧化监管,大大降低了传统管理手段的成本,提高了监管的准确性和效率。管理方面,平台能够追踪溯源市民反映的问题,如公厕异味、渣土车辆抛洒等,并找到相应的责任单位进行处置,防止类似事件再次发生。同时,平台还拥有强大的预警机制功能,能够在很多环卫问题尚未出现前进行处置。然而,平台建设也面临一定的风险,如部门协调、配合问题,建设单位选择风险以及不可预测的自然灾害等。为了规避这些风险,需要加强领导、统一思想,选择优秀的系统集成商承接项目建设,并做好计算机和应用系统的培训工作。同时,也要注意标准制定工作和相关法律法规的制定工作,以保证系统建设完成后能够真正为环卫管理工作带来便利。

    基于平衡计分卡绩效考核表(管理高层)模板.xls

    基于平衡计分卡绩效考核表(管理高层)模板

    网站运营各部门绩效考核表.xls

    网站运营各部门绩效考核表

    XX公司行政部绩效考核指标.xls

    XX公司行政部绩效考核指标

    基于齿向修形的抛物线锥齿轮仿真分析.pdf

    基于齿向修形的抛物线锥齿轮仿真分析.pdf

    三相半桥逆变器低电压穿越控制策略设计:两级式光伏并网系统电路原理与容量优化报告,两级式光伏并网系统及其低电压穿越控制策略设计,容量30kW 三相半桥逆变器,boost电路作前级 带低电压穿越,有一

    三相半桥逆变器低电压穿越控制策略设计:两级式光伏并网系统电路原理与容量优化报告,两级式光伏并网系统及其低电压穿越控制策略设计,容量30kW。 三相半桥逆变器,boost电路作前级。 带低电压穿越,有一万七千字的报告,没有水文字。 报告内容,电路原理,pi参数设计,bode和根轨迹分析,波形良好 ,关键词:两级式光伏并网系统;低电压穿越控制策略;30kW容量;三相半桥逆变器;boost电路;前级设计;低电压穿越功能;报告内容;电路原理;PI参数设计;Bode和根轨迹分析;波形良好。,基于30kW容量两级式光伏并网系统的控制策略设计:低电压穿越及高效逆变技术研究

    毕业设计文本预测项目python源码+托尔斯泰《战争与和平》文本分析数据集-最新出炉.zip

    毕业设计文本预测项目python源码+托尔斯泰《战争与和平》文本分析数据集-最新出炉 关于数据集 背景: 该数据集包含列夫·托尔斯泰的《战争与和平》的全文,这是一部于 1869 年出版的开创性文学作品。作为公共领域文本,它为对文学分析、自然语言处理和历史研究感兴趣的研究人员和爱好者提供了丰富的资源。这部小说以俄国拿破仑战争为背景,探讨了战争、和平和人类状况的主题。 内容: 数据集由一个纯文本文件组成,其中包含《战争与和平》的完整叙述。文本已进行预处理,以方便分析和建模,使其适用于各种应用,包括文本挖掘、情感分析和机器学习项目。该文件可通过以下链接访问:战争与和平文本数据集。

    18 -广告部经理绩效考核表1.xlsx

    18 -广告部经理绩效考核表1

    永磁同步电机电流内环PR控制Simulink仿真模型:转速电流双闭环矢量控制,波形完美带原理说明与文献参考,永磁同步电机电流内环PR控制Matlab simulink仿真模型,参数已设置好,可直接运行

    永磁同步电机电流内环PR控制Simulink仿真模型:转速电流双闭环矢量控制,波形完美带原理说明与文献参考,永磁同步电机电流内环PR控制Matlab simulink仿真模型,参数已设置好,可直接运行。 属于PMSM转速电流双闭环矢量控制系统模型。 电流内环采用PR控制器,不需要旋转坐标变,在静止坐标下进行矢量控制,转速外环采用PI控制器。 波形完美,包含原理说明文档和参考文献。 ,关键词:永磁同步电机;电流内环PR控制;Matlab simulink仿真模型;PMSM转速电流双闭环矢量控制系统;PR控制器;PI控制器;波形完美;原理说明文档;参考文献。,"基于PR控制的永磁同步电机电流内环仿真模型:静止坐标矢量控制与波形解析"

    基于主从博弈理论的共享储能与综合能源微网优化运行策略研究:Stackelberg均衡下的优化调度与运行框架,基于主从博弈理论的共享储能与综合能源微网优化运行研究 关键词:主从博弈 共享储能 综合能源微

    基于主从博弈理论的共享储能与综合能源微网优化运行策略研究:Stackelberg均衡下的优化调度与运行框架,基于主从博弈理论的共享储能与综合能源微网优化运行研究 关键词:主从博弈 共享储能 综合能源微网 优化调度 参考文档:《基于主从博弈理论的共享储能与综合能源微网优化运行研究》完全复现 仿真平台:MATLAB yalmip+cplex 主要内容:代码主要做的是基于主从博弈理论的共享储能与综合能源微网优化运行研究,首先介绍了系统运行框架,分析了系统内各利益体的功能。 其次,分别针对微网运营商、共享储能服务商以及用户聚合商建立优化运行模型。 进一步,分析了微网运营商与用户聚合商间的博弈关系,提出共享储能背景下微网运营商与用户聚合商间的 Stackelberg 博弈模型,并证明Stackelberg 均衡解的存在性与唯一性。 最后,在 MATLAB平台上进行算例仿真,通过 Yalmip 工具与 CPLEX 求解器进行建模与求解,利用启发式算法与求解器相结合的方法优化微网运营商与用户聚合商的策略。 结果表明,本文所提模型所提模型不仅能有效权衡微网运营商与用户聚合商的利益,也实现了用户聚合商

Global site tag (gtag.js) - Google Analytics