`

netty 源码阅读 及 future promise学习

 
阅读更多

 

netty文档说明netty的网络操作都是async的, 在源码上大量使用了future, promise这种类,自己在js框架中也看到了很多future的使用,以前不太明白,这次好好学学。

 

 

wiki里面写到  a future is a read-only placeholder view of a variable, while a promise is a writable

 

在netty里面也是这样定义的, 

future接口定义了isSuccess(),isCancellable(),cause(),这些判断异步执行状态的方法。(read-only)

Promise接口在extneds future的基础上增加了setSuccess(), setFailure()这些方法。(writable)

 

future接口就是用来封装异步操作的执行状态的,在执行异步操作时,可以立马返回一个future,future可以sync来等待执行结果,如果有些操作要等到future代表的异步操作完了才能执行,可以通过future.addListener()的方式来在之前的异步操作完成的时候执行新的操作。

 

外界看到的是future,内部其实是promise,异步的时候一般是初始线程代码里封装一个runnable,new一个promise,  把runnable放到其他线程池去执行,执行的时候把promise对象传进去(通过final关键字),在run()方法里结束时去改变promise的状态或设置result value。初始线程在设置异步操作时可以立马返回future(其实是promise),可以根据情况选择future.sync()来同步等待结果或者future.addListener()来设置异步操作。 

 

 

下面结合netty的源码来具体看一个列子, netty version 4.0.27

 

	public void start() throws Exception {
		EventLoopGroup group = new NioEventLoopGroup(); // 3
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(group) // 4
					.channel(NioServerSocketChannel.class) // 5
					.localAddress(new InetSocketAddress(port)) // 6
					.childHandler(new ChannelInitializer<SocketChannel>() {// 7
								@Override
								public void initChannel(SocketChannel ch)
										throws Exception {
									ch.pipeline().addLast(
											new EchoServerHandler());
								}
							});
			ChannelFuture f = b.bind().sync(); // 8
			f.channel().closeFuture().sync(); // 9
		} finally {
			group.shutdownGracefully().sync(); // 10
		}
	}

	这段代码用过netty的应该很熟悉了,服务器启动的代码, 其中注释8的地方返回了一个future对象。来看看里面发生了什么:
	几番跳转来到了AbstractBootstrap的doBind方法。


    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();  //1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {         //2
            return regFuture;
        }

        if (regFuture.isDone()) {         //3
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();  //5
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);  //5
            regFuture.addListener(new ChannelFutureListener() {    //4
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);  //1
                }
            });
            return promise;
        }
    }

    上面这段代码里
    1.开始会去注册initAndRegister(), 后面才会去bind  doBind0()。
    2.最开始拿到注册操作的future时就做了一次失败的判断,因为假如注册失败后面就没必要做了。
    3.这里检查了下future有没有做完,假如做完了这里就可以顺序执行,不用异步了。
    4.在没有做完,必须异步的情况下,因为之前的future就是异步的,没做完的,这时接下来的bind操作也只能是异步的,采用Listener的方式,最后也是返回个future(promise)
    5.同步执行和异步执行的情况下都返回的是promise,  promise只是个result的placeholder,所以同步的情况下可以用。


    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }


    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {  //1
            register0(promise);
        } else {
            try {
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    上面这段
    1. 判断eventloop是不是当前运行线程,是就直接做。不是就放到队列里异步做。



    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (firstRegistration && isActive()) {
                pipeline.fireChannelActive();
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    上面的代码就是真正做Register操作的代码了,这里全是同步的代码,做完了后设置promise的状态(safeSetSuccess(promise))。

注册操作 和 bind操作 都可能放到eventloop里面去异步执行, 但他们之间是有顺序的, 因为他们做为task是放到同一个eventloop的queue里去,是一个一个fifo的执行的,注册先进去先执行。
这样异步模型就简单了,不用考虑竞争,锁啊,变成串行执行。

 

 

 

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    netty_learn_netty_源码.zip

    通过阅读和分析这个“netty_learn_netty_源码.zip”中的源代码,你可以深入了解Netty如何实现这些功能,以及它是如何优化网络通信效率的。此外,你还可以学习到Netty如何处理异常、优雅地关闭连接、线程安全等问题,...

    netty源码 4.*版本

    通过阅读 Netty 源码,我们可以学习到以下知识点: - 理解 Java NIO 的工作原理,如何利用 Selector 监听多个 Channel 的事件。 - 掌握 ByteBuf 的内存管理策略,如何避免不必要的内存拷贝。 - 学习如何构建 ...

    netty4.1.15源码

    本压缩包包含的是Netty 4.1.15版本的源代码,对于理解Netty的工作原理、学习如何使用和优化Netty具有极高的价值。 Netty的核心概念包括: 1. **NIO(Non-blocking I/O)**: Netty基于Java NIO(非阻塞I/O)构建,...

    netty4 sources 源码

    这个“netty4 sources 源码”指的是Netty 4.x 版本的源代码,其中4.0.33.Final是特定的版本号。源码分析对于理解Netty的工作原理、优化性能以及定制化开发非常有帮助。 Netty 的核心特性包括: 1. **异步事件驱动*...

    netty-4.1源代码

    这个“netty-4.1源代码”压缩包包含了Netty框架的4.1版本的源码,对于理解Netty的工作原理、学习网络编程以及优化性能有极大的帮助。 Netty 的核心组件包括: 1. **ByteBuf**: Netty 自定义的字节缓冲区,比Java ...

    netty-4.1 源码包

    这个"Netty-4.1 源码包"包含了Netty框架的源代码,允许开发者深入理解其内部工作原理,优化自定义实现,或者排查问题。 在Netty 4.1 版本中,主要包含以下关键知识点: 1. **NIO (Non-blocking I/O)**: Netty 使用...

    netty4.1源码

    此外,通过阅读源码,还可以学习到更多关于 Java NIO、多线程和并发控制的高级技术。 Netty 4.1 的源码学习对于网络编程、分布式系统和高并发应用的开发者来说是一笔宝贵的财富。通过理解并实践其中的设计思想,...

    Netty4.x源码分析详解

    通过深入阅读 Netty 的源码,我们可以了解到网络编程的底层细节,学习到如何设计高性能、可扩展的网络框架,这对于优化分布式系统和微服务架构非常有帮助。同时,Netty 的设计模式和编程思想也对提升软件设计能力...

    Netty源码分析总结.rar

    以上仅是Netty源码分析的一些关键点,实际的学习中还需要结合具体代码和实际案例来深入理解。在分析源码的过程中,我们通常会关注类的设计模式、线程模型、内存管理以及性能优化等方面,这对于提升网络编程和系统...

    netty权威指南第二版源码

    6. **Future 和 Promise**:Netty 使用 Future 和 Promise 对象来处理异步操作的结果,它们允许你在操作完成时注册回调函数。 7. **Codec**:Netty 提供了一系列编解码器,用于将各种协议数据转换为 ChannelBuffer ...

    01.Netty源码剖析简介.rar

    **Netty 源码剖析简介** Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...对于从事高并发、低延迟网络应用开发的工程师来说,Netty源码的学习是不可或缺的。

    Netty 权威指南 源代码

    通过阅读"Netty 权威指南"的源代码,你可以深入理解Netty如何实现上述功能,以及如何优化网络通信的性能。源代码分析可以帮助你掌握Netty的设计模式,比如Actor模型、责任链模式等,并了解其在实际应用中的实现细节...

    Netty源码剖析与实战配套代码.zip

    这个"Netty源码剖析与实战配套代码.zip"文件包含了对Netty框架的深入源码分析和实战应用示例。下面将详细探讨Netty的核心概念和关键功能,以及它如何在实际项目中发挥作用。 1. **Netty概述** - Netty是由JBOSS...

    netty-4.1.4-jar包

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于...总的来说,“netty-4.1.4-jar包”提供了一个完整的Netty框架,结合源代码文件,使得开发者能够深入学习和应用Netty,构建出高效、可靠的网络应用程序。

    netty权威指南第二版源码(可运行)

    这个"Netty 权威指南第二版源码(可运行)"的资源包含的是该书籍的源代码实现,可以供学习者深度理解和实践Netty的相关技术。 Netty 的核心设计理念是基于Reactor模式,这种模式在处理高并发I/O操作时非常有效。它...

    netty 权威指南 源代码,第二版

    《Netty权威指南》第二版源代码是一份深入学习和理解Netty框架的重要参考资料。Netty是一个高性能、异步事件驱动的网络应用框架,适用于Java平台,主要用于快速开发可维护的高性能协议服务器和客户端。本资源包含的...

    Netty源码教程-3

    在本节"Netty源码教程-3"中,我们将深入探讨Netty这一高性能、异步事件驱动的网络应用程序框架。Netty广泛应用于各种分布式系统、服务器和客户端应用,尤其在处理高并发、低延迟的网络通信场景下,其优势尤为突出。...

    netty-source.zip

    Netty 是一个高性能、异步事件驱动...通过阅读和研究 "netty-source.zip" 中的源代码,你可以深入了解 Netty 如何实现这些功能,以及如何优化网络通信。同时,这也是一个深入学习 Java NIO、多线程和并发编程的好机会。

    netty网络源码

    在Netty源码中,我们可以学习到以下几个关键知识点: 1. **NIO(非阻塞I/O)**:Netty基于Java NIO(Non-blocking I/O)库构建,实现了低延迟、高吞吐量的网络通信。NIO的核心概念包括选择器(Selector)、通道...

Global site tag (gtag.js) - Google Analytics