论坛首页 Java企业应用论坛

对chainsaw中一个简单Job Scheduler的扩展

浏览 1692 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-10-14   最后修改:2011-10-15

    今天在看apache chainsaw这个项目的源代码时,无意中发现了一个非常简单的Job Scheduler的实现,源代码可以看这里:http://svn.apache.org/repos/asf/logging/chainsaw/trunk/src/main/java/org/apache/log4j/scheduler/ ,其中一个是Scheduler,另一个是Job接口。

 

    Scheduler介绍道:

/**
* A simple but still useful implementation of a Scheduler (in memory only).
* <p/>
* This implementation will work very well when the number of scheduled job is
* small, say less than 100 jobs. If a larger number of events need to be
* scheduled, than a better adapted data structure for the jobList can give
* improved performance.
*
* @author Ceki
*/

 

    测试一下这个Scheduler,写一个非常简单的SimpleJob来实现Job接口。

package cn.lettoo.scheduler;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SimpleJob implements Job {

    private String name;

    public SimpleJob(String name) {
        this.name = name;
    }

    public void execute() {
        Date now = new Date(System.currentTimeMillis());

        System.out.println(String.format("%s: %s executed by thread %s",
                SimpleDateFormat.getDateTimeInstance().format(now), this.name,
                Thread.currentThread().getName()));
    }

}

 

    再写一个测试类:

package cn.lettoo.scheduler;

public class JobTest {

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        
        Job job1 = new SimpleJob("job1");
        scheduler.schedule(job1, System.currentTimeMillis(), 5000);
        
        scheduler.start();
    }

}

 

    *这里的scheduler.schedule(job1, System.currentTimeMillis(), 5000);表示立即运行,且每5秒运行一次。

    执行结果如下:

2011-10-14 22:12:58: job1 executed by thread Thread-0
2011-10-14 22:13:03: job1 executed by thread Thread-0
2011-10-14 22:13:08: job1 executed by thread Thread-0
......

 

    这样一个简单的Job Scheduler就实现了,但我发现这样只是一个单线程的Job Scheduler,假如我每个Job运行时间是10秒,而间隔是5秒,同时有多个Job运行的话,这个Scheduler的效率还是很差的。

 

    改动一下SimpleJob,让Job运行时sleep 10秒钟,来模拟job运行10秒。

public void execute() {
        Date now = new Date(System.currentTimeMillis());

        System.out.println(String.format("%s: %s executed by thread %s",
                SimpleDateFormat.getDateTimeInstance().format(now), this.name,
                Thread.currentThread().getName()));
        
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

 

    同时,在JobTest中创建多个job,并且让Scheduler去执行:

public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        
        Job job1 = new SimpleJob("job1");
        scheduler.schedule(job1, System.currentTimeMillis(), 5000);
        
        Job job2 = new SimpleJob("job2");
        scheduler.schedule(job2, System.currentTimeMillis() + 1000, 5000);
        
        Job job3 = new SimpleJob("job3");
        scheduler.schedule(job3, System.currentTimeMillis() + 2000, 5000);
        
        Job job4 = new SimpleJob("job4");
        scheduler.schedule(job4, System.currentTimeMillis() + 3000, 5000);
        
        Job job5 = new SimpleJob("job5");
        scheduler.schedule(job5, System.currentTimeMillis() + 4000, 5000);
        
        scheduler.start();
    }

     再运行:

2011-10-14 22:21:51: job1 executed by thread Thread-0
2011-10-14 22:22:01: job2 executed by thread Thread-0
2011-10-14 22:22:11: job3 executed by thread Thread-0
2011-10-14 22:22:21: job4 executed by thread Thread-0
2011-10-14 22:22:31: job5 executed by thread Thread-0
2011-10-14 22:22:41: job1 executed by thread Thread-0
......

     可以看到,虽然我设置的job运行间隔都是5秒,但由于job本身要执行10秒,同时有多个job在排队执行,实现上job1的间隔已经到了50秒才执行。这样肯定是不行的。

 

    那么,使用多线程应该就可以解决这个问题了,加入线程池。让每个job都由线程池里的一个线程去执行。

    Scheduler源代码里,执行Job的方法是这样的:

/**
     * Run scheduler.
     */
    public synchronized void run() {
        while (!shutdown) {
            if (jobList.isEmpty()) {
                linger();
            } else {
                ScheduledJobEntry sje = (ScheduledJobEntry) jobList.get(0);
                long now = System.currentTimeMillis();
                if (now >= sje.desiredExecutionTime) {
                    executeInABox(sje.job);
                    jobList.remove(0);
                    if (sje.period > 0) {
                        sje.desiredExecutionTime = now + sje.period;
                        schedule(sje);
                    }
                } else {
                    linger(sje.desiredExecutionTime - now);
                }
            }
        }
        // clear out the job list to facilitate garbage collection
        jobList.clear();
        jobList = null;
        System.out.println("Leaving scheduler run method");
    }

    /**
     * We do not want a single failure to affect the whole scheduler.
     * @param job job to execute.
     */
    void executeInABox(final Job job) {
        try {
            job.execute();
        } catch (Exception e) {
            System.err.println("The execution of the job threw an exception");
            e.printStackTrace(System.err);
        }
    }

     可以看到,只要在executeInABox的方法里,使用线程池的线程来执行job,就可以了。现在加一个Scheduler的子类,我加上一个 ExecutorService来实现线程池。同时我重写了executeInABox的方法,使用一个Runnable的实现类JobThread来运 行job的execute方法。

