前面几篇文章,我们从服务端的视角,分析了从启动到接收连接,到连接的read-write-close。现在我们开始切换到客户端的视角,看看客户端连接服务端的一些实现细节。
还是从snoop的example代码开始,见HttpSnoopClient(稍有修改):
public static void main(String[] args) throws Exception { // 配制client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new HttpSnoopClientInitializer(null)); // 创建连接. Channel ch = b.connect("127.0.0.1", 80).sync().channel(); // 构造HTTP请求. HttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); request.headers().set(HttpHeaderNames.HOST, host); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); // 发送HTTP请求. ch.writeAndFlush(request); // 等待服务端关闭连接 ch.closeFuture().sync(); } finally { // 关闭group. group.shutdownGracefully(); } }
有了前面ServerBootstrap的铺垫,这里就比较简单了,ServerBootstrap和Bootstrap都继承自AbstractBootstrap。但需要注意的是,在ServerBootstrap中group、channel、handler都是针对NioServerSocketChannel的设置,而切到Bootstrap后,group、channel、handler则是针对NioSocketChannel的设置了。相应的在ServerBootstrap中设置NioSocketChannel的属性和选项使用childAttr、childOption,而Bootstrap中设置NioSocketChannel则是直接使用attr、option。
下面直接进入主题,b.connect("127.0.0.1", 80)创建了一个客户端的链接,调用的方法:
public ChannelFuture connect(String inetHost, int inetPort) { return connect(InetSocketAddress.createUnresolved(inetHost, inetPort)); } /** * 创建一个连接. */ public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } // 主要验证必要的参数是否设置,如group何channelFactory(通过channel方法设置) validate(); // 解析并连接 return doResolveAndConnect(remoteAddress, localAddress()); } private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) { // 初始化并注册到EventLoop(注册这块过程与server端类似,不再细讲) final ChannelFuture regFuture = initAndRegister(); if (regFuture.cause() != null) { return regFuture; } final Channel channel = regFuture.channel(); final EventLoop eventLoop = channel.eventLoop(); final NameResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise()); } // 对remoteAddress进行解析,如果地址为ip(127.0.0.1)则直接返回,如地址为域名(www.baidu.com)则需要解析为ip(180.97.33.107) final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); return channel.newFailedFuture(resolveFailureCause); } // 在解析完成后调用doConnect(最终调用doConnect0)连接远程的端口 if (resolveFuture.isDone()) { // Succeeded to resolve immediately; cached? (or did a blocking lookup) return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise()); } // Wait until the name resolution is finished. final ChannelPromise connectPromise = channel.newPromise(); resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override public void operationComplete(Future<SocketAddress> future) throws Exception { if (future.cause() != null) { channel.close(); connectPromise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, regFuture, connectPromise); } } }); return connectPromise; } // 初始化pipeline、各个option和attr void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { try { if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + channel, t); } } } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } } private static void doConnect0( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture, final ChannelPromise connectPromise) { // 这个方法在 channelRegistered()之前调用. 这样就给handlers机会在channelRegistered()方法中设置pipeline final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 执行channel的connect方法 if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { connectPromise.setFailure(regFuture.cause()); } } }); }
这个过程比较简单,1、创建一个NioSocketChannel实例,并用Bootstrap中的参数初始化该实例;2、将创建的channel注册到EventLoop中;3、创建一个解析host的任务(如果是ip,任务直接完成,如果是域名需要将域名解析为ip);4、解析完成后,在EventLoop中调用channel.connect,连接到远程端口。
channel.connect将connect任务交给pipeline去处理,最终调用到TailContext中的connect方法,该方法调用代码:unsafe.connect(...)。
// AbstractNioUnsafe public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); // 创建连接,如果连接立即成功了则返回true, 否则会返回false if (doConnect(remoteAddress, localAddress)) { // 完成连接过程 fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // 如果设置了连接超时时间,则创建一个超时检测任务,如果超时未连接成功则关闭连接 int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } // 如果连接被取消,则关闭前面创建的超时检测任务并关闭连接 promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } } private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // 如果取消了连接则此处会返回false boolean promiseSet = promise.trySuccess(); // 只要是连接确实打开了,则无论是否被取消channelActive都会触发 if (!wasActive && isActive()) { pipeline().fireChannelActive(); } // 如果用户取消了连接,则会调用close方法,此方法会触发channelInactive if (!promiseSet) { close(voidPromise()); } } // doConnect来自NioSocketChannel protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { // 发起连接请求,由于是非阻塞模式,因此主要是两种情况:1、连接本地,可能会立即完成,此时返回true;2、其他情况,没有立即完成连接,返回false boolean connected = javaChannel().connect(remoteAddress); if (!connected) { // 注册OP_CONNECT事件,如果连接成功则会进入NioEventLoop中的processSelectedKey方法 selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } } // NioEventLoop中的processSelectedKey方法片段 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 移除OP_CONNECT否则会导致cpu 100% int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 此调用最终也会调用上面的fulfillConnectPromise,进而触发后面的channelActive unsafe.finishConnect(); }
上面的连接过程也是比较简单:
1、发起连接请求。1.1如果立即成功则执行连接成功的后续处理,如channelActive方法的调用;1.2如果未立即成功,则将连接事件注册到Selector;
2、Selector检测到连接事件后触发unsafe.finishConnect,该方法最终也执行连接成功的后续处理(同1.1);
3、1.2的分支会创建一个超时检测任务,如果超过指定时间未连接成功,则直接关闭此次连接请求。
相关推荐
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-codec-mqtt-4.1.73.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-transport-rxtx-4.1.74.Final.jar; 赠送原API文档:netty-transport-rxtx-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-rxtx-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
赠送jar包:netty-transport-classes-epoll-4.1.74.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.74.Final-sources.jar;...
赠送jar包:netty-codec-redis-4.1.73.Final.jar; 赠送原API文档:netty-codec-redis-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-redis-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-codec-http2-4.1.74.Final.jar; 赠送原API文档:netty-codec-http2-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-codec-haproxy-4.1.73.Final.jar; 赠送原API文档:netty-codec-haproxy-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-haproxy-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
Netty (netty-netty-4.1.77.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-http2-4.1.73.Final.jar; 赠送原API文档:netty-codec-http2-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-http2-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-codec-http-4.1.27.Final.jar; 赠送原API文档:netty-codec-http-4.1.27.Final-javadoc.jar; 赠送源代码:netty-codec-http-4.1.27.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
基于Netty实现的MQTT客户端_netty-mqtt-client