用nettys收发网络数据的时候,一般不会注册SelectionKey.OP_WRITE事件。但是,如果在netty写数据的时候AbstractNioWorker.write0,发现写不进去了buf.finished()==false,可能是操作系统内核缓冲区满了,这个时候会把SelectionKey关联的Channel注册上OP_WRITE事件。
protected void write0(AbstractNioChannel<?> channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; boolean iothread = isIoThread(channel); long writtenBytes = 0; final SocketSendBufferPool sendBufferPool = this.sendBufferPool; final WritableByteChannel ch = channel.channel; final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { channel.inWriteNowLoop = true; for (;;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf = null; ChannelFuture future = null; try { …… if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; // Mark the event object for garbage collection. //noinspection UnusedAssignment evt = null; buf = null; future.setSuccess(); } else {//写不进去啦,内核缓冲区可能满了 // Not written fully - perhaps the kernel buffer is full. addOpWrite = true;//这个参数在后边会导致注册OP_WRITE事件 channel.writeSuspended = true;//这个参数会影响到后续的写入操作 if (localWrittenBytes > 0) { // Notify progress listeners if necessary. future.setProgress( localWrittenBytes, buf.writtenBytes(), buf.totalBytes()); } break; } } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (Throwable t) { …… } } channel.inWriteNowLoop = false; // Initially, the following block was executed after releasing // the writeLock, but there was a race condition, and it has to be // executed before releasing the writeLock: // // https://issues.jboss.org/browse/NETTY-410 // if (open) { if (addOpWrite) { setOpWrite(channel); } else if (removeOpWrite) { clearOpWrite(channel); } } } …… }
同时,AbstractNioWorker.write0的时候,会把被写的Channel.writeSuspended设置为true,这个参数在Channels.write -> DefaultChannelPipeline.sendDownstream -> NioClientSocketPipelineSink.eventSunk -> AbstractNioWorker.writeFromUserCode时会控制后续写入操作。
public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { ChannelStateEvent event = (ChannelStateEvent) e; NioClientSocketChannel channel = (NioClientSocketChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); switch (state) { case OPEN: …… case INTEREST_OPS: channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBufferQueue.offer(event); assert offered; channel.worker.writeFromUserCode(channel);//写入前的判断逻辑 } } void writeFromUserCode(final AbstractNioChannel<?> channel) { …… if (channel.writeSuspended) {//如果这里为true,永远也不会调用真正的写入操作了 return; } …… write0(channel); }
但是上边提到,在写入不了的时候,注册了OP_WRITE事件,一旦内核缓冲有空间了,在selector.selectKeys()的时候,就会抓到这个注册了OP_WRITE事件的selectionKey(问题是,哪个线程会做这样的事情?channel和worker是绑定的?一个worker关联一个selector?)NioWorker.run -> AbstractNioWorker.run
public void run() { thread = Thread.currentThread(); int selectReturnsImmediately = 0; Selector selector = this.selector; if (selector == null) { return; } // use 80% of the timeout for measure final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; boolean wakenupFromLoop = false; for (;;) { wakenUp.set(false); try { long beforeSelect = System.nanoTime(); int selected = select(selector);//select准备就绪的key if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { long timeBlocked = System.nanoTime() - beforeSelect; if (timeBlocked < minSelectTimeout) { boolean notConnected = false; // loop over all keys as the selector may was unblocked because of a closed channel for (SelectionKey key: selector.keys()) { SelectableChannel ch = key.channel(); try { if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() || ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { notConnected = true; // cancel the key just to be on the safe side key.cancel(); } } catch (CancelledKeyException e) { // ignore } } if (notConnected) { selectReturnsImmediately = 0; } else { // returned before the minSelectTimeout elapsed with nothing select. // this may be the cause of the jdk epoll(..) bug, so increment the counter // which we use later to see if its really the jdk bug. selectReturnsImmediately ++; } } else { selectReturnsImmediately = 0; } …… } else { // reset counter selectReturnsImmediately = 0; } …… if (wakenUp.get()) { wakenupFromLoop = true; selector.wakeup(); } else { wakenupFromLoop = false; } cancelledKeys = 0; processTaskQueue();//处理task,真正的逻辑是在这里处理的? selector = this.selector; // processTaskQueue() can call rebuildSelector() if (shutdown) { …… } else { process(selector);//处理就绪的key } } catch (Throwable t) { …… } } } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); try { int readyOps = k.readyOps(); //读事件 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k);//注册了OP_WRITE事件!现在内核告知可以继续写啦! } } catch (CancelledKeyException e) { close(k); } …… } } void writeFromSelectorLoop(final SelectionKey k) { AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment(); ch.writeSuspended = false;//写不再挂起,调用Channel.write走到writeFromUserCode()的时候,也可以顺利的走到write0了 write0(ch); }
相关推荐
Netty 3.6.2.Final 稳定版本 含源码
赠送jar包:netty-3.6.2.Final.jar; 赠送原API文档:netty-3.6.2.Final-javadoc.jar; 赠送源代码:netty-3.6.2.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.6.2.Final.pom; 包含翻译后的API文档:netty-...
java运行依赖jar包
赠送jar包:netty-3.6.2.Final.jar; 赠送原API文档:netty-3.6.2.Final-javadoc.jar; 赠送源代码:netty-3.6.2.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.6.2.Final.pom; 包含翻译后的API文档:netty-...
8. **零拷贝**:Netty通过直接将数据从网络缓冲区传递到用户缓冲区,或者反之,实现了零拷贝,减少了不必要的内存复制,提高了效率。 9. **心跳与空闲检测**:Netty提供心跳机制和空闲检测,以保持连接的活跃状态并...
5. **零拷贝**:Netty实现了操作系统层面的零拷贝,通过直接缓冲区和FileChannel的transferTo方法,减少了数据在用户空间和内核空间之间传输的次数,提高了性能。 6. **线程模型**:Netty的EventLoopGroup负责调度...
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...
如果接收缓冲区中没有数据,那么应用程序就会在系统调用上阻塞,直到Socket接收缓冲区有数据,然后CPU将内核空间(Socket接收缓冲区)的数据拷贝到用户空间,最后系统调用read返回,应用程序读取数据。
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见...
3. **灵活的管道(Pipeline)机制**:网络数据在Netty中通过一系列处理器(ChannelHandler)进行处理,形成了数据处理的管道,开发者可以根据需求自定义处理器,实现复杂的数据处理逻辑。 4. **丰富的协议支持**:...
Netty (netty-netty-4.1.77.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
《基于Netty的Java数据采集软件详解》 在IT领域,高效、稳定的数据采集系统是许多业务场景中的关键环节。本文将深入探讨一个基于Netty的Java数据采集软件,它利用Netty强大的网络通信框架,实现了对大规模分布式...
1. **ByteBuf**: Netty自定义的字节缓冲区,比Java的ByteBuffer更加强大和高效,支持读写分离,便于数据操作和内存管理。 2. **Channel**: 表示网络连接,可以是TCP连接、UDP连接或者本地套接字等。每个Channel对应...