最近要在python里写一些定时跑的任务, 不太想用crontab来调度,也找不到好的调度开源代码。
所以自己花了一点时间,写了一个简单的实现。
代码如下:
# coding=utf-8
import threading, datetime, logging, time
class Scheduler(object):
def schedule(self,trigger,func):
'''将func任务放入调度列表,根据trigger触发运行
'''
pass
def start(self):
'''开启调度
'''
pass
def stop(self):
'''关闭调度
'''
pass
class SimpleScheduler(Scheduler):
def __init__(self):
self.tasks = []
self.isRun = False
def schedule(self,trigger,func):
'''将func任务放入调度列表,根据trigger触发运行
# 参数说明:
# trigger: 允许 字符串(触发表达式)或者继承了Trigger的类
# 当为字符串时,与linux 的crontab相似
# 如:"15 12 * * *" 五部分,分别为: 分,时,日,月, 周(周为1至7,周一为1, 周日为7)
# 用空格隔开。
#
# 支持的特殊字条:
# * 代表任何,
# , 表示多个情况, 15,16,17 表示三个都可以
# - 表示范围 1-10
# */5 表示每隔5单位时间
#
# func: 可执行的函数
'''
if(type(trigger) == type("")):
self.tasks.append([CronTrigger(trigger),func])
else:
self.tasks.append([trigger,func])
def stop(self):
self.isRun = False
def start(self):
self.isRun = True
timer = threading.Timer(1, self._run)
timer.start()
def _run(self):
while(self.isRun):
now = datetime.datetime.now()
logging.debug("SimpleScheduler: %d:%d" %(now.hour,now.minute))
self._schedule(now)
time.sleep(60)
def _schedule(self,now):
for task in self.tasks:
try:
if task[0].trigger(now):
self._doTask(now,task[1])
except Exception,e:
logging.error(str(e))
def _doTask(self,now,func):
logging.info('do task: time=%s,func=%s' % (now.isoformat(' '),func.func_name))
timer = threading.Timer(1, func)
timer.start()
class Trigger(object):
def trigger(self,time):
return False
class CronTrigger(Trigger):
def __init__(self, exp):
if len(exp.split(' ')) != 5 :
raise Exception('the expression is error : ' + exp)
try:
self._analyzeExp(exp.split(' '))
except Exception,e:
raise Exception('the expression is error : ' + exp)
def trigger(self, time):
nows = [time.isoweekday(),time.month, time.day, time.hour, time.minute]
nows.reverse()
result = [n in t for (n,t) in zip(nows,self.triggers)]
return all(result)
def _analyzeExp(self, exp):
self.triggers = []
self.triggers.append(self._analyzeSingle(exp[0],0,59))
self.triggers.append(self._analyzeSingle(exp[1],0,23))
self.triggers.append(self._analyzeSingle(exp[2],1,31))
self.triggers.append(self._analyzeSingle(exp[3],1,12))
self.triggers.append(self._analyzeSingle(exp[4],1,7))
def _analyzeSingle(self, single, min, max):
if single == '*' :
return range(min,max+1)
elif ',' in single:
return [int(x) for x in single.split(',')]
elif '-' in single:
parts = single.split('-')
maxPart = max
if int(parts[1]) < maxPart:
maxPart = int(parts[1])
minPart = min
if int(parts[0]) > minPart:
minPart = int(parts[0])
return range(minPart, maxPart+1)
elif '*/' in single:
den = int(single[2:])
return [x for x in range(min,max+1) if x%den == 0]
else :
return [int(single),]
原文地址(我的blog):
http://www.abublog.info/?p=25
分享到:
相关推荐
【在Windows下配置crontab】的知识点主要集中在如何在Windows环境中使用Cygwin来实现类似于Linux中的计划任务管理。Crontab是Linux和Unix系统中用于管理周期性任务的工具,而在Windows系统中,由于Dos Scripts的功能...
- **创建调度器**:实例化CronTabScheduler对象,这是调度任务的主要入口。 - **定义任务**:创建实现了Runnable或Callable接口的任务类,并编写任务逻辑。 - **设置调度规则**:利用CronExpression类解析cron表达式...
2. **CronTrigger**:CronTrigger允许使用类似于Linux crontab的表达式来定义任务的执行时间。例如,“0 0/5 * * * ?”表示每5分钟执行一次。这种触发器非常灵活,可以满足复杂的时间调度需求。 3. **SimpleTrigger...
前段需要在业务中实现某些时间段的简单定时任务,类似crontab的调度,因为业务会放在docker中,所以不想用直接用crontab,在网上搜了一下,发现一个开源的实现 Pomelo.AspNetCore.TimedJob,使用简单,但是因为是...
在【易语言 Crontab 模块源码】中,开发者可以了解到如何在易语言中构建一个类似于 Unix 系统 crontab 功能的模块。它包含了定时任务的创建、修改、删除以及执行逻辑。源码分析将涉及以下几个核心知识点: 1. **...
总的来说,Cron表达式和Quartz的组合为开发者提供了一种强大且灵活的定时任务解决方案,无论是在简单的日常调度还是复杂的业务流程中,都能发挥出其应有的价值。使用在线生成工具,可以更便捷地创建和测试Cron表达式...
Crontab是Unix/Linux系统中用于设置周期性被执行任务的工具,但在易语言中,这个模块实现了类似的功能,使得开发者能在Windows环境下模拟Crontab的定时任务调度。定时任务在各种软件应用中非常常见,如自动备份、...
Windows PC的简单调度程序实用程序,可在指定的时间/天为当前登录的用户执行指定的命令。 操作规范广泛遵循crontab语法,例如https://en.wikipedia.org/wiki/Cron中所述。 丢失的任务将始终在下一个可能的时间(即...
它支持类似于Unix cron表达式的语法,让我们可以灵活地定义何时运行任务。下面我们将详细探讨如何使用`node-cron`以及相关的编程概念。 1. **安装**:首先,你需要通过npm(Node.js的包管理器)来安装`node-cron`库...
安装Crontab服务的方法简单,通常使用包管理器如yum: ``` yum install crontabs ``` 安装完成后,可以使用以下命令来管理Crontab服务: ``` /sbin/service crond start // 启动服务 /sbin/service crond stop // ...
3. **灵活的时间规则支持**:除了传统的Crontab表达式之外,JobX还支持Quartz表达式,这为用户提供了更多的灵活性。 4. **任务执行状态实时监控**:能够实时展示任务执行的状态,便于及时发现并解决问题。 5. **任务...
"仿制Linux中定时任务Crontab v1.0-易语言"项目旨在为易语言用户提供一个类似Linux Crontab功能的实现,让用户在易语言环境下也能享受到类似的功能。 易语言是一款中国本土开发的编程语言,其目标是让编程变得简单...
4. **启动调度器**:使用`Start`方法启动调度器,这样任务就会按照设定的时间间隔运行: ```go gocron.Start() ``` 5. **任务管理**:如果需要暂停或恢复任务,可以使用`Stop`和`Resume`方法: ```go gocron....
总结,cron4j作为一个轻量级的Java任务调度框架,不仅提供了类似crontab的定时任务配置方式,还拥有易用的API和高效的执行机制。通过理解和掌握其工作原理及API使用,开发者可以轻松地在Java应用中实现复杂的时间...
Cron表达式是Unix系统中的一种时间调度语法,用于定义任务的执行计划。这个JS插件结合了BootStrap的样式,提供了一种直观且美观的界面,使得非技术人员也能轻松理解和配置Cron表达式。 ### Cron表达式详解 Cron...
在 ZN Framework Crontab 软件包中,开发者可以方便地创建、管理和调度各种周期性的任务,无需深入理解原生的 Crontab 文件格式和命令行操作。它提供了一个友好的 API 和面向对象的接口,使得在 PHP 应用中处理定时...
Go-cronsun是一个高效且易于使用的分布式任务系统,它为*nix环境(如Linux、Unix等)提供了类似crontab的功能,但具有更强大的特性和扩展性。这个系统设计用于单个节点以及分布式部署,为那些寻求更高级别任务管理和...
下面是一个简单的APScheduler使用示例,展示了如何创建一个后台任务调度器,每隔10秒执行一次`job_func`函数: ```python from apscheduler.schedulers.background import BackgroundScheduler from apscheduler....
系统级别的定时任务配置在`/etc/crontab`文件中,格式与用户crontab类似,但可能包含更多的环境变量和任务分类。例如,可以定义不同用户组的任务,或者设置系统维护任务。 **Crontab命令技巧** 1. **注释**: 在...
7. **关闭调度器**:`shutdown()`方法用于停止调度器,如果`wait`参数设为`False`,则调度器会立即关闭,不会等待当前运行的任务完成。 APScheduler的强大在于它的灵活性和可扩展性,可以根据项目需求选择适合的...