`
jnullpointer
  • 浏览: 15891 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

Selector的wakeup的分析

阅读更多
Selector的wakeup的分析可以看http://www.iteye.com/topic/1113639

很久之前看过这篇文章,最近重新看,又有新的理解了。

selectNow的选择过程是非阻塞的,select(timeout)和select()的选择过程是阻塞的。
sun.nio.ch.PollSelectorImpl类中
    protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(totalChannels, 0, timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.getReventOps(0) != 0) {
            // Clear the wakeup pipe
            pollWrapper.putReventOps(0, 0);
            synchronized (interruptLock) {
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

需要注意两点
1.processDeregisterQueue方法
  在SelectionKey的cancel()的API说明中:
  The key will be removed from all of the selector's key sets during the next selection operation
  即在下一次执行selection操作时,才会将key删除掉。从上面的doSelect看到,执行poll调用之前会执行processDeregisterQueue进行删除操作。
2.Select的方法最终会调用内核poll函数,在PollArrayWrapper.c中
    ipoll(struct pollfd fds[], unsigned int nfds, int timeout) {
        jlong start, now;
        int remaining = timeout;
        struct timeval t;
        int diff;
        gettimeofday(&t, NULL);
        start = t.tv_sec * 1000 + t.tv_usec / 1000;
        for (;;) {
            int res = poll(fds, nfds, remaining);
            if (res < 0 && errno == EINTR) {
                if (remaining >= 0) {
                    gettimeofday(&t, NULL);
                    now = t.tv_sec * 1000 + t.tv_usec / 1000;
                    diff = now - start;
                    remaining -= diff;
 
                    if (diff < 0 || remaining <= 0) {
                       return 0;
                    }
                    start = now;
                }
            } else {
                return res;
            }
        }
    }

是不是可以认为,内核的poll函数应该是非阻塞的,实现阻塞和超时是PollArrayWrapper.c的for循环逻辑实现的。

比如selector线程正在阻塞中,对这个线程执行中断操作。植入的中断触发器执行选择器的wakeup操作。怎么唤醒呢?正常情况下,选择器监听的channel中,如果有感兴趣的事件发生,选择器便返回,那么也可以为wakeup构造这样一个场景。比如可以这样实现:在构造Selector时,私有的为Selector加入一个管道,正常情况下,这个管道既没有输入也没有输出,没有任何事件发生。但当selector阻塞时,并且被执行了中断,需要被唤醒,因为此时用户指定的channel是没有事件发生的。这个时候,私有的管道起作用了,可以往这个私有管道的写端输入一个字节,便触发了读感兴趣的事件,selector便被唤醒。

sun.nio.ch.PollSelectorImpl类中:
    PollSelectorImpl(SelectorProvider sp) {
        super(sp, 1, 1);
        int[] fdes = new int[2];
        IOUtil.initPipe(fdes, false);
        fd0 = fdes[0];
        fd1 = fdes[1];
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        pollWrapper.initInterrupt(fd0, fd1);
        channelArray = new SelectionKeyImpl[INIT_CAP];
    }

IOUtil.initPipe,采用系统调用pipe(int fd[2])来创建管道,fd[0]即为ready端,fd[1]即为write端。

sun.nio.ch.PollArrayWrapper类中,构造函数为:
    PollArrayWrapper(int newSize) {
        newSize = (newSize + 1) * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(newSize, false);
        pollArrayAddress = pollArray.address();
        totalChannels = 1;
    }

INIT_CAP默认值 = 10,newSize + 1表明其中增加的一个是为用于wakeup的pollfd结构体分配内存。所以totalChannels = 1。SIZE_POLLFD = 8个字节,实际上pollfd结构体的长度(int fd + short events + short revents)。在调用内核的poll函数时,需要传入的pollfd结构体。在注释中,说明了pollfd的结构体信息
typedef struct pollfd {
    int fd;         //file descriptor  文件描述符
     short events;   //event of interest on fd 对fd感兴趣的事件
     short revents;  //event that occurred on fd 内核返回fd的事件
} pollfd_t;
内存分配是通过unsafe.allocateMemory实现的,AllocatedNativeObject(newSize, false)中false参数为pageAligned,应该是用于内存对齐,但真心没弄懂this.address = a + ps - (a & (ps - 1))
unsafe.allocateMemory返回的是内存起始地址,后面对PollArrayWrapper的pollfd数组操作都是通过起始地址+偏移量来实现。比如:
    int getEventOps(int i) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        return pollArray.getShort(offset);
    }

计算第i个pollfd的events的偏移量,然后通过起始地址+偏移量来读取。实际上sun.nio.ch.AbstractPollArrayWrapper提供了操作pollfd的get和put方法。
SelectableChannel注册到Selector时,最后也是写入到PollArrayWrapper的pollfd数组,即调用PollArrayWrapper的addEntry方法:
    void addEntry(SelChImpl sc) {
        putDescriptor(totalChannels, IOUtil.fdVal(sc.getFD()));
        putEventOps(totalChannels, 0);
        putReventOps(totalChannels, 0);
        totalChannels++;
    }

可以看到是按照pollfd结构体来写入的。明白了这个概念,就明白了PollArrayWrapper的其他方法。
sun.nio.ch.PollArrayWrapper类中
    void initInterrupt(int fd0, int fd1) {
        interruptFD = fd1;
        putDescriptor(0, fd0);
        putEventOps(0, POLLIN);
        putReventOps(0, 0);
    }

1.这个私有的管道对POLLIN事件感兴趣。当执行wakeup时,往fd1写入一个字节,即可唤醒。在PollArrayWrapper.c中
Java_sun_nio_ch_PollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd){
    int fakebuf[1];
    fakebuf[0] = 1;
    if (write(fd, fakebuf, 1) < 0) {
         JNU_ThrowIOExceptionWithLastError(env, "Write to interrupt fd failed");
    }
}

2.put*方法最后都是通过unsafe类操作内存地址来构造;
sun.nio.ch.AbstractPollSelectorImpl
    protected int updateSelectedKeys() {
        int numKeysUpdated = 0;
        // Skip zeroth entry; it is for interrupts only
        for (int i=channelOffset; i<totalChannels; i++) {
            int rOps = pollWrapper.getReventOps(i);
            if (rOps != 0) {
                SelectionKeyImpl sk = channelArray[i];
                pollWrapper.putReventOps(i, 0);
                if (selectedKeys.contains(sk)) {
                    if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                        numKeysUpdated++;
                    }
                } else {
                    sk.channel.translateAndSetReadyOps(rOps, sk);
                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                        selectedKeys.add(sk);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

1.Skip zeroth entry; it is for interrupts only:第0个是用于wakeup的管道,这个管道不关心内核的poll返回事件,需要跳过       
2.获取到内核返回的事件后,会将revents清为0;
3.如果sk.nioReadyOps() & sk.nioInterestOps()) != 0,即将SelectionKey加入到SelectorImpl的selectedKeys中;
4.translateAndSetReadyOps将内核返回的revents转化为SelectionKey的ready operation ops
分享到:
评论

相关推荐

    Java-NIO之Selector.doc

    如果Selector正处于阻塞状态,`selector.wakeup()`方法可以用来中断阻塞并让其重新检查通道。这在需要立即处理新注册事件时非常有用。例如,当线程B注册了一个写事件,线程A需要被唤醒以便在下一次`select()`调用中...

    Java NIO 细节也精彩

    #### 一、Selector的Wakeup原理 ##### 1.1 背景介绍 在Java NIO (Non-blocking I/O)中,`Selector` 是核心组件之一,用于监控多个`Channel`上的I/O事件(如可读、可写等)。`Selector`支持三种主要的选择方法:`...

    Netty面试专题及答案.pdf

    - **SelectionKey**: 选路键表示 Channel 和 Selector 之间的注册关系,它记录了 Channel 可以被选中的事件类型,同时也提供了取消注册(`key.cancel()`)和唤醒(`selector.wakeup()`)的能力。 - **Pipe**: Pipe ...

    JAVA_Netty面试专题10道.pdf

    - **SelectionKey**:表示Channel和Selector之间的注册关系,记录了Channel的可读、可写等状态,并提供wakeup方法唤醒Selector。 - **Pipe**:在NIO中提供了单向数据管道,可以从source通道写入,从sink通道读出。...

    Java NIO 聊天室 JSwing

    //selector的wakeup方法被调用,方法返回,而对于客户端来说,通道一直是被选中的 selector.select(); // 获得selector中选中的项的迭代器 Iterator ite = this.selector.selectedKeys().iterator(); while...

    Netty 35道面试题和答案.docx

    Netty是一个高性能、异步事件驱动的网络应用框架,专为Java设计,用于构建高效...Selector的register方法用于向选择器注册通道并设置监听事件,SelectionKey代表了注册关系,wakeup方法用于唤醒被阻塞的Selector线程。

    Netty面试题汇总.pdf

    Selector 的关键操作包括 open、register 和 wakeup 等,其中 register 方法允许将 Channel 注册到 Selector,并指定关注的事件类型。 综上所述,Netty 作为一款强大的网络通信框架,因其高性能、易用性和高度可...

    scalable io in java

    5. **中断和唤醒(Interrupt and Wakeup)**:NIO支持中断IO操作,当操作被中断时,系统会立即停止等待并抛出异常,使得线程可以更快地响应其他事件。 6. **文件系统操作的增强**:NIO还提供了一些高级文件系统功能...

    Java多线程之中断线程(Interrupt)的使用详解

    在`Selector`中,中断会导致线程立即从选择操作返回,就像调用了`Selector.wakeup()`一样。 检测中断通常有两种方式。第一种是在可能会抛出`InterruptedException`的方法中捕获异常,这通常意味着线程应该终止。另...

    深入Java线程中断的本质与编程原则的概述

    对于基于Selector的异步I/O,可以调用`Selector.wakeup()`;对于内部锁,可以使用`Lock`接口提供的`tryLock()`或`unlock()`方法。 4. **恢复中断状态**:有时在处理`InterruptedException`后,为了保持中断状态,...

    TimePickerViewDemo.zip

    alarmManager.set(AlarmManager.RTC_WAKEUP, calendar.getTimeInMillis(), pendingIntent); ``` 2. 日程安排 在日程管理应用中,TimePickerView可以帮助用户设定活动开始和结束时间,然后将这些时间信息存储到...

Global site tag (gtag.js) - Google Analytics