www.aboutyun.com 活到老学到老
Airflow 使用指南
日期:2019.06.27
www.aboutyun.com 活到老学到老
问题导读
1.如何安装和配置 Airflow?
2.如何通过 Airflow UI 监控 data pipeline(管道)并对其进行故障排除
3.什么是 Airflow Platform?
4. Airflow 是如何进行数据分析,追踪数据,调试数据流的?
5. Airflow 命令行接口的基本操作有哪些?
6.什么是集成以及我们如何进行配置?
快速开始
安装是快速而直接的。
# airflow 需要 home 目录,默认是~/airflow,
# 但是如果你需要,放在其它位置也是可以的
# (可选)
export AIRFLOW_HOME = ~/airflow
# 使用 pip 从 pypi 安装
pip install apache-airflow
# 初始化数据库
airflow initdb
# 启动 web 服务器,默认端口是 8080
airflow webserver -p 8080
# 启动定时器
airflow scheduler
# 在浏览器中浏览 localhost:8080,并在 home 页开启 example dag
运行这些命令后,Airflow 将创建$AIRFLOW_HOME 文件夹,并放置一个 airflow.cfg 文件,其默认值可以
让您快速上手。您可以通过查看$AIRFLOW_HOME/airflow.cfg 文件或者在 UI 的 Admin->Configuration 菜
单 中 检 查 相 关 配 置 。 如 果 由 systemd 启 动 , 则 webserver 的 PID 文 件 将 存 储 在
$AIRFLOW_HOME/airflow-webserver.pid 或/run/airflow/webserver.pid。
开箱即用,Airflow 使用 sqlite 数据库,由于使用此数据库后端无法进行并行化,因此您应该迅速替换
它。它与 SequentialExecutor 一起使用,但仅能按顺序运行任务实例。虽然这是非常有限的,但它允许您
快速启动和运行并浏览 UI 和命令行实用程序。
www.aboutyun.com 活到老学到老
以下是一些将触发一些任务实例的命令。在运行以下命令时,您应该能够在 example_bash_operatorDAG 中
看到任务的状态发生变化。
# 运行第一个任务实例
airflow run example_bash_operator runme_0 2015-01-01
# 运行两天的任务回填
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
安装
1.获取 Airflow:
安装最新稳定版 Airflow 的最简单方法是使用 pip :
pip install apache-airflow
您还可以安装 Airflow 的一些别的支持功能组件,例如 gcp_api 或者 postgres:
pip install apache-airflow[postgres,gcp_api]
2.额外的扩展包:
通过 PyPI 的 apache-airflow 命令下载的基本包只含有启动的基础部分内容。您可以根据您环境的需要
下载您的扩展包。例如,如果您不需要连接 Postgres,那么您就不需要使用 yum 命令安装 postgres-devel,
或者在您使用的系统上面安装 postgre 应用,并在安装中的经历一些痛苦过程。
除此之外,Airflow 可以按照需求导入这些扩展包来使用。
如下是列举出来的子包列表和他的功能:
包名
安装命令
说明
all
all_dbs
async
celery
cloudant
crypto
pip install
apache-airflow[all]
所有 Airflow 功能
pip install
apache-airflow[all_dbs]
所有集成的数据库
pip install
Gunicorn 的异步 worker
apache-airflow[async]
classes
pip install
apache-airflow[celery]
CeleryExecutor
pip install
apache-airflow[cloudant]
Cloudant hook
pip install
apache-airflow[crypto]
加密元数据 db 中的连接密码
www.aboutyun.com 活到老学到老
devel
pip install
apache-airflow[devel]
最小开发工具要求
devel_hadoo
p
druid
pip install
apache-airflow[devel_hado
Airflow + Hadoop stack 的依赖
op]
pip install
Druid.io 相关的 operators 和
apache-airflow[druid]
hooks
gcp_api
pip install
apache-airflow[gcp_api]
Google 云平台 hooks 和
operators(使用
google-api-python-c
lient )
github_enter
prise
pip install
apache-airflow[github_ent
Github 企业版身份认证
erprise]
pip install
google_auth
apache-airflow[google_aut
Google 身份认证
hdfs
hive
jdbc
h]
pip install
apache-airflow[hdfs]
pip install
apache-airflow[hive]
pip install
apache-airflow[jdbc]
HDFS hooks 和 operators
所有 Hive 相关的 operators
JDBC hooks 和 operators
kerberos
pip install
Kerberos 集成 Kerberized
apache-airflow[kerberos]
Hadoop
pip install
kubernetes
apache-airflow[kubernetes
]
Kubernetes Executor 以及
operator
ldap
mssql
mysql
pip install
apache-airflow[ldap]
用户的 LDAP 身份验证
pip install
apache-airflow[mssql]
Microsoft SQL Server operators
和 hook,作为 Airflow 后端支持
pip install
apache-airflow[mysql]
MySQL operators 和 hook,支
持作为 Airflow 后端。 MySQL
服务器的版本必须是 5.6.4+。
确切的版本上限取决于
mysqlclient 包的版本。 例
www.aboutyun.com 活到老学到老
如, mysqlclient 1.3.12
只能与 MySQL 服务器 5.6.4
到 5.7 一起使用。
pip install
apache-airflow[password]
用户密码验证
pip install
apache-airflow[postgres]
Postgres operators 和 hook,作
为 Airflow 后端支持
pip install
apache-airflow[qds]
启用 QDS(Qubole 数据服务)
支持
pip install
apache-airflow[rabbitmq]
rabbitmq 作为 Celery 后端支
持
pip install
apache-airflow[redis]
Redis hooks 和 sensors
pip install
S3KeySensor,
apache-airflow[s3]
S3PrefixSensor
pip install
apache-airflow[samba]
Hive2SambaOperator
pip install
SlackAPIPostOperato
apache-airflow[slack]
r
pip install
apache-airflow[ssh]
SSH hooks 及 Operator
pip install
apache-airflow[vertica]
做为 Airflow 后端的 Vertica
hook 支持
password
postgres
qds
rabbitmq
redis
s3
samba
slack
ssh
vertica
3.初始化 Airflow 数据库:
在您运行任务之前,Airflow 需要初始化数据库。 如果您只是在试验和学习 Airflow,您可以坚持使用默
认的 SQLite 选项。 如果您不想使用 SQLite,请查看初始化数据库后端以设置其他数据库。
配置完成后,若您想要运行任务,需要先初始化数据库:
airflow initdb
操作指南
1.设置配置选项:
第一次运行 Airflow 时,它会在$AIRFLOW_HOME 目录中创建一个名为 airflow.cfg 的文件(默认情况下为
~/airflow)。此文件包含 Airflow 的配置,您可以对其进行编辑以更改任何设置。您还可以使用以下格式
设置带有环境变量的选项:$AIRFLOW__{SECTION}__{KEY} (注意使用双下划线)。
www.aboutyun.com 活到老学到老
例如,元数据库连接字符串可以在 airflow.cfg 设置,如下所示:
[core]
sql_alchemy_conn = my_conn_string
或者通过创建相应的环境变量:
AIRFLOW__CORE__SQL_ALCHEMY_CONN = my_conn_string
您还可以通过将_cmd 附加到键来在运行时派生连接字符串,如下所示:
[core]
sql_alchemy_conn_cmd = bash_command_to_run
下列配置选项支持这种_cmd 配置办法
[core]部分中的 sql_alchemy_conn
[core]部分中的 fernet_key
[celery]部分中的 broker_url
[celery]部分中的 result_backend
[atlas]部分中的 password
[smtp]部分中的 smtp_password
[ldap]部分中的 bind_password
[kubernetes]部分中的 git_password
这背后的想法是不将密码存储在纯文本文件的框中。
所有配置选项的优先顺序如下 -
1. 环境变量
2. airflow.cfg 中的配置
3. airflow.cfg 中的命令
4. 默认
2.初始化数据库后端:
如果您想对 Airflow 进行真正的试使用,您应该考虑设置一个真正的数据库后端并切换到 LocalExecutor。
由 于 Airflow 是使 用 优 秀 的 SqlAlchemy 库 与 其 元 数 据进 行 交 互 而 构 建 的, 因 此 您 可 以 使 用 任何
SqlAlchemy 所支持的数据库作为后端数据库。我们推荐使用 MySQL 或 Postgres。
注意:我们依赖更严格的 MySQL SQL 设置来获得合理的默认值。确保在[mysqld]下的 my.cnf 中指定了
explicit_defaults_for_timestamp = 1;
注意:如果您决定使用 Postgres,我们建议您使用 psycopg2 驱动程序并在 SqlAlchemy 连接字符串中指
定它。另请注意,由于 SqlAlchemy 没有公开在 Postgres 连接 URI 中定位特定模式的方法,因此您可能
需要使用类似于 ALTER ROLE username SET search_path = airflow, foobar;的命令为您的角色设置默认
模式
www.aboutyun.com 活到老学到老
一旦您设置好管理 Airflow 的数据库以后,您需要更改 配置文件$AIRFLOW_HOME/airflow.cfg 中的
SqlAlchemy 连接字符串。然后,您还应该将“executor”设置更改为使用“LocalExecutor”,这是一个
可以在本地并行化任务实例的执行程序。
# 初始化数据库
airflow initdb
3.使用 Operators(执行器):
operator(执行器)代表一个理想情况下是幂等的任务。operator(执行器)决定了 DAG 运行时实际执行
的内容。
(1)BashOperator
使用 BashOperator 在 Bash shell 中执行命令。
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag)
模板
您可以使用 Jinja 模板来参数化 bash_command 参数。
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
故障排除
(a)找不到 Jinja 模板
在使用 bash_command 参数直接调用 Bash 脚本时,需要在脚本名称后添加空格。这是因为 Airflow 尝试
将 Jinja 模板应用于一个失败的脚本。
t2 = BashOperator(
task_id='bash_example',
# 这将会出现`Jinja template not found`的错误
# bash_command="/home/batcher/test.sh",
# 在加了空格之后,这会正常工作
bash_command="/home/batcher/test.sh ",
www.aboutyun.com 活到老学到老
dag=dag)
(2)PythonOperator
使用 PythonOperator 执行 Python 回调。
def print_context ( ds , ** kwargs ):
pprint ( kwargs )
print ( ds )
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator (
task_id = 'print_the_context' ,
provide_context = True ,
python_callable = print_context ,
dag = dag )
传递参数
使用 op_args 和 op_kwargs 参数将额外参数传递给 Python 的回调函数。
def my_sleeping_function(random_base):
"""这是一个将在 DAG 执行体中运行的函数"""
time.sleep(random_base)
# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag,
)
run_this >> task
模板
当您将 provide_context 参数设置为 True,Airflow 会传入一组额外的关键字参数:一个用于每个 Jinja 模
板变量和一个 templates_dict 参数。
templates_dict 参数是模板化的,因此字典中的每个值都被评估为 Jinja 模板。