浏览 2934 次
锁定老帖子 主题:netty源代码解析(2)——客户端流程
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2012-06-27
最后修改:2012-06-27
ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipleline = pipeline(); pipleline.addLast("encode", new ObjectEncoder(1048576 * 16)); pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null))); pipleline.addLast("handler", handler); return pipleline; } }); bootstrap.setOption("receiveBufferSize", 1048576 * 64); bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法 //tcp定期发送心跳包 比如IM里边定期探测对方是否下线 //只有tcp长连接下才有意义 // bootstrap.setOption("child.keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port)); Channel channel = future.awaitUninterruptibly().getChannel(); 客户端事件处理顺序如下: UpStream.ChannelState.OPEN(已经open) --> DownStream.ChannelState.BOUND(需要绑定) --> DownStream.ChannelState.CONNECTED(需要连接) --> UpStream.ChannelState.BOUND(已经绑定) --> UpStream.ChannelState.CONNECTED(连接成功) 在connect的时候做了如下处理 public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } ChannelPipeline pipeline; try { pipeline = getPipelineFactory().getPipeline(); } catch (Exception e) { throw new ChannelPipelineException("Failed to initialize a pipeline.", e); } // Set the options.先创建Channel Channel ch = getFactory().newChannel(pipeline); ch.getConfig().setOptions(getOptions()); // Bind. if (localAddress != null) { ch.bind(localAddress); } // Connect. 
再进行连接 return ch.connect(remoteAddress); } 首先要创建出Channel NioClientSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { super(null, factory, pipeline, sink, newSocket(), worker); fireChannelOpen(this); } 紧接着会fire一个ChannelOpen事件, if (channel.getParent() != null) { fireChildChannelStateChanged(channel.getParent(), channel); } channel.getPipeline().sendUpstream( new UpstreamChannelStateEvent( channel, ChannelState.OPEN, Boolean.TRUE)); 这样会触发Upstream的ChannelState.OPEN事件。 接下来要继续connect了 if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } ChannelFuture future = future(channel, true); channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent( channel, future, ChannelState.CONNECTED, remoteAddress)); return future; 这样就会触发Downstream的ChannelState.CONNECTED事件。 接下来就要由NioClientSocketPipelineSink来进行处理了 switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case INTEREST_OPS: channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; 下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的 所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断地去进行select private static final class RegisterTask implements Runnable { private final Boss boss; private final NioClientSocketChannel channel; RegisterTask(Boss boss, NioClientSocketChannel channel) { this.boss = boss; this.channel = channel; } public void run() { try { channel.socket.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); } int connectTimeout = 
channel.getConfig().getConnectTimeoutMillis(); if (connectTimeout > 0) { channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; } } } register方法 void register(NioClientSocketChannel channel) { Runnable registerTask = new RegisterTask(this, channel); Selector selector; synchronized (startStopLock) { if (!started) { // Open a selector if this worker didn't start yet. try { this.selector = selector = Selector.open(); } catch (Throwable t) { throw new ChannelException( "Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( this, "New I/O client boss #" + id + '-' + subId)); success = true; } finally { if (!success) { // Release the Selector if the execution fails. try { selector.close(); } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } this.selector = selector = null; // The method will return to the caller at this point. } } } else { // Use the existing selector if this worker has been started. selector = this.selector; } assert selector != null && selector.isOpen(); started = true; boolean offered = registerTaskQueue.offer(registerTask); assert offered; } RegisterTask放到Boss类持有的registerTaskQueue之后,Boss类会从boss executor线程池中取出一个线程不断地处理队列、选择准备就绪的键等。 然后run方法处理感兴趣的事件 public void run() { boolean shutdown = false; Selector selector = this.selector; long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); for (;;) { wakenUp.set(false); try { int selectedKeyCount = selector.select(500); ....... 
processRegisterTaskQueue(); if (selectedKeyCount > 0) { processSelectedKeys(selector.selectedKeys()); } 在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件 private void processSelectedKeys(Set<SelectionKey> selectedKeys) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); if (!k.isValid()) { close(k); continue; } if (k.isConnectable()) { connect(k); } } } 将连接上的Channel注册到worker中,交给worker去注册read和write private void connect(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { if (ch.socket.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } } catch (Throwable t) { ch.connectFuture.setFailure(t); fireExceptionCaught(ch, t); k.cancel(); // Some JDK implementations run into an infinite loop without this. ch.worker.close(ch, succeededFuture(ch)); } } 在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |