什么是ETL
ETL 是常用的数据处理,在以前的公司里,ETL 差不多是数据处理的基础,要求非常稳定,容错率高,而且能够很好的监控。ETL的全称是 Extract,Transform,Load, 一般情况下是将乱七八糟的数据进行预处理,然后放到储存空间上。可以是SQL的也可以是NoSQL的,还可以直接存成file的模式。
一开始我的设计思路是,用几个cron job和celery来handle所有的处理,然后将我们的log文件存在hdfs,还有一些数据存在mysql,大概每天跑一次。核心是能够scale,稳定,容错,roll back。我们的data warehouse就放在云上,就简单处理了。
有了自己的ETL系统我觉得就很安心了,以后能够做数据处理和机器学习方面就相对方便一些。
问题来了
一开始我设计的思路和Uber一开始的ETL很像,因为我觉得很方便。但是我发觉一个很严重的问题,我一个人忙不过来。首先,要至少写个前端UI来监控cron job,但是市面上的都很差。其次,容错的autorestart写起来很费劲,可能是我自己没有找到一个好的处理方法。最后部署的时候相当麻烦,如果要写好这些东西,我一个人的话要至少一个月的时间,可能还不是特别robust。在尝试写了2两天的一些碎片处理的脚本之后我发觉时间拖了实在太久了。
隆重推荐的工具
airbnb是我很喜欢的公司,他们有很多开源的工具,airflow我觉得是最实用的代表。airflow 是能进行数据pipeline的管理,甚至是可以当做更高级的cron job 来使用。现在一般的大厂都不说自己的数据处理是ETL,美其名曰 data pipeline,可能跟google倡导的有关。airbnb的airflow是用python写的,它能进行工作流的调度,提供更可靠的流程,而且它还有自带的UI(可能是跟airbnb设计主导有关)。话不多说,先放两张截图:
什么是DAG
airflow里最重要的一个概念是DAG。
DAG是directed asyclic graph,在很多机器学习里有应用,也就是所谓的有向非循环。但是在airflow里你可以看做是一个小的工程,小的流程,因为每个小的工程里可以有很多“有向”的task,最终达到某种目的。在官网中的介绍里说dag的特点:
- Scheduled: each job should run at a certain scheduled interval
- Mission critical: if some of the jobs aren’t running, we are in trouble
- Evolving: as the company and the data team matures, so does the data processing
- Heterogenous: the stack for modern analytics is changing quickly, and most companies run multiple systems that need to be glued together
YEAH! It's awesome, right? After reading all of these, I found it was perfectly fit Prettyyes.
如何安装
安装airflow超级简单,使用pip就可以,现在airflow的版本是1.6.1,但是有个小的bug,这个之后会告诉大家如何修改。
pip install airflow
这里有个坑,因为airflow涉及到很多数据处理的包,所以会安装pandas和numpy(这个Data Scientist应该都很熟悉)但是国内pip install 安装非常慢,用douban的源也有一些小的问题。我的解决方案是,直接先用豆瓣的源安装numpy 和 pandas,然后再安装airflow,自动化部署的时候可以在requirements.txt 里调整顺序就行了
如何运行
摘自官方网站
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
然后你就可以上web ui查看所有的dags,来监控你的进程。
如何导入dag
一般第一次运行之后,airflow会在默认文件夹下生成airflow文件夹,然后你只要在里面新建一个文件dag就可以了。我这边部署在阿里云上的文件tree大概是这个样子的。
以下是我自己写的我们公司prettyyes里需要每天处理log的其中一个小的dag:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
import ConfigParser
config = ConfigParser.ConfigParser()
config.read('/etc/conf.ini')
WORK_DIR = config.get('dir_conf', 'work_dir')
OUTPUT_DIR = config.get('dir_conf', 'log_output')
PYTHON_ENV = config.get('dir_conf', 'python_env')
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.today() - timedelta(days=1),
'retries': 2,
'retry_delay': timedelta(minutes=15),
}
dag = DAG('daily_process', default_args=default_args, schedule_interval=timedelta(days=1))
templated_command = "echo 'single' | {python_env}/python {work_dir}/mr/LogMR.py"\
.format(python_env=PYTHON_ENV, work_dir=WORK_DIR) + " --start_date {{ ds }}"
task = BashOperator(
task_id='process_log',
bash_command=templated_command,
dag=dag
)
写好之后,只要将这个dag放入之前建立好的dag文件夹,然后运行:
python <dag_file>
来确保没有语法错误。在测试里你可以看到我的
schedule_interval=timedelta(days=1)
这样我们的数据处理的任务就相当于每天跑一次。更重要的是,airflow还提供处理bash处理的接口外还有hadoop的很多接口。可以为以后连接hadoop系统提供便利。很多具体的功能可以看官方文档。
其中的一个小的bug
airflow 1.6.1有一个网站的小的bug,安装成功后,点击dag里的log会出现以下页面:
这个只要将
airflow/www/utils.py
文件替换成最新的airflow github上的utils.py文件就行,具体的问题在这个:
fixes datetime issue when persisting logs
使用supervisord进行deamon
airflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码。
[program:airflow_web]
command=/home/kimi/env/athena/bin/airflow webserver -p 8080
[program:airflow_scheduler]
command=/home/kimi/env/athena/bin/airflow scheduler
我觉得airflow特别适合小的团队,他的功能强大,而且真的部署方便。和hadoop,mrjob又可以无缝连接,对我们的业务有很大的提升。
相关推荐
该项目使用Apache Airflow为Sparkify构建数据管道,该数据管道可自动执行并监视ETL管道的运行。 ETL从S3以JSON格式加载歌曲和日志数据,并在Reshift上以星型模式将数据处理到分析表中。 星型模式已用于允许Sparkify...
数据管道气流项目描述一家音乐流媒体公司Sparkify决定是时候向其数据仓库ETL管道引入更多的自动化和监视功能,并得出结论,达到此目的的最佳工具是Apache Airflow。 他们已决定将您带入项目,并期望您创建可动态使用...
一家音乐流媒体公司Sparkify决定是时候对其数据仓库ETL管道引入更多的自动化和监视功能,并得出结论,实现这一目标的最佳工具是Apache Airflow。 他们决定将您带入项目,并期望您创建动态的,由可重用任务构建,可...
2. 如何通过 Airflow UI 监控 data pipeline (管道)并对其进行故障排除 3. 什么是 Airflow Platform ? 4. Airflow 是如何进行数据分析,追踪数据,调试数据流的? 5. Airflow 命令行接口的基本操作有哪些?
Airflow 是一个使用 python 语言编写的 data pipeline 调度和监控工作流的平台。Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具, 不需要知道业务数据的具体内容,设置任务...
ETL数据管道来处理StreetEasy数据作者:Raviteja Kurva 项目简介: 在线房地产公司有兴趣了解user enagagement通过分析用户的搜索模式,以发送电子邮件的目标与有效的搜索用户。... 用户搜索历史记录和相关数据的每日...
使用Apache Airflow创建数据库 Udacity数据工程师纳米学位项目5 介绍 该项目包括创建数据库,从s3存储桶中提取数据以及将... data_quality.py :将语句存储到数据质量检查中。 sql_queries.py :存储ETL管道SQL查询。
nyc-taxi-data-pipeline关于项目纽约出租车的数据管道历史数据此存储库提供脚本,用于下载和预处理从纽约市开始的过去三年中数十亿次出租车的数据。原始数据来自纽约市出租车和豪华轿车委员会的建于Python火花空气...
在Python编程语言中,有许多强大的工具和库可以构建这样的数据处理流水线,例如Pandas、NumPy、Apache Spark以及Airflow等。 首先,让我们了解数据管道(Data Pipeline)的基本概念。数据管道是一系列将原始数据...
Python的`airflow`或`Luigi`是常用的工作流管理系统,它们可以定义任务依赖关系,自动化调度,并提供可视化的任务监控界面。 此外,为了保证数据的安全性和合规性,数据加密、访问控制和审计日志也是必不可少的。...
Python库databand是数据工程领域的一个强大工具,专注于自动化数据管道的构建、测试和监控。这个库的主要目标是让数据工程师能够更高效地管理他们的数据工作流程,从而提高整个团队的数据生产力。版本0.52.2是该库的...
10. **编排工具**:如果应用是容器化的,可能涉及Kubernetes或Docker Compose来管理容器。 在提供的文件名 "simple_pipeline_app-main" 中,"main"常常指代程序的主要入口点,这可能是整个应用的启动脚本,负责初始...
使用测试+ CI工作流程开发Airflow DAG 该代码是对的文章的补充。 我建议您阅读以更好地理解代码以及我认为如何设置项目的方式。 该项目 以下是该项目将要完成的摘要。 我们将模拟来自电子商务的一些虚假交易数据...
GoodReads数据管道建筑学管道包括各种模块:ETL职位Redshift仓库模块分析模块概述使用Goodreads Python包装器从Goodreads API实时捕获数据...使用Redshift暂存表并在Data Warehouse表上执行UPSERT操作以更新数据集。
LR使用LibSVM格式的数据集, 采用 TFRecords + tf.data.Dataset + model + tf_model_server的tensorflow编程模型。 FM分别使用了csv和LibSVM两种格式的数据,采用 tf.placeholder / tf.Sparse_placeholder+ model + ...
`dbnd-spark` 是一个用于管理和监控Spark作业的数据科学工具,它扩展了Data and Pipeline (Databand) 框架的功能,使Spark任务的开发、测试和运维变得更加便捷。Databand是一个自动化数据工程工作流程的开源平台,...
在这个项目中,我们将Data Modeling与Cassandra结合使用,并使用Python构建ETL管道。 我们将围绕要获取答案的查询建立数据模型。 对于我们的用例,我们需要以下答案: 获取在特定会话期间在音乐应用程序历史记录中...
生成的模式可确保跨表的数据一致性和引用完整性,并且将成为分析查询和BI工具的真实来源。 此外,该数据还丰富了来自第三方数据源的人口统计和天气数据。 整个过程使用Apache Spark,Amazon Redshift和Apache ...
无需应用内数据处理:命令行工具是与数据库和数据进行交互的主要工具。 基于Python的单机流水线执行。 无需分布式任务队列。 易于调试和输出记录。 基于成本的优先级队列:首先运行具有较高成本(基于记录的运行...
6. **数据管道(Data Pipeline)**:使用` luigi `, `Airflow` 或者 `Dask` 等工具可以构建数据处理的流水线,它们可以帮助管理复杂的数据处理流程,确保各个步骤按顺序执行,且易于维护和扩展。 7. **版本控制**:...