`

一个C#和Java的多线程控制类

    博客分类:
  • .Net
阅读更多

该类用于运行一组线程,该类将阻塞直到该组线程全部执行完毕

 

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Runtime.CompilerServices;

namespace Pkysoft.Utility
{
    /// <summary>
    /// 该类用于运行一组线程,该类将阻塞直到该组线程全部执行完毕。
    /// 如果这组线程中有现成发生意外被中断或发生别的什么错误,该类会在所有现成执行完毕后抛一个MultiThreadManagerThread异常。 
    /// usage:
    /// <code>
    ///        List<ThreadStart> list = new List<ThreadStart>();
    ///        for (int i = 0; i < 1000; i++)
    ///        {
    ///            list.Add(delegate(){new TestThread().Start();});
    ///        }
    ///        MultiThreadManager mtm = new MultiThreadManager(list, "mythread", 0);
    ///         try
    ///         {
    ///             mtm.RunAndWait();
    ///         }
    ///         catch (MultiThreadManagerException ex)
    ///         {
    ///             Console.WriteLine("Error:" + ex);
    ///         }
    /// </code>
    /// </summary>

    public class MultiThreadManager
    {
        List<ThreadStart> Threads;
        string NamePrefix;
        public MultiThreadManager(List<ThreadStart> threads, string namePrefix, int timeout)
        {
            this.Threads = threads;
            this.NamePrefix = namePrefix;
            this.EachThreadTimeout = timeout;
        }
        int EachThreadTimeout = 0;
        public void RunAndWait()
        {
            List<MultiThreadManagerError> exceptions = new List<MultiThreadManagerError>();
            int size = this.Threads.Count;
            CountdownLatch cdl = new CountdownLatch(size);
            for (int i = 0; i < size; i++)
            {
                ThreadStart t = this.Threads[i];
                string threadName = this.NamePrefix + "-" + i;

                ThreadCompletedHandler threadCompeltedHandler = new ThreadCompletedHandler(delegate(string name) { Console.WriteLine("Thread " + name + " completed"); cdl.Signal(); });
                ThreadErrordHandler threadErrorHandler = new ThreadErrordHandler(delegate(string name, Exception ex) { exceptions.Add(new MultiThreadManagerError(name, ex)); Console.WriteLine("Thread " + name + " failed("); cdl.Signal(); });
                ThreadRunner tr = new ThreadRunner(t, threadName, this.EachThreadTimeout, threadCompeltedHandler, threadErrorHandler);
                tr.run();
            }
            cdl.Wait();
            if (exceptions.Count > 0)
            {
                throw new MultiThreadManagerException(exceptions);
            }
        }
    }
    public class MultiThreadManagerError
    {
        public MultiThreadManagerError(string name, Exception ex)
        {
            this.Name = name;
            this.Exception = ex;
        }
        public string Name;
        public Exception Exception;
    }
    public class CountdownLatch
    {
        private int remain;
        private EventWaitHandle evt;

        public CountdownLatch(int count)
        {
            remain = count;
            evt = new ManualResetEvent(false);
        }

        public void Signal()
        {
            // The last thread to signal also sets the event.
            if (Interlocked.Decrement(ref remain) == 0)
                evt.Set();
        }

        public void Wait()
        {
            evt.WaitOne();
        }
    }

    public delegate void ThreadCompletedHandler(string name);
    public delegate void ThreadErrordHandler(string name, Exception ex);
    public class ThreadRunnerTimeException : Exception
    {
        public String Name;
        public ThreadRunnerTimeException(string name)
            : base("Thread " + name + " timeout.")
        {
            this.Name = name;
        }
    }
    public class ThreadRunner
    {

        ThreadStart ThreadStart;
        string ThreadName;
        int Timeout;
        ThreadCompletedHandler CompletedHandler;
        ThreadErrordHandler ErrorHandler;

        public   ThreadRunner(ThreadStart ts, string name, int timeout, ThreadCompletedHandler completed, ThreadErrordHandler error)
        {
            this.ThreadStart = ts;
            this.ThreadName = name;
            this.Timeout = timeout;
            this.CompletedHandler = completed;
            this.ErrorHandler = error;
        }
        Exception ThreadException = null;
        public void run()
        {

            new Thread(delegate()
            {
                try
                {
                    Thread t = new Thread(new ThreadStart(StartThreadStart));
                    t.Name = this.ThreadName;
                    t.Start();

                    Console.WriteLine("Thread " + t.Name + " started");
                    bool jret = true;
                    if (this.Timeout > 0)
                        jret = t.Join(Timeout);
                    else
                        t.Join();


                    if (!jret && ThreadException == null) ThreadException = new ThreadRunnerTimeException(this.ThreadName);
                    if (ThreadException != null) throw ThreadException;
                    this.CompletedHandler(this.ThreadName);
                }
                catch (Exception ex)
                {
                    this.ErrorHandler(this.ThreadName, ex);
                }


            }).Start();
        }
        protected void StartThreadStart()
        {
            try
            {

                this.ThreadStart.Invoke();

            }
            catch (Exception ex)
            {
                this.ThreadException = ex;
            }
        }

    }
    public class MultiThreadManagerException : Exception
    {
        public List<MultiThreadManagerError> Errors;
        public MultiThreadManagerException(List<MultiThreadManagerError> errors)
            : base()
        {
            this.Errors = errors;
        }

    }
}
 

 

 

 

package pkysoft.common.thread;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 该类用于运行一组线程,该类将阻塞直到该组线程全部执行完毕。
 * This class can be used to wait on a group of Runnables until they are done executing.
 * The runnables do not have to implement a special interface, however they should wrap any exception they
 * want to throw in a RuntimeException. The real exception that is thrown there will be
 * thrown as a {@link MultiThreadManagerException} from the {@link #runAndWait()} method.
 * <p/>
 * Example:
 * <pre>
 * public class MyRunnable implements Runnable
 * {
 *    public void run()
 *    {
 *       try
 *       {
 *          // do your logic here
 *       }
 *       catch( MyLogicalException e )
 *       {
 *          throw new RuntimeException( e ); //wrap your exception in a RuntimeException
 *       }
 *    }
 * }
 * ...
 * // in your main class
 * try
 * {
 *    MultiThreadManager manager = new MultiThreadManager( runnables, "MyRunnables-" );
 *    manager.runAndWait();
 * }
 * catch( MultiThreadManagerException e )
 * {
 *    MyLogicalException ex = (MyLogicalException)e.getCause();
 * }
 * </pre>
 */
public class MultiThreadManager
{
    private static Log logger = LogFactory.getLog(MultiThreadManager.class);
    private final List<Runnable> _runnables;
    private final String _threadNamePrefix;
    private final int _timeout;
    private final int _interruptTimeout;

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for ever until all
     * runnables are done
     *
     * @param runnables
     * @param threadNamePrefix
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix)
    {
        this(runnables, threadNamePrefix, -1, -1);
    }

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
     * runnables are done
     *
     * @param runnables
     * @param threadNamePrefix
     * @param timeout          the timeout in seconds
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout)
    {
        this(runnables, threadNamePrefix, timeout, -1);
    }

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
     * runnables are done. When the current thread is interrupted, we will not wait more then <code>interruptTimeout</code>
     * for the runnables to stop.
     *
     * @param runnables
     * @param threadNamePrefix
     * @param timeout          the timeout in seconds
     * @param interruptTimeout
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout, int interruptTimeout)
    {
        _runnables = runnables;
        _threadNamePrefix = threadNamePrefix;
        _timeout = timeout;
        _interruptTimeout = interruptTimeout;
    }

    public void runAndWait() throws MultiThreadManagerException
    {
        CountDownLatch countDownLatch = new CountDownLatch(_runnables.size());
        List<Thread> threads = new ArrayList<Thread>(_runnables.size());
        ThreadExceptionHandler exceptionHandler = new ThreadExceptionHandler(threads);
        int counter = 1;
        for (Runnable runnable : _runnables)
        {
            Thread thread = new Thread(new CountDownLatchRunnable(countDownLatch, runnable), _threadNamePrefix + counter);
            thread.setUncaughtExceptionHandler(exceptionHandler);
            thread.start();
            counter++;
            threads.add(thread);
        }

        waitForAllThreadsToFinish(countDownLatch, threads);

        if (exceptionHandler.getException() != null)
        {
            //logger.debug("an exception occurred in one of the worker threads: " + exceptionHandler.getException());
            throw new MultiThreadManagerException(exceptionHandler.getException());
        }
        else
        {
            //logger.debug("no exception occurred in the worker threads");
        }
    }

    /**
     * @param countDownLatch
     * @param threads
     * @throws MultiThreadManagerException when the worker threads are still not finished after running for <code>timeout</code> seconds
     */
    private void waitForAllThreadsToFinish(CountDownLatch countDownLatch, List<Thread> threads) throws MultiThreadManagerException
    {
        try
        {
            if (_timeout == -1)
            {
                countDownLatch.await();
            }
            else
            {
                boolean success = countDownLatch.await(_timeout, TimeUnit.SECONDS);
                if (!success)
                {
                    interruptThreadsAndWait(threads, countDownLatch);
                    throw new MultiThreadManagerException("The worker threads are still not finished after waiting " + _timeout + " seconds!");
                }
            }

            // If an exception occurs, the handler is notified after the finally that lowers the counter
            // In that case, we need to wait for the treads to finish so the handler can receive the exception
            // that has been thrown.
            makeSureAllWorkerThreadsAreDead(threads);
        }
        catch (InterruptedException e)
        {
            //logger.error(e.getMessage(), e);
            interruptThreadsAndWait(threads, countDownLatch);
            throw new MultiThreadManagerException(e);
        }
    }

    private void makeSureAllWorkerThreadsAreDead(List<Thread> threads)
    {
        //logger.debug("makeSureAllWorkerThreadsAreDead() called...");
        while (stillThreadsAlive(threads))
        {
            try
            {
                for (Thread thread : threads)
                {
                    thread.join();
                }
            }
            catch (InterruptedException e)
            {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private boolean stillThreadsAlive(List<Thread> threads)
    {
        boolean result = false;
        for (Thread thread : threads)
        {
            if (thread.isAlive())
            {
                logger.debug("thread '" + thread.getName() + "' is still alive!");
                result = true;
                break;
            }
        }

        return result;
    }

    /**
     * Interrupt the threads and wait for them to finish
     *
     * @param threads
     * @param countDownLatch
     * @throws MultiThreadManagerException
     */
    private void interruptThreadsAndWait(List<Thread> threads, CountDownLatch countDownLatch)
            throws MultiThreadManagerException
    {
        interruptThreads(threads);
        try
        {
            // Wait for interrupted threads to stopAndReturn, so the unexecute will not interfere with the execute of the workers
            if (_interruptTimeout == -1)
            {
                countDownLatch.await();
            }
            else
            {
                boolean success = countDownLatch.await(_interruptTimeout, TimeUnit.SECONDS);
                if (!success)
                {
                    throw new MultiThreadManagerException("The worker threads did not stopAndReturn in " + _interruptTimeout + " seconds!");
                }
            }
        }
        catch (InterruptedException e)
        {
            logger.error(e.getMessage(), e);
            throw new MultiThreadManagerException(e);
        }
    }

    /**
     * Interrups all the threads in the list. Will not interrupt the current thread if it is part of the list
     *
     * @param threads
     */
    private void interruptThreads(List<Thread> threads)
    {
        for (Thread thread : threads)
        {
            if (thread != Thread.currentThread())
            {
                logger.debug("interrupting thread: " + thread);
                thread.interrupt();
            }
        }
    }

    /**
     * Handler for the exceptions that occur in the runnables
     */
    private class ThreadExceptionHandler implements Thread.UncaughtExceptionHandler
    {
        private List<Thread> _threads;
        private Throwable _throwable = null;

        public ThreadExceptionHandler(List<Thread> threads)
        {
            _threads = threads;
        }

        public synchronized void uncaughtException(Thread t, Throwable e)
        {
            //logger.error(e.getMessage(), e);
            if (_throwable == null)
            {
                logger.debug("uncaughtException: " + e);
                _throwable = e.getCause() != null ? e.getCause() : e;
                interruptThreads(_threads);
            }
            else
            {
                logger.debug("Current thread is already interrupted");
            }
        }

        public Throwable getException()
		{
			return _throwable;
		}
	}
}

 

 

分享到:
评论

相关推荐

    多线程面试题

    在Java编程领域,多线程是面试中常见且重要的知识点,尤其对于系统设计和高并发处理的岗位至关重要。本文将围绕“多线程面试题”这一主题,深入探讨相关概念、技术及其应用。 1. **线程的概念**:线程是程序执行的...

    多线程全面遍历磁盘文件

    本文将深入探讨如何利用多线程技术高效地遍历包括隐藏文件和系统文件在内的所有磁盘文件。 首先,我们要理解多线程的基本概念。在单线程环境下,程序执行是顺序的,而多线程则允许多个任务同时进行,提高了计算机...

    多线程控制红绿灯变化

    综上所述,"多线程控制红绿灯变化"是一个涵盖多线程同步、通信、状态管理和异常处理等多个关键点的编程挑战,对于理解和实践并发编程有很好的学习价值。通过解决这个问题,开发者能够提升自己在处理并发问题时的能力...

    C#和JAVA的相同点和不同点_八个文档详细比较

    5. **多线程**:Java 提供了Thread类和ExecutorService,C# 则有System.Threading命名空间,包含Task和ThreadPool。两者都支持异步编程,但C#的async/await更简洁易用。 6. **异常处理**:Java和C#都采用try-catch-...

    多线程控制progressbar

    在编程领域,尤其是在UI设计和用户体验优化中,`ProgressBar`是一个非常...这就是多线程控制`ProgressBar`的核心思想和实现方式。在cs1MultiProcess文件中,可能包含了具体的代码示例和相关资源,供进一步学习和参考。

    C#和 Java比较

    在多线程处理上,C#有`Task`和`Thread`等并发模型,而Java有`Thread`类和`ExecutorService`,两者都提供了同步和异步编程的能力。 总的来说,C#和Java都是强大的编程工具,它们在很多方面有相似之处,但在具体实现...

    java代码转c#

    4. **多线程**:Java有`Thread`类和`Runnable`接口,C#则有`Thread`类和`Task`类,以及`async/await`关键字,后者提供了更简洁的异步编程模型。 5. **集合框架**:Java的集合框架包括List、Set和Map,C#也有相似的`...

    C#多线程学习机制探索

    在C#编程中,多线程是一种重要的机制,它允许多个任务在同一时间并行执行,从而提高程序的效率和CPU的利用率。Windows操作系统是一个多任务环境,能够同时处理多个进程和线程。在C#中,线程是程序中的执行流,每个...

    C#的多线程机制探索

    在C#中,多线程功能主要是通过`System.Threading`命名空间提供的`Thread`类来实现的。以下是一个简单的示例,展示了如何在C#中创建和控制线程: ```csharp using System; using System.Threading; namespace ...

    单线程与多线程的区别

    在Java或C#这样的多线程环境下,可以使用如`Thread`类或`ThreadPool`来创建和管理线程,也可以利用`Runnable`接口或`ThreadStart`委托定义线程的执行逻辑。线程间的通信可以通过`Monitor`、`Mutex`、`Semaphore`等...

    多线程控制程序_并发_buriedutt_多线程_

    在计算机科学中,多线程控制是编程领域中的一个重要概念,尤其在当今的高性能计算环境中。标题"多线程控制程序_并发_buriedutt_多线程_"指出了我们探讨的主题,即如何管理和控制多线程以实现并发执行,从而提高程序...

    多线程实验_1

    在C#编程中,多线程技术是提升应用程序性能和响应能力的重要手段。在这个名为“多线程实验_1”的项目中,我们主要探讨了四种关键的多线程操作:AutoResetEvent、ManualResetEvent、Thread.Join()以及委托多线程回调...

    C#与java比较

    - **内部类**:Java 和 C# 都支持内部类的概念,即在一个类内部定义另一个类。在 Java 中,内部类可以直接访问外部类的静态和非静态成员,而 C# 中的内部类则需要明确地引用外部类的成员。 - **匿名类**:Java 支持...

    D8 D3 德卡C# java实例代码

    2. **Java编程基础**:涵盖类和对象、集合框架、IO流、多线程、网络编程、反射机制等Java特性和API。 3. **Visual Basic(VB)编程**:学习VB的基础语法、控件使用、事件驱动编程、窗体设计以及数据库访问。 4. **...

    java和c#网络编程

    在实际应用中,C#和Java都可以利用多线程或者异步处理来提高网络性能。Java的ExecutorService和Future接口,以及C#的Task类都是进行并发处理的重要工具。对于Web服务,Java有成熟的JAX-RS规范用于构建RESTful API,...

    异步多线程Demo

    在IT领域,多线程是程序设计中的一个重要概念,尤其在高性能和响应迅速的应用中,如服务器端编程、游戏开发和大规模数据处理等。多线程允许程序同时执行多个独立的任务,提高系统的并发性和资源利用率。这个“异步多...

    C#多线程探索---讲诉多线程

    例如,Web浏览器在下载Java小程序或图片的同时滚动页面,访问新页面时播放动画和声音,以及打印文件等操作,正是多线程能力的体现。这不仅提升了用户体验,也最大化了硬件资源的利用。 #### 多线程的潜在挑战 尽管...

    一些c# c++ java的笔试试题 希望对大家有用

    Java的关键知识点包括:垃圾回收机制、多线程、集合框架、IO流、网络编程、反射、泛型、设计模式等。笔试试题可能让你设计一个线程安全的数据结构,或者实现一个基于TCP/IP的简单网络通信程序。 这些笔试试题可以...

    C#多线程编程速成

    C#提供了强大的多线程支持,开发者可以通过`System.Threading`命名空间中的`Thread`类来创建和管理线程。以下是一些基本的操作示例: 1. **创建线程** 创建线程的基本步骤是定义线程体函数,并使用`Thread`类的...

Global site tag (gtag.js) - Google Analytics