`

重写CountDownLatch以实现线程状态监视

阅读更多

需求:管理线程需要及时知道工作线程全部处于等待状态,并满足等待条件让他们恢复运行。

 

思想:有一种方法是“心跳方法”,让工作线程定时向管理线程“报到”。而这里我想通过并发的状态计数来实现状态监视。jdk提供有两种并发的状态计数:java.util.concurrent.CountDownLatch 和 java.util.concurrent.CyclicBarrier 。CountDownLatch 初始化后,工作线程调用countDown方法,计数减为0以前,调用await方法的其他线程会一直阻塞。它适用于一个或几个其他线程等待一组线程都经过某点的情形,但有一个局限,即它是一次性的能重置。CyclicBarrier 可以重置,但它只适用于一组线程分别到达某点后互相等待以实现步调一致的情形。(见JDK 1.6)

    于是我考虑修改CountDownLatch 类,使其支持重置,以胜任连续监视的需求。

 

代码如下:

修改后的新计数器类:ResetableCountDownLatch

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

public class ResetableCountDownLatch {
    /**
     * Synchronization control For ResetableCountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setCount(count);
        }
        
        void setCount(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code ResetableCountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResetableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    // 增加了这个方法
    public void setCount(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync.setCount(count);
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

 

测试用例:

 

    public static void main(String[] args)
    {
        // 初始化计数器,本例有3个工作线程
        ResetableCountDownLatch latch = new ResetableCountDownLatch(3);
        ThreadGroup tg = new ThreadGroup("");

        class RunBody implements Runnable {
            ResetableCountDownLatch latch;
            
            RunBody(ResetableCountDownLatch latch) {
                this.latch = latch;
            }

            @Override
            public void run()
            {
                System.out.println(Thread.currentThread().getName() + " start.");
                
                for(int i=0; i<Thread.currentThread().getId(); ++i) {
                    try
                    {
                        Thread.sleep(1000);

                        // 在wait前递减计数器
                        synchronized(this) {
                            System.out.println(Thread.currentThread().getName() + " wait " + (i+1) + "time(s)");
                            latch.countDown();
                            this.wait();
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    
                    System.out.println(Thread.currentThread().getName() + " continue.");
                }

                // 线程结束前也要countDown “告知”监视线程
                System.out.println(Thread.currentThread().getName() + " finish.");
                latch.countDown();
            }
            
        }
        
        RunBody threadBody = new RunBody(latch);
        
        for(int i=0; i<3; ++i) {
            new Thread(tg, threadBody).start();
        }

        while(true) {
            try
            {
                latch.await();
                // 需要判断工作线程是否全结束了
                if(0==tg.activeCount()) {
                    break;
                }

                System.out.println("Main: there are " + tg.activeCount() + " live threads all waiting");
                synchronized(threadBody) {
                    // 重置计数器,注意:是存活的工作线程数
                    latch.setCount(tg.activeCount());
                    System.out.println("Main: wake them up.");
                    threadBody.notifyAll();
                }
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        System.out.println("Main: All threads finished.");
    }

 

测试输出:

Thread-1 start.
Thread-2 start.
Thread-0 start.
Thread-2 wait 1time(s)
Thread-0 wait 1time(s)
Thread-1 wait 1time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-2 wait 2time(s)
Thread-0 wait 2time(s)
Thread-1 wait 2time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-0 continue.
Thread-2 continue.
Thread-2 wait 3time(s)
Thread-1 wait 3time(s)
Thread-0 wait 3time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-2 wait 4time(s)
Thread-0 wait 4time(s)
Thread-1 wait 4time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-1 wait 5time(s)
Thread-2 wait 5time(s)
Thread-0 wait 5time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-2 continue.
Thread-1 continue.
Thread-0 wait 6time(s)
Thread-1 wait 6time(s)
Thread-2 wait 6time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-0 continue.
Thread-2 wait 7time(s)
Thread-1 wait 7time(s)
Thread-0 wait 7time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-0 wait 8time(s)
Thread-1 wait 8time(s)
Thread-2 wait 8time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-0 finish.
Thread-2 wait 9time(s)
Thread-1 wait 9time(s)
Main: there are 2 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-1 finish.
Thread-2 wait 10time(s)
Main: there are 1 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-2 finish.
Main: All threads finished.

 

0
2
分享到:
评论
2 楼 nudtgk2000 2012-11-05  
flysnail 写道
思路挺好,

谢谢鼓励,还是初学者,不懂的太多。:)
1 楼 flysnail 2012-11-05  
思路挺好,:)

相关推荐

    多线程的使用与线程锁的两种实现

    Java提供了一些线程通信的工具,如wait(), notify()和notifyAll(),它们都与对象的监视器(monitor)相关联。但在实际使用中,由于它们容易导致死锁和不易管理,现在更推荐使用java.util.concurrent包中的高级并发...

    JAVA多线程模式高清版+DEMO

    - `CountDownLatch`:用于多线程同步,计数器减至零后所有线程继续执行。 - `CyclicBarrier`:多线程到达屏障后一起继续,可重用。 - `Semaphore`:信号量,用于限制并发访问的线程数量。 7. **线程设计模式**:...

    Java多线程编程

    创建一个新的线程通常有两种方式:一是继承`Thread`类并重写`run()`方法,二是实现`Runnable`接口并提供`run()`方法,然后将实现类的实例传给`Thread`类的构造函数。通过调用`start()`方法启动线程,它会自动调用`...

    thread线程

    线程是计算机编程中的一个核心概念,特别是在多任务和并发执行的环境中。在操作系统中,进程是资源分配的基本单位,而线程...在实际开发中,应根据具体需求选择合适的线程模型和同步机制,以实现高效、稳定的并发执行。

    JAVA多线程总结

    5. **障碍器**:`CyclicBarrier`和`CountDownLatch`等工具,用于线程间的同步和协调。 Java多线程涉及的内容广泛,理解并熟练掌握这些知识对于编写高效、安全的并发程序至关重要。在实际开发中,应根据具体需求选择...

    java常见面试题---线程篇

    - 实现`Runnable`接口:创建一个类实现`Runnable`接口,并重写`run()`方法,然后将实例传递给`Thread`类的构造函数。 - 继承`Thread`类:创建一个类继承自`Thread`类,并覆盖`run()`方法。 2. **线程状态**: - ...

    java线程案例

    - 阻塞(Blocked):线程正在等待监视器锁。 - 等待(Waiting):线程已经释放了所有锁,正在等待其他线程执行特定操作。 - 超时等待(Timed Waiting):线程在等待一段时间后会自动唤醒。 - 结束(Terminated):...

    Java企业系列面试题(线程篇).docx

    监视器(Monitor)是Java中实现线程同步的关键机制。每个对象都有一个与之关联的锁,线程通过`synchronized`关键字请求获取锁来进入监视器。如果锁已被其他线程持有,请求线程将被阻塞,直到锁释放。这样保证了同一...

    java线程.part002

    Java线程是Java编程中的一个核心概念,它允许程序同时执行多个任务,从而实现多任务并行处理。在Java中,线程是程序执行的最小单位,由JVM(Java虚拟机)进行调度。理解Java线程对于开发高效、响应迅速的并发应用...

    java多线程.rar

    - `CountDownLatch`:计数器,允许一个或多个线程等待其他线程完成操作。 4. **线程池**: - `ExecutorService`:线程池接口,用于管理和控制线程的生命周期。 - `ThreadPoolExecutor`:最常见的线程池实现,...

    Java应用开发多线程

    - 实现`PropertyChangeListener`,监听后台任务进度或状态变化,以实时更新UI。 总的来说,学习Java多线程不仅涉及理论知识,还需要实践操作,例如通过`JFrameThread.java`这样的示例代码来理解如何在实际项目中...

    java多线程学习笔记

    Java并发API提供了丰富的工具类,如Semaphore(信号量)用于限制同时访问特定资源的线程数量,CyclicBarrier(循环栅栏)和CountDownLatch(计数器门闩)用于协调多线程间的同步,ThreadPoolExecutor(线程池)则...

    java多线程和锁,用于学习参考

    创建新线程主要有两种方式:一是继承`Thread`类并重写`run()`方法,二是实现`Runnable`接口并提供`run()`方法实现,然后通过`Thread`对象启动。此外,Java 5.0引入了`ExecutorService`和`Future`,提供了更灵活的...

    java多线程代码笔记

    如果你选择继承`Thread`类,你需要重写`run()`方法,并创建实例来启动线程: ```java public class MyThread extends Thread { @Override public void run() { // 这里编写线程要执行的代码 } } // 创建并启动...

    【Java核心知识面试】-15个顶级Java多线程面试题答案.zip

    2. **线程的状态**: - 新建(New):线程被创建但尚未启动。 - 可运行(Runnable):线程已经启动,等待CPU分配资源执行。 - 阻塞(Blocked):线程在等待监视器锁。 - 等待(Waiting):线程在等待其他线程...

    Java线程编程

    2. **线程状态与生命周期**: - 新建(New):线程被创建但尚未启动。 - 运行(Runnable):线程被启动,等待JVM分配CPU时间片。 - 阻塞(Blocked):线程正在等待监视器锁。 - 等待(Waiting):线程等待其他...

    vip-v2-concurrent.zip

    继承`Thread`类直接重写`run()`方法,而实现`Runnable`接口则需要创建`Thread`对象并传入`Runnable`实例,然后调用`start()`方法启动线程。 3. **线程状态** 线程有五种基本状态:新建、就绪、运行、阻塞和死亡。...

    java面试精选必备题集

    + 封装:将对象的状态和行为结合在一起,隐藏内部实现 + 多态:对象可以表现出多种形态 * final, finally, finalize的区别 + final:用于修饰符、变量、方法,表示不可改变或重写 + finally:用于异常处理,执行...

    java面试题2024资源下载

    - 在线程体内部检查中断状态,如Thread.interrupted()或Thread.currentThread().isInterrupted()。 - **使用volatile变量**: - 在外部维护一个volatile类型的标记变量。 - 线程内部循环检查该变量是否为true,...

Global site tag (gtag.js) - Google Analytics