`

Luigi --基于Python语言的流式任务调度框架教程

阅读更多

Luigi:

 

author: vincentzhwg@gmail.com

date: 2014.5.6

 

blog已迁移,最新的Luigi教程更新在:http://guan58.com/archives/38

 

### web

        https://github.com/spotify/luigi

 

### intro

        Luigi是基于python语言的,可帮助建立复杂流式批处理任务管理系统。它主要提供了以下功能:任务依赖管理、工作流管理、任务可视化、错误故障处理机制、命令行交互等。Luigi的主要目的是为了解决需要长期运行的流式批处理任务的管理。你可以链接很多个任务,使它们自动化,并进行故障管理。上面所说的任务可以是任何类型的任务,通常来说有如下几种:Hadoop任务、从数据库导入或导出、机器学习算法训练等。

        

### API概览

        在Luigi中有两个基础类:Task, Target. 另外,Parameter类对于如何控制Task类的运行是一个重要的类。

 

        Target:

                广义地讲,Target可对应为磁盘上的文件,或HDFS上文件,或checkpoint点,或数据库等。对于Target来说,唯一需要实现的方法为exists,返回为True表示存在,否则不存在返回为False.

                在实际应用时,写一个Target子类是很少需要用到的。直接使用开箱即可用的LocalTarget及 hdfs.HdfsTarget类就够用了。Luigi提供了Gzip支持,通过参数format=format.Gzip即可。

        

        Task:

                Task是任务逻辑运行的地方,提供了一些方法来定义任务的逻辑行为,主要有run, output, requires.

                Task通过类名及参数值做为标识符进行唯一区分。实际上,在同一个worker中,两个拥有相同类名及相同参数值的task不单单只是equal,而且实际上还是同一个实例。然而,如果参数在构建声明时指定了参数 significant=False ,对于Task的标识是不起影响的。对于多个Task,它们的类名相同,只是指定了 significant=False 的参数值才不同,而未指定 significant=False 的参数值是相同的,对于这些Task来说,它们拥有相同的标识符,即 hash(taskA) == hash(taskB) 是True的,但它们来自于不同的实例。

                Task.requires:

                        requires方法用来指定依赖关系,除了可指定对其他Task的依赖,还可指定为对自身Task的依赖。requires返回值可为 dicts/lists/tuples 或其他类别的封装。

                Task.output:

                        output方法返回一个或多个的Target对象,类似于requires方法,可返回适应于实际需要的对于Target的任何封装。实际上,建议只返回一个Target,因为如果返回多个,atomicity将会被丢失,除非Task能够确保多个Target能被原子性地创建。当然,如果原子性不是非常重要的时候,那么就可以放心地返回多个Target。

                Task.run:

                        run方法包含实际真正执行的代码。注意到,Luigi将任何事情切分为两个阶段,首先它指出在tasks之间的依赖关系,然后它运行每一件事情。 input() 方法是一个内部帮助方法,用来替代在requires 中的对象的对应输出。

 

        

        Parameter:

                在Python语言中,参数通常是在constructor时提供,但Luigi要求在类级别上声明所需的参数。通过这样子的要求,Luigi通过处理这些模板规范化的代码来为constructor提供所需参数。

                Python是个无需指定类型的语言(Python是个强类型动态语言),对于参数无需指定类型。对于Luigi来说,可以简单地使用 luigi.Parameter 即可,之所以存在 DateParameter 的原因,是为了在命令行交互时,确保命令行参数的值可以转换为对应的类型。

        

        Events and callbacks:

                Luigi内置了事件系统,允许注册callback到event中,并触发它们在所定义的tasks中,可挂接到一些预定义好的事件中,或者自定义事件中。每一个event被绑定到一个Task类中,将会被该Task类或其子类所触发。

 

        Instance caching:

                对于实例,Luigi提供了Instance caching。对于同一个标识符的task,就算在代码中实例化创建了两次,但实际上只会创建一个实例,这个是有必要的,确保了task只会被执行一次。

 

 

### Execution Model

        Luigi拥有一个非常简单的运行模型,最重要的一个方面就是没有执行转移。当执行一个Luigi的工作流,worker调度所有的tasks,并在同样的这些进程内执行这些tasks。受益于这种模式,非常容易对所有执行任务进行debug。并且,开发过程也相当简单。在开发过程中,通常通过命令行来运行Luigi,而当你布署时,可以通过crontab或任何其他的调度器来调度。这种模式所带来的不好的地方,在于Luigi不能自由地进行扩展,不过Luigi认为扩展应该交给Task去实现;另外一个不好的地方就是Luigi需要依赖于外部的调度器来触发工作流,如crontab等。

        

 

### Lugic Patterns

        Code Reuse

                Luigi的一个好处,是非常容易依赖于其他库中所定义的tasks。在执行路径上非常容易进行分叉,其中一项任务的输出可以成为很多其他任务的输入。

                同时,Luigi任务的输出都将被无限期地保存。这点的好处就是当后面的任务失败时,在重跑失败任务时可以重用前面任务的输出,而不需要重跑前面的任务。不好的地方在于,将会有大量的中间结果保存在系统上,一个比较有用的建议就是把这些输出保存在一个特定的目录中,并进行定期地清除。

        

        Triggering Mang Tasks

                一个常见的用例是每晚要运行一个Hadoop任务,但有时因为各种原因该任务会失败。一个有用的模式就是在最后建立一个虚拟任务,仅需声明依赖于最近多天之间的实际真正的任务。

 

 

### Configuration

        所有的配置均可由两个配置文件进行指定,一个是在当前工作目录下的 client.cfg ,另一个则为 /etc/luigi 。当前工作目录下的 client.cfg 高于 /etc/luigi 。

 

        配置选项有:

                default-scheduler-host : 默认的scheduler

                error-email : 当crash时会收到eamil,但在命令行下运行时则没有。

                luigi-history : 如果设置了该选项,值为一文件名,将为记录一些东西(当前仅有job id)在mapreduce任务的输出目录下。

                如果想在Python下运行Hadoop mapreduce任务,需要指定streaming jar的路径。

 

        

        发送error告警邮件的配置示例:

                为了能够在发生error时能够发送邮件,得满足两个条件:一个是输出不能从终端直接输出,得重定向到其他地方;另外一个是在配置文件中进行了相关配置,配置示例如下。

                [core]

                error-email: receiver@xxx.com

                email-sender: sender@xxx.com

                smtp_ssl: False

                smtp_host: smtp.xxx.com

                smtp_port: 25

                smtp_login: sender@xxx.com

                smtp_password: sender_password

 

        

        日志配置示例:

                如果不配置一个日志配置文件的话,会经常报一个提示:No handlers could be found for logger "luigi-interface" 。其中一种配置使用方式如下:

                在工程根目录下建立一个 conf 目录,在其下生成 logging.conf 文件,内容如下:

 [formatters]
keys=simple,detail

[handlers]
keys=file_rotate

[loggers]
keys=root

[formatter_simple]
format=%(levelname)s: %(message)s

[formatter_detail]
format=%(asctime)s %(levelname)s [%(filename)s %(lineno)d]: %(message)s

[handler_file_rotate]
formatter=detail
class=handlers.RotatingFileHandler
args=(os.environ['MAIN_ROOT_DIR'] + "/logs/main.log", 'a', 1024 * 1024 * 500, 1)

[logger_root]
handlers=file_rotate
#level=DEBUG
level=INFO
#level=WARNING

 

 

                在入口执行的py文件中放入以下代码:           

#### MAIN_ROOT_DIR is used in loggin.conf for relative log file path
MAIN_ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
os.environ['MAIN_ROOT_DIR'] = MAIN_ROOT_DIR
loggingConfigFile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf/logging.conf')
logging.config.fileConfig( loggingConfigFile, disable_existing_loggers = False)
logger = logging.getLogger( __name__ )

 

 

 

### 现阶段不足

        Luigi关注于批处理任务,所以对于实时流式处理及长时间一直运行的处理的帮助不大。

        Luigi假设每一个task是工作中的相当大的一块数据处理。对于调度几千个任务是可以的,但将其扩展到数万个任务的调度是不太可行的。可通过把不同类别的任务拆分成组,同一组或多组的tasks跑在一个Luigi实例上,通过布多个Luigi实例来解决此问题。

        Luigi对于任务调度及任务执行之间进行了严格地划分。动态的for循环及分支在Luigi中是不太容易实现的,举个例子,遍历一个数值计算任务直到它收敛是非常棘手的。

 

 

### 花边

        Luigi是世界上最有名排第二的水管工的名字,中文译名为路易奇或路奇,有着“永远的老二”的外号。至于第一是谁,想必大家都猜到了吧 ^_^

 

 

### 代码示例

 

        当在 Task.requires 下返回多个依赖的task时,在return语句中,越排在后面的反而先执行,代码示例如下:

class Z(luigi.Task):
    def requires(self):
        return S(), P()      ### 在执行时, 先执行 P 任务, 之后再执行 S 任务。所以当对依赖任务的执行顺序有要求时,请注意这里的排列顺序。

 

 

        启动server并以daemon方式运行的shell命令例子,注意将其中的一些路径换成实际路径,不要放在/tmp目录下,以免当luigid实例被kill掉时,所保存的log及state文件因重启系统而丢失了:

luigid --pidfile /tmp/luigid.pid --logdir /tmp/luigi/log/ --state-path /tmp/luigi/state --background

 

 

 

        在central scheduler模式下,提交task到server中的代码示例:

sch = luigi.rpc.RemoteScheduler(host=..., port=...)
w = luigi.worker.Worker(scheduler=sch)
w.add(task)
w.run()

 

 

         将本地文件put到hdfs上的代码示例:

import luigi, luigi.hdfs
from datetime import datetime

## 本示例将本地的 /tmp/abc_%Y%m%d.txt 文件上传到 HDFS 上的 /test/abc_%Y%m%d 路径

class BBF(luigi.ExternalTask):
    date = luigi.Parameter()

    def output(self):
        date = datetime.strptime(self.date, '%Y-%m-%d')
        return luigi.LocalTarget( date.strftime("/tmp/abc_%Y%m%d.txt") )
    

class BF(luigi.Task):
    date = luigi.Parameter()

    def requires(self):
        return BBF(self.date)

    def output(self):
        date = datetime.strptime(self.date, '%Y-%m-%d')
        return luigi.hdfs.HdfsTarget(date.strftime("/test/abc_%Y%m%d.txt"))

    def run(self):
        hdfsClient = luigi.hdfs.HdfsClientApache1()
        hdfsClient.put( self.input().path, self.output().path )
 

 

 

        执行hive语句的代码示例:

import luigi, luigi.hdfs, luigi.hive
from datetime import datetime

## 本示例是在hive上执行建立表分区的语句, 在 test 库的 abc 表上建立三个字段 pyearmonth, pday, pappid 对应的分区,并指向hdfs上的目录路径为 /test/abc 所对应的具体分区路径下

class P(luigi.hive.HiveQueryTask):
    date = luigi.Parameter()
    appid = luigi.Parameter()

    def output(self):
        date = datetime.strptime(self.date, '%Y-%m-%d')
        return luigi.hive.HivePartitionTarget(table='abc', partition={'pyearmonth':date.strftime("%Y%m"), 'pday':date.strftime("%d"), 'pappid':self.appid}, database='test')


    def query(self):
        date = datetime.strptime(self.date, '%Y-%m-%d')
        return """
            USE test;
            ALTER TABLE abc ADD IF NOT EXISTS PARTITION ( pyearmonth='{ym}', pday='{d}', pappid='{appid}' ) LOCATION '/test/abc/pyearmonth={ym}/pday={d}/pappid={appid}';
            """.format(ym=date.strftime("%Y%m"), d=date.strftime("%d"), appid=self.appid)
 

 

 

 

### 安装

        sudo apt-get install build-essential python-dev python-daemon python-setuptools libcurl4-gnutls-dev librtmp-dev

 

        以下包的安装,后面括号内空为安装步骤的简单记录:

        pycares      (./build_inplace;  python setup.py build; sudo python setup.py install)

        pycurl       (python setup.py build;   sudo python setup.py install)

        unittest2     (python setup.py build;   sudo python setup.py install)

        futures        (python setup.py build;   sudo python setup.py install)

        Monotime     (python setup.py build;   sudo python setup.py install)

        Twisted       (sudo python setup.py install)

        backports.ssl_match_hostname      (python setup.py build;   sudo python setup.py install)

        tornado      (python setup.py build;   sudo python setup.py install)

        mechanize        (python setup.py build;   sudo python setup.py install)

        simplejson (这个可提高 luigi 的json性能,在luigi源码中有时在使用json时要是有simplejson则使用simplejson模块,否则使用自带的json库,但自带的json库性能比较差)

        luigi  ( 从 github上下载zip包,解压开之后, sudo python setu.py install )

 

 

分享到:
评论

相关推荐

    PyPI 官网下载 | luigi-swf-0.12.4.tar.gz

    首先,"luigi-swf"是一个基于Python的库,主要设计用于构建和执行工作流任务。这个版本为0.12.4,表明它是经过多次迭代和优化后的稳定版本。通常,版本号的更新反映了软件修复了已知问题、增加了新功能或者提高了...

    Python库 | dbnd_luigi-0.56.1-py2.py3-none-any.whl

    在本案例中,我们关注的是名为"dbnd_luigi"的特定库,其版本为0.56.1,它是一个Python软件包,封装在名为"dbnd_luigi-0.56.1-py2.py3-none-any.whl"的压缩文件中。这个文件格式(.whl)是Python的一种二进制分发格式...

    PyPI 官网下载 | dbnd-luigi-0.37.2.tar.gz

    "dbnd-luigi-0.37.2.tar.gz" 是一个从PyPI官方下载的压缩包,其名称揭示了它的核心内容:dbnd-luigi,这是一个基于版本0.37.2的Python库。 首先,我们要了解什么是dbnd-luigi。dbnd-luigi是一个数据任务构建和调度...

    PyPI 官网下载 | luigi-daisy-0.0.3.tar.gz

    luigi-daisy库可能与另一个知名的Python库Luigi有关,Luigi是一个用于构建大型任务调度的开源工具,常用于数据处理管道。"daisy"可能表示这个库是Luigi的一个扩展或插件,提供了额外的功能或者针对某种特定场景进行...

    PyPI 官网下载 | dbnd-luigi-0.28.16.tar.gz

    **PyPI 官网下载 | dbnd-luigi-0.28.16.tar.gz 知识点详解** PyPI(Python Package Index)是Python社区最常用的软件包仓库,它为开发者提供了一个集中发布Python模块的地方。在本场景中,我们关注的是名为"dbnd-...

    PyPI 官网下载 | dbnd-luigi-0.30.1.tar.gz

    总的来说,`dbnd-luigi-0.30.1.tar.gz`提供了一个强大且灵活的数据处理框架,它扩展了Luigi的功能,帮助数据团队更高效地管理和执行复杂的工作流程。通过这个库,开发者可以专注于编写实际的数据处理逻辑,而无需...

    PyPI 官网下载 | luigi-1.0.9.tar.gz

    《PyPI官网下载 | luigi-1.0.9.tar.gz——深入了解Python库luigi》 在Python的世界里,PyPI(Python Package Index)是广大开发者的重要资源库,它为Python开发者提供了丰富的第三方库,方便大家进行软件开发。今天...

    PyPI 官网下载 | queenbee-luigi-0.5.5.tar.gz

    《PyPI官网下载:queenbee-luigi-0.5.5.tar.gz——探索Python库在分布式环境中的应用》 PyPI(Python Package Index)是Python开发者的重要资源库,它为全球的Python开发者提供了丰富的开源软件包。"queenbee-luigi...

    Python库 | dbnd_luigi-0.51.0-py2.py3-none-any.whl

    资源分类:Python库 所属语言:Python 使用前提:需要解压 资源全名:dbnd_luigi-0.51.0-py2.py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | dbnd_luigi-0.37.0-py2.py3-none-any.whl

    资源分类:Python库 所属语言:Python 资源全名:dbnd_luigi-0.37.0-py2.py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | dbnd_luigi-0.28.15-py2.py3-none-any.whl

    资源分类:Python库 所属语言:Python 资源全名:dbnd_luigi-0.28.15-py2.py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | dbnd_luigi-0.30.5-py2.py3-none-any.whl

    资源分类:Python库 所属语言:Python 使用前提:需要解压 资源全名:dbnd_luigi-0.30.5-py2.py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python-Dask多任务并行编程与任务调度

    **Python-Dask:多任务并行编程与任务调度** Dask是Python中一个强大的并行计算框架,设计用于处理大规模数据集。它不仅提供了一种简单的方式来分解大型任务,还能在分布式环境中有效地执行这些任务,这使得Dask...

    Python库 | ingaia_luigi_slack-0.1.9-py3-none-any.whl

    Luigi是一个Python编写的任务调度器,专为大数据管道设计,支持复杂的工作依赖关系,而Slack则是一个流行的团队协作平台,提供实时通信功能。通过`ingaia_luigi_slack`,开发者可以将Luigi的任务执行状态无缝地同步...

    luigi-server:路易吉中央调度器

    luigi_history_db: sqlite:///usr/local/var/luigi-task-hist.db luigi_pid_file: /usr/local/var/luigi.pid luigi_record_task_history: True luigi_state_file: /usr/local/var/luigi-state.pickle # enabled task...

Global site tag (gtag.js) - Google Analytics