`
abu
  • 浏览: 11934 次
  • 性别: Icon_minigender_1
  • 来自: 广州
最近访客 更多访客>>
社区版块
存档分类
最新评论

简单的类似crontab的调度器

阅读更多
最近要在python里写一些定时跑的任务, 不太想用crontab来调度,也找不到好的调度开源代码。
所以自己花了一点时间,写了一个简单的实现。
代码如下:

# coding=utf-8
import threading, datetime, logging, time
 
class Scheduler(object):
 
    def schedule(self,trigger,func):
        '''将func任务放入调度列表,根据trigger触发运行
        '''
        pass
 
    def start(self):
        '''开启调度
        '''
        pass
 
    def stop(self):
        '''关闭调度
        '''
        pass
 
class SimpleScheduler(Scheduler):
    def __init__(self):
        self.tasks = []
        self.isRun = False
 
    def schedule(self,trigger,func):
        '''将func任务放入调度列表,根据trigger触发运行
        #    参数说明:
        #    trigger:    允许 字符串(触发表达式)或者继承了Trigger的类
        #                当为字符串时,与linux 的crontab相似
        #                   如:"15 12 * * *" 五部分,分别为: 分,时,日,月, 周(周为1至7,周一为1, 周日为7)
        #                   用空格隔开。
        #                    
        #                    支持的特殊字条:
        #                    * 代表任何,  
        #                    , 表示多个情况,   15,16,17 表示三个都可以
        #                    - 表示范围  1-10
        #                    */5 表示每隔5单位时间
        #
        #    func:    可执行的函数
        '''
        if(type(trigger) == type("")):
            self.tasks.append([CronTrigger(trigger),func])
        else:
            self.tasks.append([trigger,func])
 
    def stop(self):
        self.isRun = False
 
    def start(self):
        self.isRun = True
        timer = threading.Timer(1, self._run)
        timer.start()
 
    def _run(self):
        while(self.isRun):
            now = datetime.datetime.now()
            logging.debug("SimpleScheduler: %d:%d" %(now.hour,now.minute))
            self._schedule(now)
            time.sleep(60)
 
    def _schedule(self,now):
        for task in self.tasks:
            try:
                if task[0].trigger(now):
                    self._doTask(now,task[1])
            except Exception,e:
                logging.error(str(e))
 
    def _doTask(self,now,func):
        logging.info('do task: time=%s,func=%s' % (now.isoformat(' '),func.func_name))
        timer = threading.Timer(1, func)
        timer.start()
 
class Trigger(object):
    def trigger(self,time):
        return False
 
class CronTrigger(Trigger):
    def __init__(self, exp):
        if len(exp.split(' ')) != 5 :
            raise Exception('the expression is error : ' + exp) 
        try:
            self._analyzeExp(exp.split(' '))
        except Exception,e:
            raise Exception('the expression is error : ' + exp) 
 
    def trigger(self, time):
        nows = [time.isoweekday(),time.month, time.day, time.hour, time.minute]
        nows.reverse()
        result = [n in t for (n,t) in zip(nows,self.triggers)]
        return all(result) 
 
    def _analyzeExp(self, exp):
        self.triggers = []
        self.triggers.append(self._analyzeSingle(exp[0],0,59))
        self.triggers.append(self._analyzeSingle(exp[1],0,23))
        self.triggers.append(self._analyzeSingle(exp[2],1,31))
        self.triggers.append(self._analyzeSingle(exp[3],1,12))
        self.triggers.append(self._analyzeSingle(exp[4],1,7))
 
    def _analyzeSingle(self, single, min, max):
        if single == '*' :
            return range(min,max+1)
        elif ',' in single:
            return [int(x) for x in single.split(',')]
        elif '-' in single:
            parts = single.split('-')
            maxPart = max
            if int(parts[1]) < maxPart:
                maxPart = int(parts[1])
            minPart = min
            if int(parts[0]) > minPart:
                minPart = int(parts[0])
            return range(minPart, maxPart+1)
        elif '*/' in single:
            den = int(single[2:])
            return [x for x in range(min,max+1) if x%den == 0]
        else :
            return [int(single),]



原文地址(我的blog): http://www.abublog.info/?p=25

1
1
分享到:
评论
1 楼 guozhiwei 2011-04-06  
我写过一个多进程版本的 http://guozhiwei.iteye.com/blog/960709

相关推荐

    在windows下配置crontab

    【在Windows下配置crontab】的知识点主要集中在如何在Windows环境中使用Cygwin来实现类似于Linux中的计划任务管理。Crontab是Linux和Unix系统中用于管理周期性任务的工具,而在Windows系统中,由于Dos Scripts的功能...

    jCrontab-用于定时调度的示例

    - **创建调度器**:实例化CronTabScheduler对象,这是调度任务的主要入口。 - **定义任务**:创建实现了Runnable或Callable接口的任务类,并编写任务逻辑。 - **设置调度规则**:利用CronExpression类解析cron表达式...

    Quartz作业调度器

    2. **CronTrigger**:CronTrigger允许使用类似于Linux crontab的表达式来定义任务的执行时间。例如,“0 0/5 * * * ?”表示每5分钟执行一次。这种触发器非常灵活,可以满足复杂的时间调度需求。 3. **SimpleTrigger...

    详解在dotnet core实现类似crontab的定时任务

    前段需要在业务中实现某些时间段的简单定时任务,类似crontab的调度,因为业务会放在docker中,所以不想用直接用crontab,在网上搜了一下,发现一个开源的实现 Pomelo.AspNetCore.TimedJob,使用简单,但是因为是...

    易语言 Crontab 模块源码.zip

    在【易语言 Crontab 模块源码】中,开发者可以了解到如何在易语言中构建一个类似于 Unix 系统 crontab 功能的模块。它包含了定时任务的创建、修改、删除以及执行逻辑。源码分析将涉及以下几个核心知识点: 1. **...

    Node.js-nodejs定时调度任务

    它支持类似于Unix cron表达式的语法,让我们可以灵活地定义何时运行任务。下面我们将详细探讨如何使用`node-cron`以及相关的编程概念。 1. **安装**:首先,你需要通过npm(Node.js的包管理器)来安装`node-cron`库...

    SimpleCron:适用于Windows客户端的简单计划程序,由类似crontab的文件配置-开源

    Windows PC的简单调度程序实用程序,可在指定的时间/天为当前登录的用户执行指定的命令。 操作规范广泛遵循crontab语法,例如https://en.wikipedia.org/wiki/Cron中所述。 丢失的任务将始终在下一个可能的时间(即...

    quartz/Cron/Crontab表达式在线生成工具

    总的来说,Cron表达式和Quartz的组合为开发者提供了一种强大且灵活的定时任务解决方案,无论是在简单的日常调度还是复杂的业务流程中,都能发挥出其应有的价值。使用在线生成工具,可以更便捷地创建和测试Cron表达式...

    linux使用crontab实现PHP执行计划定时任务

    安装Crontab服务的方法简单,通常使用包管理器如yum: ``` yum install crontabs ``` 安装完成后,可以使用以下命令来管理Crontab服务: ``` /sbin/service crond start // 启动服务 /sbin/service crond stop // ...

    分布式任务调度系统调研

    3. **灵活的时间规则支持**:除了传统的Crontab表达式之外,JobX还支持Quartz表达式,这为用户提供了更多的灵活性。 4. **任务执行状态实时监控**:能够实时展示任务执行的状态,便于及时发现并解决问题。 5. **任务...

    易语言-易语言 Crontab 定时任务执行模块 v1.2 支持单位秒 也可做计时器

    Crontab是Unix/Linux系统中用于设置周期性被执行任务的工具,但在易语言中,这个模块实现了类似的功能,使得开发者能在Windows环境下模拟Crontab的定时任务调度。定时任务在各种软件应用中非常常见,如自动备份、...

    仿制Linux中定时任务Crontab v1.0-易语言

    "仿制Linux中定时任务Crontab v1.0-易语言"项目旨在为易语言用户提供一个类似Linux Crontab功能的实现,让用户在易语言环境下也能享受到类似的功能。 易语言是一款中国本土开发的编程语言,其目标是让编程变得简单...

    Go-gocron-使用Go语言开发的轻量级定时任务集中调度和管理系统

    4. **启动调度器**:使用`Start`方法启动调度器,这样任务就会按照设定的时间间隔运行: ```go gocron.Start() ``` 5. **任务管理**:如果需要暂停或恢复任务,可以使用`Stop`和`Resume`方法: ```go gocron....

    cron4j 2.2.3源代码,API下载集合包

    总结,cron4j作为一个轻量级的Java任务调度框架,不仅提供了类似crontab的定时任务配置方式,还拥有易用的API和高效的执行机制。通过理解和掌握其工作原理及API使用,开发者可以轻松地在Java应用中实现复杂的时间...

    Cron表达式选择器JS插件

    Cron表达式是Unix系统中的一种时间调度语法,用于定义任务的执行计划。这个JS插件结合了BootStrap的样式,提供了一种直观且美观的界面,使得非技术人员也能轻松理解和配置Cron表达式。 ### Cron表达式详解 Cron...

    package-crontab:ZN Framework Crontab软件包

    在 ZN Framework Crontab 软件包中,开发者可以方便地创建、管理和调度各种周期性的任务,无需深入理解原生的 Crontab 文件格式和命令行操作。它提供了一个友好的 API 和面向对象的接口,使得在 PHP 应用中处理定时...

    Go-cronsun是一个分布式任务系统单个结点和*nix机器上的crontab近似

    Go-cronsun是一个高效且易于使用的分布式任务系统,它为*nix环境(如Linux、Unix等)提供了类似crontab的功能,但具有更强大的特性和扩展性。这个系统设计用于单个节点以及分布式部署,为那些寻求更高级别任务管理和...

    Python任务调度利器之APScheduler详解

    下面是一个简单的APScheduler使用示例,展示了如何创建一个后台任务调度器,每隔10秒执行一次`job_func`函数: ```python from apscheduler.schedulers.background import BackgroundScheduler from apscheduler....

    Linux定时任务Crontab命令使用详解与总结

    系统级别的定时任务配置在`/etc/crontab`文件中,格式与用户crontab类似,但可能包含更多的环境变量和任务分类。例如,可以定义不同用户组的任务,或者设置系统维护任务。 **Crontab命令技巧** 1. **注释**: 在...

    Python任务调度模块APScheduler使用

    7. **关闭调度器**:`shutdown()`方法用于停止调度器,如果`wait`参数设为`False`,则调度器会立即关闭,不会等待当前运行的任务完成。 APScheduler的强大在于它的灵活性和可扩展性,可以根据项目需求选择适合的...

Global site tag (gtag.js) - Google Analytics