package cn.lettoo.scheduler;

import java.util.concurrent.ExecutorService;

public class ThreadPoolScheduler extends Scheduler {
   
    private ExecutorService  pool;

    public ThreadPoolScheduler(ExecutorService  pool) {
        super();
        this.pool = pool;
    }

    @Override
    void executeInABox(final Job job) {
        pool.execute(new JobThread(job));
    }
    
    class JobThread implements Runnable {

        private Job job;
        
        public JobThread(Job job) {
            this.job = job;
        }
        
        public void run() {
            try {
                this.job.execute();
            } catch (Exception e) {
                System.err.println("The execution of the job threw an exception");
                e.printStackTrace(System.err);
            }
        }
        
    }
}

 

    再修改JobTest:

        // 创建一个可缓存的线程池
        ExecutorService pool = Executors.newCachedThreadPool();
        // 构造带线程池的Scheduler
        ThreadPoolScheduler scheduler = new ThreadPoolScheduler(pool);
        .......
         scheduler.start();

 

    再运行,结果如下:

2011-10-14 22:37:50: job1 executed by thread pool-1-thread-1
2011-10-14 22:37:51: job2 executed by thread pool-1-thread-2
2011-10-14 22:37:52: job3 executed by thread pool-1-thread-3
2011-10-14 22:37:53: job4 executed by thread pool-1-thread-4
2011-10-14 22:37:54: job5 executed by thread pool-1-thread-5
2011-10-14 22:37:55: job1 executed by thread pool-1-thread-6

    可以看到,这时,job已经按我的要求,每5秒运行一次了。

 

    但再仔细一想,如果job是有状态的,我的job运行要10秒,而5秒就要再运行一次,有时我们是需要一个job完全执行完才能下一次再执行的,比如上面的job1,第一次运行完,才可以执行第二次。

 

    怎么解决这个问题?我目前的做法是在ThreadPoolScheduler里增加一个Set,存储正在执行的Job,当Job执行完成后,从这个Set中删除。在下次执行的时候,判断是否在Set中,如果在,则不执行。

private Set<Job> runningJobList = new HashSet<Job>();
    
    @Override
    void executeInABox(final Job job) {
        if (!runningJobList.contains(job)) {
            runningJobList.add(job);
            pool.execute(new JobThread(job));
        }
    }    

    class JobThread implements Runnable {

        private Job job;

        public JobThread(Job job) {
            this.job = job;
        }

        public void run() {
            try {
                this.job.execute();
                synchronized (this) {
                    runningJobList.remove(job);
                }
            } catch (Exception e) {
                System.err
                        .println("The execution of the job threw an exception");
                e.printStackTrace(System.err);
            }
        }

    }

 

    再执行:

2011-10-14 23:29:27: job1 executed by thread pool-1-thread-1
2011-10-14 23:29:28: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:29: job3 executed by thread pool-1-thread-3
2011-10-14 23:29:30: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:31: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:38: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:40: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:41: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:42: job1 executed by thread pool-1-thread-3
2011-10-14 23:29:44: job3 executed by thread pool-1-thread-1

     可以看到,这里已经避免了job在执行的时候,再次被执行。当然,也发生了其他的问题,如job1,第一次执行在23:29:27,执行过程是10秒,那 应该在23:29:37执行完,而我们要求是每5秒执行一次的话,则应该立即执行才对,可是实际上是在23:29:42才执行的。为什么会这样呢?原来, 在Scheduler中的run()方法中,只要执行了executeInABox方法之后,都会在jobList.remove(0),也就是在 job1被scheduler并且到了时间之后,即使没有被执行,但是也被从jobList里remove掉了,然后再重新加5秒再次scheduler 上,也就是在23:29:37秒job1真正执行完成时,才再次重新scheduler上,也就是在42秒执行了。这是一个问题,如果要实现这个问题,需 要重新对Scheduler的代码进行重构,即在run()方法加上对runningJobList的检查功能。我这里就没有实现,如果您有更好的方法, 欢迎指出。

 

   发表时间:2011-10-17  
推荐你看看jdk里的ScheduledThreadPoolExecutor,该有的功能都有了。

1. 多线程
2. 同一时间一个job实例完全执行完才会进行下一次调度
3. 支持任务的cancel
0 请登录后投票
   发表时间:2011-10-18  
agapple 写道
推荐你看看jdk里的ScheduledThreadPoolExecutor,该有的功能都有了。

1. 多线程
2. 同一时间一个job实例完全执行完才会进行下一次调度
3. 支持任务的cancel


谢谢推荐,一定去看看。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics