`
noble510520
  • 浏览: 56912 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

模拟Executor策略的实现

 
阅读更多

Executor作为现在线程的一个管理工具,就像管理线程的管理器一样,不用像以前一样,通过start来开启线程
Executor将提交线程执行线程分离开来,使得用户只需要提交线程,并不需要在乎怎么和什么时候开启线程

需要有以下功能:
1.查看现在开启了哪些进程
2.查看还有哪些进程未执行
3.查看现在开启线程的数量
4.查看还有多少线程未开启
5.设置执行顺序(先提交先执行,先提交后执行)
6.限制最大同时开启线程的个数
7.目前提交的线程执行完之后,关闭管理器(此过程中不允许再提交线程)
8.立即关闭管理器(正在执行的线程也立即停止)


实现原理

实现原理

Executor管理器将提交上来的线程放入线程等待区(一个LinkedList),当线程执行区中有空位时,控制线程1就会将线程等待区中的线程移除转移到线程执行区(一个LinkedList)。接着,控制线程2就会开启线程执行区中未开启的线程(start)。等到线程执行区中的线程跑完了,控制线程3就会把它从线程执行区移除出去


代码实现

import java.util.*;
import java.util.concurrent.*;
public class MyExecutor{
    //静态常量用于决定执行顺序
    public static final String FIFO="FIFO";//先进先出
    public static final String LIFO="LIFO";//后进先出
    //public static final int PRIORITY=2;//优先级
    //建立线程池
    private LinkedList<Thread> waitinglist;
    private String order;//储存指定顺序
    private int maxThreadRun;//储存最大并发运行线程数量
    //建立执行队列
    private LinkedList<Thread> runningList;
    //建立三个线程来控制Executor的运行
    private Thread checkThread;
    private Thread readyThread;
    private Thread runThread;
    //建立一个标记用来控制管理器的开关
    private boolean isShutdown=false;//关闭为true,开启为false
    private boolean isShutdownNow=false;//关闭为true,开启为false




    //构造函数
    public MyExecutor()throws IllegalArgumentException{
        this(MyExecutor.FIFO,10);//默认执行顺序为先进先出,默认最大并行线程数量为10
        }

    public MyExecutor(String order)throws IllegalArgumentException{
        this(order,10);//默认最大并行线程数量为10
        }

    public MyExecutor (String order,int maxThreadRun) throws IllegalArgumentException{
        if((order.equals(FIFO)||order.equals(LIFO))&&maxThreadRun>=1){
            this.order=order;
            waitinglist=new LinkedList<Thread>();
            this.maxThreadRun=maxThreadRun;
            runningList=new LinkedList<Thread>();
            checkThread=new CheckThread(this);
            readyThread=new ReadyThread(this);
            runThread=new RunThread(this);
            checkThread.start();
            readyThread.start();
            runThread.start();
            }
        else{
            throw new IllegalArgumentException("参数order只能为'FIFO'或'LIFO'");
            }
        }



        //提交任务
        public void execute(Thread task)throws RejectedExecutionException{
            if(state==true){
                waitinglist.offer(task);
                //count++;
                }
            else{
                throw new RejectedExecutionException("管理器已经关闭,不能再提交任务");
                }
        }
        public void execute(Runnable command)throws IllegalArgumentException,RejectedExecutionException{
            this.execute(new Thread(command));
            }


        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){
                if(order.equals(FIFO)){
                        if(waitinglist.peekFirst()!=null)
                            runningList.offer(waitinglist.pollFirst());
                }
                else{
                        if(waitinglist.peekLast()!=null)
                        runningList.offer(waitinglist.pollLast());
                }
                }
            }


        //将执行队列中的线程start
        void go(){
            try{
            for(Thread thread:runningList){
                if(thread.getState()==Thread.State.NEW){
                    thread.start();
                    }
                }
                }
                catch(Exception e){
                    //e.printStackTrace();
                    }
            }

        //检测已经结束的线程
        void isTerminated(){
            try{
            for(Thread thread:runningList){
                if(thread.getState()==Thread.State.TERMINATED){
                    runningList.remove(thread);

                    }
                }
                }
                catch(Exception e){
                    //e.printStackTrace();
                    }
            }



    //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
    public void shutdown(){
        isShutdown=true;
        }


    //试图停止所有正在执行的活动任务
    public void shutdownNow(){
        isShutdownNum=true;
        for(Thread thread:waitinglist){
            thread.interrupt();
            }
        for(Thread thread:runningList){
            thread.interrupt();
            }
        }

    //获取管理器状态
    public boolean isShutdown(){
        return isShutdown;
        }
    public boolean isShutdownNow(){
        return isShutdownNow;
        }


    //获取正在等待的线程名字
    public String getWaitingThreadName(){
        return waitinglist.toString();
        }

    //查看现在在执行的线程的名字
    public String getRunningThreadName(){
        return runningList.toString();
        }

    //查看还有多少个线程在等待
    public int getWaitingThreadNum(){
        return waitinglist.size();
        }


    //获取正在执行的线程数量
    public int getRunningThreadNum(){
        return runningList.size();
        }

    //查看管理器中的线程空了没有
    public boolean isEmpty(){
        return (getWaitingThreadNum()==0&&getRunningThreadNum()==0)?true:false;
        }
    }
//三个控制线程的代码
//建立一个线程用来检测runningList中的线程是否已经结束了
class CheckThread extends Thread{
    private MyExecutor executor;
    CheckThread(MyExecutor executor){
        this.executor=executor;
        setName("CheckThread");
        this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.isTerminated();
            Thread.yield();
            }
        }
    }

//建立一个线程用来将提交的线程送进执行队列
class ReadyThread extends Thread{
    private MyExecutor executor;
    ReadyThread(MyExecutor executor){
            this.executor=executor;
            setName("ReadyThread");
            this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.ready();
            Thread.yield();
            }
        }
    }

//建立一个线程用来将执行队列中的线程开启
class RunThread extends Thread{
    private MyExecutor executor;
    RunThread(MyExecutor executor){
            this.executor=executor;
            setName("RunThread");
            this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.go();
            Thread.yield();
            }
        }
    }

几个需要解释的地方

如何控制执行顺序?

首先执行顺序在初始化的时候就需要确定,然后设置一个变量order把这个顺序储存起来
下面看看实现的代码

        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){
                if(order.equals(FIFO)){
                        if(waitinglist.peekFirst()!=null)
                            runningList.offer(waitinglist.pollFirst());//如果是先进先出,取出等待区中第一个线程
                }
                else{
                        if(waitinglist.peekLast()!=null)
                        runningList.offer(waitinglist.pollLast());//如果是后进先出,取出等待区中最后一个线程
                }
                }
            }

从代码上看,执行顺序实际上是在,将线程从等待区中取出到执行区的过程中控制的
先判断order,然后使用不同的poll方法(pollFirst或者是pollLast)

怎么限制最大同时开启线程的个数?

最大同时开启线程的个数也是在实例化管理器对象的时候就需要确定的(否则,默认的最大同时开启线程的个数为10个)
然后,将设置的值储存在变量maxThreadRun中
下面看看代码怎么实现

        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){//当执行区的大小小于最大可同时运行线程的数量时,才能放的进

从代码上看出,实际上也是将线程从等待区中取出到执行区的过程中控制的

为什么要有一个线程来将结束的线程移除出执行区?

因为!!!当执行区中的线程跑完了之后,这个线程对象仍然是在执行区中存在的,所以如果不把结束的线程移除出去,那么提交任务几毫秒后,执行区就会爆满了,不清理的话,等待区的线程也进不来


几个需要注意的地方

转移线程的时候要判断线程是否为空

代码位置:将线程从等待区中取出到执行区中的过程

//FIFO的情况
                        if(waitinglist.peekFirst()!=null)//等待区第一个位置的线程不能为空
                            runningList.offer(waitinglist.pollFirst());//如果是先进先出,取出等待区中第一个线程
//LIFO的情况
                        if(waitinglist.peekLast()!=null)//等待区最后一个位置的线程不能为空
                        runningList.offer(waitinglist.pollLast());//如果是后进先出,取出等待区中最后一个线程

为什么不能将空线程放进执行区呢?
因为这样子,空线程在执行区中start和判断这个线程是否结束的时候(getState()==Thread.State.TERMINATED),会抛出NullPointerException空指针异常,会无缘无故占领了执行区的空间,抛出异常和处理异常也会浪费时间

而且不知道为什么,如果不判断的话,会发生阻塞
我想了想,想到了一个不靠谱的解释:
在主线程提交线程给executor之前,executor一直在把空的线程丢进执行区,然后执行区一直在处理异常,等待区也一直在把空线程丢给执行区,这样子也就没有现象出现
可是这样的话,迟早也会有现象出现的,不可能一直都阻塞在那里啊??

遍历线程的容器会抛出ConcurrentModificationException异常

ConcurrentModificationException这个异常是什么呢?
当遍历线程的容器时,会发生这个异常
这个异常存在的意义是不要我们遍历线程的容器,因为如果对装有线程的容器发生修改(比如,移除啊),就会使得线程没有执行
下面看看API的解释:
某个线程在 Collection 上进行迭代时,通常不允许另一个线性修改* Collection*。通常在这些情况下,迭代的结果是不确定的

API很粗暴的,只要循环体中或者迭代器中,遍历的是Collection的时候,就会直接抛出这个异常
所以当开发的时候,没有对容器线程做出修改,那么直接处理忽视掉这个异常吧

线程一定要适当的yield()切换线程

yield()这个方法的用处是:暂停正在执行的线程,切换给别的线程跑跑
如果不用这个方法的话,会出现阻塞
正在执行的那个线程不放cpu,其他的线程也就执行不到了
可是这样子也不会发生阻塞啊,只是运行的慢一点而已

主线程不能轻易的修改执行优先级

我发现,当把主线程(main线程)的优先级改到最低或者较低,很容易出现阻塞
这是为什么捏??

当把可同时开启的线程数量调到1或2

此时又会发生阻塞了
为什么呢?
我想想的是,这样子,控制线程就需要频繁的从等待区中取出线程,也要频繁的将执行区的已结束的线程移除出去
可是这样子也不会发生阻塞啊,只是运行的慢一点而已
真烦!!

<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>
分享到:
评论

相关推荐

    Java 模拟线程并发

    Java 模拟线程并发是编程领域中的一个重要概念,尤其在多核处理器和高并发应用中,理解并熟练掌握线程并发技术对于提升程序性能至关...在实际开发中,应根据具体需求选择合适的并发策略,以实现最佳性能和资源利用率。

    自定义实现Java线程池1-模拟jdk线程池执行流程1

    首先,Java中的线程池设计始于JDK 5.0,主要通过`java.util.concurrent`包中的`Executor`接口实现。这个接口仅有一个`execute()`方法,用于提交执行任务。我们也将遵循这个设计,实现一个简单的线程池类`...

    java实现操作系统进程之间的调度

    总的来说,虽然Java不直接支持操作系统级别的进程调度,但通过巧妙地使用Java的并发库和线程机制,我们可以模拟和实现各种调度策略,满足不同的并发编程需求。无论是简单的FCFS,还是复杂的优先级调度,都可以在Java...

    操作系统调度算法C#实现.pdf

    为了实现一个调度算法,你需要根据选择的策略更新`executor`对象的状态,例如更新`haveexected`值并计算新的`waittime`。然后,根据所选算法(如FCFS、SJF、HRN或RR),调整进程队列并选择下一个执行的进程。 在...

    操作系统调度算法C#实现可用.pdf

    在这个C#实现中,我们可以看到一个简单的模拟系统,用于演示几种基本的调度算法。这里主要涉及了两个类:`Method`和`executor`。 `Method`类提供了一个通用的方法`AddColumnRow`,用于在Windows Forms应用程序中...

    java 模拟人工服务台(线程连接池)

    在Java编程中,模拟人工服务台常常涉及到线程管理和资源优化,这通常通过线程池技术来实现。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。这样做可以有效地减少创建...

    Hive实战模拟电商数据

    Hive的主要组件包括Hive Metastore、Driver、Compiler、Executor和HDFS。Hive的查询语句会经过编译、优化,最后转化为MapReduce任务运行在Hadoop集群上。 2. **数据加载**:在电商数据分析中,我们首先需要将零售...

    操作系统调度算法java

    在Java中实现这样的调度算法,需要使用数据结构来模拟进程队列,例如使用`LinkedList`或`ArrayDeque`作为基础结构。此外,还需要一个计时器来跟踪和执行时间片切换。在Java中,可以使用`ScheduledExecutorService`...

    java队列实现

    在实际应用中,我们还可以根据需求调整线程池参数,比如设置超时策略、拒绝策略等,以优化性能和资源利用。此外,`ScheduledThreadPoolExecutor`则可以用于定时或周期性地执行任务。 总之,通过Java的并发库和队列...

    Spring 异步多线程动态任务处理的使用心得

    在IT行业中,Spring框架是Java开发者的首选工具之一,它为构建企业级应用程序提供了强大的支持。在现代应用中,高效地处理并发任务是至关重要的...动态调整任务处理策略和监控任务执行情况也是保持系统健康运行的关键。

    Spring3.0 mvc 定时器及多线程任务demo

    // 模拟耗时操作 return new AsyncResult("任务完成"); } } ``` 在这个例子中,`longRunningTask`方法将在单独的线程中异步执行,不会阻塞主线程。 三、结合使用定时器和多线程 在某些场景下,定时任务可能需要...

    AsyncDemo.zip

    `getAsyncExecutor()`用于配置异步任务的Executor,它定义了任务调度策略,如线程池大小、队列容量等。默认情况下,Spring Boot会提供一个简单配置的ThreadPoolTaskExecutor。 ```java @Configuration @EnableAsync...

    自定义mybatis

    总结来说,自定义MyBatis涉及的主要知识点包括:动态代理机制、Java反射、SQL映射、Executor执行策略、以及单元测试。通过这样的自定义,开发者可以根据项目需求调整MyBatis的行为,提高代码的可维护性和扩展性。在...

    Concurrent_and_Real-Time_Programming_in_Java.rar_java programmin

    在Java中实现RTOS功能,可以使用Java的并发库模拟信号量和互斥量,例如Semaphore和ReentrantLock。任务调度可以通过线程优先级或自定义调度器来实现,但Java标准库并不直接支持抢占式调度,这通常需要依赖于特定的...

    Mesos资源共享平台

    主节点根据预设的分配策略(如公平共享或优先级策略)决定资源的分配,并通过可插拔的分配模块支持自定义策略。每个框架包含一个调度程序(scheduler)和执行器进程(executor)。调度程序与主节点交互,接受资源...

    关于线程池的代码demo

    线程池是多线程编程中的一个重要概念,它在Java中通过`java.util.concurrent`包中的`ExecutorService`接口及其实现类实现。线程池的使用能够有效地管理和控制线程资源,避免频繁创建和销毁线程带来的性能开销,提高...

    哲学家算法-java

    3. **实现同步策略**:根据不同的并发控制策略,如避免或检测死锁,实现哲学家获取和释放筷子的方法。 4. **主程序**:创建哲学家线程并启动,模拟哲学家随机地吃饭和思考,同时保证系统不会陷入死锁。 在提供的...

    自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)

    为了克服这个问题,我们可以采用异步处理策略。本文将详细介绍如何使用Java中的`ConcurrentLinkedQueue`数据结构,结合Spring Boot的特性,实现异步处理浏览器发送的请求。 首先,`ConcurrentLinkedQueue`是Java并...

    多线程 售票系统

    在本场景中,我们讨论的是一个模拟火车站售票系统的多线程实现。这个系统通过多线程来模拟现实中售票窗口的并发操作,提高系统效率,提供更好的用户体验。 **线程的创建** 线程是操作系统分配处理器时间的基本单位...

Global site tag (gtag.js) - Google Analytics