`
youaremoon
  • 浏览: 32500 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

netty5笔记-总体流程分析5-客户端连接过程

阅读更多

前面几篇文章,我们从服务端的视角,分析了从启动到接收连接,到连接的read-write-close。现在我们开始切换到客户端的视角,看看客户端连接服务端的一些实现细节。

还是从snoop的example代码开始,见HttpSnoopClient(稍有修改):

 

    public static void main(String[] args) throws Exception {
        // 配制client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new HttpSnoopClientInitializer(null));

            // 创建连接.
            Channel ch = b.connect("127.0.0.1", 80).sync().channel();

            // 构造HTTP请求.
            HttpRequest request = new DefaultFullHttpRequest(
                    HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
            request.headers().set(HttpHeaderNames.HOST, host);
            request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
            request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);

            // 发送HTTP请求.
            ch.writeAndFlush(request);

            // 等待服务端关闭连接
            ch.closeFuture().sync();
        } finally {
            // 关闭group.
            group.shutdownGracefully();
        }
    }

有了前面ServerBootstrap的铺垫,这里就比较简单了,ServerBootstrap和Bootstrap都继承自AbstractBootstrap。但需要注意的是,在ServerBootstrap中group、channel、handler都是针对NioServerSocketChannel的设置,而切到Bootstrap后,group、channel、handler则是针对NioSocketChannel的设置了。相应的在ServerBootstrap中设置NioSocketChannel的属性和选项使用childAttr、childOption,而Bootstrap中设置NioSocketChannel则是直接使用attr、option。

 

下面直接进入主题,b.connect("127.0.0.1", 80)创建了一个客户端的链接,调用的方法:

 

    public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }

    /**
     * 创建一个连接.
     */
    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        // 主要验证必要的参数是否设置,如group何channelFactory(通过channel方法设置)
        validate();
        // 解析并连接
       return doResolveAndConnect(remoteAddress, localAddress());
    }

 private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
        // 初始化并注册到EventLoop(注册这块过程与server端类似,不再细讲)
 final ChannelFuture regFuture = initAndRegister();
 if (regFuture.cause() != null) {
 return regFuture;
 }

 final Channel channel = regFuture.channel();
 final EventLoop eventLoop = channel.eventLoop();
 final NameResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

 if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
 // Resolver has no idea about what to do with the specified remote address or it's resolved already.
 return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
 }
        // 对remoteAddress进行解析,如果地址为ip(127.0.0.1)则直接返回,如地址为域名(www.baidu.com)则需要解析为ip(180.97.33.107)
 final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
 final Throwable resolveFailureCause = resolveFuture.cause();

 if (resolveFailureCause != null) {
 // Failed to resolve immediately
 channel.close();
 return channel.newFailedFuture(resolveFailureCause);
 }

        // 在解析完成后调用doConnect(最终调用doConnect0)连接远程的端口
 if (resolveFuture.isDone()) {
 // Succeeded to resolve immediately; cached? (or did a blocking lookup)
 return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
 }

 // Wait until the name resolution is finished.
 final ChannelPromise connectPromise = channel.newPromise();
 resolveFuture.addListener(new FutureListener<SocketAddress>() {
 @Override
 public void operationComplete(Future<SocketAddress> future) throws Exception {
 if (future.cause() != null) {
 channel.close();
 connectPromise.setFailure(future.cause());
 } else {
 doConnect(future.getNow(), localAddress, regFuture, connectPromise);
 }
 }
 });

 return connectPromise;
 }
    
    // 初始化pipeline、各个option和attr
    void init(Channel channel) throws Exception {
 ChannelPipeline p = channel.pipeline();
 p.addLast(handler());

 final Map<ChannelOption<?>, Object> options = options();
 synchronized (options) {
 for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
 try {
 if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
 logger.warn("Unknown channel option: " + e);
 }
 } catch (Throwable t) {
 logger.warn("Failed to set a channel option: " + channel, t);
 }
 }
 }

 final Map<AttributeKey<?>, Object> attrs = attrs();
 synchronized (attrs) {
 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
 channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
 }
 }
 } 

    private static void doConnect0(
 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
 final ChannelPromise connectPromise) {

 // 这个方法在 channelRegistered()之前调用. 这样就给handlers机会在channelRegistered()方法中设置pipeline
 final Channel channel = connectPromise.channel();
 channel.eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 if (regFuture.isSuccess()) {
                    // 执行channel的connect方法
 if (localAddress == null) {
 channel.connect(remoteAddress, connectPromise);
 } else {
 channel.connect(remoteAddress, localAddress, connectPromise);
 }
 connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 } else {
 connectPromise.setFailure(regFuture.cause());
 }
 }
 });
 }

这个过程比较简单,1、创建一个NioSocketChannel实例,并用Bootstrap中的参数初始化该实例;2、将创建的channel注册到EventLoop中;3、创建一个解析host的任务(如果是ip,任务直接完成,如果是域名需要将域名解析为ip);4、解析完成后,在EventLoop中调用channel.connect,连接到远程端口。

channel.connect将connect任务交给pipeline去处理,最终调用到TailContext中的connect方法,该方法调用代码:unsafe.connect(...)。

 

        // AbstractNioUnsafe
        public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }

                boolean wasActive = isActive();
                // 创建连接,如果连接立即成功了则返回true, 否则会返回false
                if (doConnect(remoteAddress, localAddress)) {
                    // 完成连接过程
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // 如果设置了连接超时时间,则创建一个超时检测任务,如果超时未连接成功则关闭连接
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                    // 如果连接被取消,则关闭前面创建的超时检测任务并关闭连接
                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
 if (promise == null) {
 // Closed via cancellation and the promise has been notified already.
 return;
 }

 // 如果取消了连接则此处会返回false
 boolean promiseSet = promise.trySuccess();

 // 只要是连接确实打开了,则无论是否被取消channelActive都会触发
 if (!wasActive && isActive()) {
 pipeline().fireChannelActive();
 }

 // 如果用户取消了连接,则会调用close方法,此方法会触发channelInactive
 if (!promiseSet) {
 close(voidPromise());
 }
 }

    // doConnect来自NioSocketChannel
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
 if (localAddress != null) {
 javaChannel().socket().bind(localAddress);
 }

 boolean success = false;
 try {
            // 发起连接请求,由于是非阻塞模式,因此主要是两种情况:1、连接本地,可能会立即完成,此时返回true;2、其他情况,没有立即完成连接,返回false
 boolean connected = javaChannel().connect(remoteAddress);
 if (!connected) {
                // 注册OP_CONNECT事件,如果连接成功则会进入NioEventLoop中的processSelectedKey方法
 selectionKey().interestOps(SelectionKey.OP_CONNECT);
 }
 success = true;
 return connected;
 } finally {
 if (!success) {
 doClose();
 }
 }
 }

            // NioEventLoop中的processSelectedKey方法片段  
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
 // 移除OP_CONNECT否则会导致cpu 100%
 int ops = k.interestOps();
 ops &= ~SelectionKey.OP_CONNECT;
 k.interestOps(ops);
                // 此调用最终也会调用上面的fulfillConnectPromise,进而触发后面的channelActive
 unsafe.finishConnect();
 }

上面的连接过程也是比较简单:

1、发起连接请求。1.1如果立即成功则执行连接成功的后续处理,如channelActive方法的调用;1.2如果未立即成功,则将连接事件注册到Selector;

2、Selector检测到连接事件后触发unsafe.finishConnect,该方法最终也执行连接成功的后续处理(同1.1);

3、1.2的分支会创建一个超时检测任务,如果超过指定时间未连接成功,则直接关闭此次连接请求。

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics