`
lettoo
  • 浏览: 35066 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
博客专栏
58ccff5b-5ca6-387a-9c99-a277f31a9e51
我和Java数据库操作的那...
浏览量:9461
社区版块
存档分类
最新评论

对chainsaw中一个简单Job Scheduler的扩展

阅读更多

    今天在看apache chainsaw这个项目的源代码时,无意中发现了一个非常简单的Job Scheduler的实现,源代码可以看这里:http://svn.apache.org/repos/asf/logging/chainsaw/trunk/src/main/java/org/apache/log4j/scheduler/ ,其中一个是Scheduler,另一个是Job接口。

 

    Scheduler介绍道:

/**
* A simple but still useful implementation of a Scheduler (in memory only).
* <p/>
* This implementation will work very well when the number of scheduled job is
* small, say less than 100 jobs. If a larger number of events need to be
* scheduled, than a better adapted data structure for the jobList can give
* improved performance.
*
* @author Ceki
*/

 

    测试一下这个Scheduler,写一个非常简单的SimpleJob来实现Job接口。

package cn.lettoo.scheduler;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SimpleJob implements Job {

    private String name;

    public SimpleJob(String name) {
        this.name = name;
    }

    public void execute() {
        Date now = new Date(System.currentTimeMillis());

        System.out.println(String.format("%s: %s executed by thread %s",
                SimpleDateFormat.getDateTimeInstance().format(now), this.name,
                Thread.currentThread().getName()));
    }

}

 

    再写一个测试类:

package cn.lettoo.scheduler;

public class JobTest {

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        
        Job job1 = new SimpleJob("job1");
        scheduler.schedule(job1, System.currentTimeMillis(), 5000);
        
        scheduler.start();
    }

}

 

    *这里的scheduler.schedule(job1, System.currentTimeMillis(), 5000);表示立即运行,且每5秒运行一次。

    执行结果如下:

2011-10-14 22:12:58: job1 executed by thread Thread-0
2011-10-14 22:13:03: job1 executed by thread Thread-0
2011-10-14 22:13:08: job1 executed by thread Thread-0
......

 

    这样一个简单的Job Scheduler就实现了,但我发现这样只是一个单线程的Job Scheduler,假如我每个Job运行时间是10秒,而间隔是5秒,同时有多个Job运行的话,这个Scheduler的效率还是很差的。

 

    改动一下SimpleJob,让Job运行时sleep 10秒钟,来模拟job运行10秒。

public void execute() {
        Date now = new Date(System.currentTimeMillis());

        System.out.println(String.format("%s: %s executed by thread %s",
                SimpleDateFormat.getDateTimeInstance().format(now), this.name,
                Thread.currentThread().getName()));
        
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

 

    同时,在JobTest中创建多个job,并且让Scheduler去执行:

public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        
        Job job1 = new SimpleJob("job1");
        scheduler.schedule(job1, System.currentTimeMillis(), 5000);
        
        Job job2 = new SimpleJob("job2");
        scheduler.schedule(job2, System.currentTimeMillis() + 1000, 5000);
        
        Job job3 = new SimpleJob("job3");
        scheduler.schedule(job3, System.currentTimeMillis() + 2000, 5000);
        
        Job job4 = new SimpleJob("job4");
        scheduler.schedule(job4, System.currentTimeMillis() + 3000, 5000);
        
        Job job5 = new SimpleJob("job5");
        scheduler.schedule(job5, System.currentTimeMillis() + 4000, 5000);
        
        scheduler.start();
    }

     再运行:

2011-10-14 22:21:51: job1 executed by thread Thread-0
2011-10-14 22:22:01: job2 executed by thread Thread-0
2011-10-14 22:22:11: job3 executed by thread Thread-0
2011-10-14 22:22:21: job4 executed by thread Thread-0
2011-10-14 22:22:31: job5 executed by thread Thread-0
2011-10-14 22:22:41: job1 executed by thread Thread-0
......

     可以看到,虽然我设置的job运行间隔都是5秒,但由于job本身要执行10秒,同时有多个job在排队执行,实现上job1的间隔已经到了50秒才执行。这样肯定是不行的。

 

    那么,使用多线程应该就可以解决这个问题了,加入线程池。让每个job都由线程池里的一个线程去执行。

    Scheduler源代码里,执行Job的方法是这样的:

/**
     * Run scheduler.
     */
    public synchronized void run() {
        while (!shutdown) {
            if (jobList.isEmpty()) {
                linger();
            } else {
                ScheduledJobEntry sje = (ScheduledJobEntry) jobList.get(0);
                long now = System.currentTimeMillis();
                if (now >= sje.desiredExecutionTime) {
                    executeInABox(sje.job);
                    jobList.remove(0);
                    if (sje.period > 0) {
                        sje.desiredExecutionTime = now + sje.period;
                        schedule(sje);
                    }
                } else {
                    linger(sje.desiredExecutionTime - now);
                }
            }
        }
        // clear out the job list to facilitate garbage collection
        jobList.clear();
        jobList = null;
        System.out.println("Leaving scheduler run method");
    }

    /**
     * We do not want a single failure to affect the whole scheduler.
     * @param job job to execute.
     */
    void executeInABox(final Job job) {
        try {
            job.execute();
        } catch (Exception e) {
            System.err.println("The execution of the job threw an exception");
            e.printStackTrace(System.err);
        }
    }

     可以看到,只要在executeInABox的方法里,使用线程池的线程来执行job,就可以了。现在加一个Scheduler的子类,我加上一个ExecutorService来实现线程池。同时我重写了executeInABox的方法,使用一个Runnable的实现类JobThread来运行job的execute方法。

package cn.lettoo.scheduler;

import java.util.concurrent.ExecutorService;

public class ThreadPoolScheduler extends Scheduler {
   
    private ExecutorService  pool;

    public ThreadPoolScheduler(ExecutorService  pool) {
        super();
        this.pool = pool;
    }

    @Override
    void executeInABox(final Job job) {
        pool.execute(new JobThread(job));
    }
    
    class JobThread implements Runnable {

        private Job job;
        
        public JobThread(Job job) {
            this.job = job;
        }
        
        public void run() {
            try {
                this.job.execute();
            } catch (Exception e) {
                System.err.println("The execution of the job threw an exception");
                e.printStackTrace(System.err);
            }
        }
        
    }
}

 

    再修改JobTest:

        // 创建一个可缓存的线程池
        ExecutorService pool = Executors.newCachedThreadPool();
        // 构造带线程池的Scheduler
        ThreadPoolScheduler scheduler = new ThreadPoolScheduler(pool);
        .......
         scheduler.start();

 

    再运行,结果如下:

2011-10-14 22:37:50: job1 executed by thread pool-1-thread-1
2011-10-14 22:37:51: job2 executed by thread pool-1-thread-2
2011-10-14 22:37:52: job3 executed by thread pool-1-thread-3
2011-10-14 22:37:53: job4 executed by thread pool-1-thread-4
2011-10-14 22:37:54: job5 executed by thread pool-1-thread-5
2011-10-14 22:37:55: job1 executed by thread pool-1-thread-6

    可以看到,这时,job已经按我的要求,每5秒运行一次了。

 

    但再仔细一想,如果job是有状态的,我的job运行要10秒,而5秒就要再运行一次,有时我们是需要一个job完全执行完才能下一次再执行的,比如上面的job1,第一次运行完,才可以执行第二次。

 

    怎么解决这个问题?我目前的做法是在ThreadPoolScheduler里增加一个Set,存储正在执行的Job,当Job执行完成后,从这个Set中删除。在下次执行的时候,判断是否在Set中,如果在,则不执行。

private Set<Job> runningJobList = new HashSet<Job>();
    
    @Override
    void executeInABox(final Job job) {
        if (!runningJobList.contains(job)) {
            runningJobList.add(job);
            pool.execute(new JobThread(job));
        }
    }    

    class JobThread implements Runnable {

        private Job job;

        public JobThread(Job job) {
            this.job = job;
        }

        public void run() {
            try {
                this.job.execute();
                synchronized (this) {
                    runningJobList.remove(job);
                }
            } catch (Exception e) {
                System.err
                        .println("The execution of the job threw an exception");
                e.printStackTrace(System.err);
            }
        }

    }

 

    再执行:

2011-10-14 23:29:27: job1 executed by thread pool-1-thread-1
2011-10-14 23:29:28: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:29: job3 executed by thread pool-1-thread-3
2011-10-14 23:29:30: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:31: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:38: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:40: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:41: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:42: job1 executed by thread pool-1-thread-3
2011-10-14 23:29:44: job3 executed by thread pool-1-thread-1

     可以看到,这里已经避免了job在执行的时候,再次被执行。当然,也发生了其他的问题,如job1,第一次执行在23:29:27,执行过程是10秒,那应该在23:29:37执行完,而我们要求是每5秒执行一次的话,则应该立即执行才对,可是实际上是在23:29:42才执行的。为什么会这样呢?原来,在Scheduler中的run()方法中,只要执行了executeInABox方法之后,都会在jobList.remove(0),也就是在job1被scheduler并且到了时间之后,即使没有被执行,但是也被从jobList里remove掉了,然后再重新加5秒再次scheduler上,也就是在23:29:37秒job1真正执行完成时,才再次重新scheduler上,也就是在42秒执行了。这是一个问题,如果要实现这个问题,需要重新对Scheduler的代码进行重构,即在run()方法加上对runningJobList的检查功能。我这里就没有实现,如果您有更好的方法,欢迎指出。

 

分享到:
评论

相关推荐

    Chainsaw_oi yes

    《Chainsaw_oi:揭示木材加工的科技力量》 在现代林业与建筑行业中,电锯,特别是"Chainsaw_oi",扮演着至关重要的角色。它是一种高效率、高强度的工具,专为切割木材和其他硬质材料设计。本文将深入探讨"Chainsaw_...

    chainsaw软件log4j日志查看软件

    在IT行业中,日志管理是系统监控、故障排查和性能优化的重要环节,而`chainsaw`就是这样一个专门针对`log4j`日志框架的强大工具。 `log4j`是Java平台上广泛使用的日志记录框架,它允许开发者灵活地控制日志信息的...

    BitDefender.2011.Patch.3.1&Chainsaw_1

    标签 "BitDefender.2011.Patch.3.1&Chainsaw_1" 是对压缩包内容的简单重复,没有提供额外的信息,但我们知道它与2011年版的BitDefender安全更新和Chainsaw工具有关。 压缩包中的两个文件名 "BitDefender.2011.Patch...

    Chainsaw_1 asli yea

    【标题】"Chainsaw_1 asli yea" 暗示了这是一个与原始或真实的 Chainsaw_1 工具相关的主题,可能是某种软件或应用程序。"yea" 可能代表确认或强调该版本的正宗性。在IT领域,这可能是一个专门用于数据处理、分析或...

    log4j-chainsaw-1.3alpha-3.jar

    Tomcatlog4j日志文件 log4j-chainsaw-1.3alpha-3.jar

    octo-chainsaw-源码.rar

    在这个压缩包"octo-chainsaw-源码.rar"中,我们有机会一窥其内部的工作原理和设计思路,这对于任何想要提升自身技术能力、特别是对代码管理和自动化流程感兴趣的开发者来说,都是一个宝贵的资源。 源码是理解软件...

    Chainsaw CUTTER

    【压缩包子文件名称列表】中的五个文件(Chainsaw_2.zip、Chainsaw_5.zip、Chainsaw_1.zip、Chainsaw_4.zip、Chainsaw_3.zip)可能是关于链锯的详细资料,如用户手册、维修指南、操作视频、软件更新或设计图纸等。...

    Python库 | git_chainsaw-0.1.11-py3-none-any.whl

    标题中的"Python库 | git_chainsaw-0.1.11-py3-none-any.whl"指的是一个针对Python编程语言的库,名为git_chainsaw,版本为0.1.11。这个库通常是一个封装了特定功能的代码集合,方便开发者在Python项目中调用。"py3-...

    chainsaw:电锯-日志数据生成器

    "电锯",或者在英文中被称为"Chainsaw",是一个日志数据生成器,主要用在系统和应用程序的日志测试与分析场景中。它并非我们常见的木材切割工具,而是针对日志处理的一种软件工具。这个工具尤其适用于开发者和系统...

    Chainsaw:Chainsaw 是一个针对大型本体的 OWLreasoner 实现-开源

    Chainsaw 的开发位于 https://bitbucket.org/ignazio1977/chainsaw 如需最近的源代码和... Chainsaw 使用模块化和原子分解将推理任务委托给其他 OWL 2 推理器; 它利用分而治之来将本体减少到回答特定查询所需的部分。

    urban-octo-chainsaw

    标题“urban-octo-chainsaw”似乎是一个项目或软件的名称,暗示着与城市环境相关的某种技术解决方案,可能是模拟、设计或者与建筑、城市规划有关的工具。然而,由于提供的标签为空,我们无法直接获得关于这个主题的...

    chainsaw Log4J Viewer-开源

    Chainsaw是一个Log4J软件包的GUI日志查看器和过滤器。 它侦听使用SocketAppender发送的LoggingEvent对象,并将它们显示在表中。 可以根据优先级,线程名称,类别名称和消息来过滤事件。 它可以

    PyPI 官网下载 | git_chainsaw-0.1.11-py3-none-any.whl

    资源来自pypi官网。 资源全名:git_chainsaw-0.1.11-py3-none-any.whl

    CSF405 EN 2001673 P0802183-01, Rev B CHAINSAW (TOP HANDLE)

    根据给定文件的信息,我们可以总结出以下关于CSF405 EN 2001673 P0802183-01, Rev B TOP HANDLE CHAINSAW的相关知识点: ### 1. 产品概述 #### 1.1 目的 这款链锯(CSF405)设计用于切割树枝、树干、木段以及直径...

    curly-octo-chainsaw

    Octo-Chainsaw" 这个项目名听起来非常独特,可能是一个与编程相关的创意项目,结合了“curly”(可能指的是代码中的花括号),"octo"(通常与八边形或 octopus 相关,可能是对多任务处理的暗示),以及 "chainsaw"...

    redesigned-octo-chainsaw

    redesigned-octo-chainsaw

    animated-chainsaw:仅出于在DSC项目管理站点上共享文件的目的

    标题中的“animated-chainsaw”可能是一个特定的项目或软件工具的名称,它与DSC(可能是Data Science Community或者其他的缩写)项目的文件管理有关。这个项目或工具可能涉及到动画制作、图形处理、编程或者是某种...

    special-chainsaw

    "Special Chainsaw"项目就是一个这样的例子,它利用C++编程语言来构建一个特定用途的工具。这个项目的核心在于,它不仅提供了一个基础的执行框架,还允许用户通过实验性设置来调整其行为,以满足特定需求。 首先,...

    glowing-octo-chainsaw

    发光的电锯("glowing-octo-chainsaw")是一个基于JavaScript的项目,它可能涉及了前端开发、交互效果、或者是一个创意编程实验。JavaScript是一种广泛应用于Web开发的编程语言,尤其在构建动态和交互式的网页内容...

Global site tag (gtag.js) - Google Analytics