logo资料库

Airflow使用指南【About云整理】.pdf

第1页 / 共322页
第2页 / 共322页
第3页 / 共322页
第4页 / 共322页
第5页 / 共322页
第6页 / 共322页
第7页 / 共322页
第8页 / 共322页
资料共322页,剩余部分请下载后查看
www.aboutyun.com 活到老学到老 Airflow 使用指南 日期:2019.06.27
www.aboutyun.com 活到老学到老 问题导读 1.如何安装和配置 Airflow? 2.如何通过 Airflow UI 监控 data pipeline(管道)并对其进行故障排除 3.什么是 Airflow Platform? 4. Airflow 是如何进行数据分析,追踪数据,调试数据流的? 5. Airflow 命令行接口的基本操作有哪些? 6.什么是集成以及我们如何进行配置? 快速开始 安装是快速而直接的。 # airflow 需要 home 目录,默认是~/airflow, # 但是如果你需要,放在其它位置也是可以的 # (可选) export AIRFLOW_HOME = ~/airflow # 使用 pip 从 pypi 安装 pip install apache-airflow # 初始化数据库 airflow initdb # 启动 web 服务器,默认端口是 8080 airflow webserver -p 8080 # 启动定时器 airflow scheduler # 在浏览器中浏览 localhost:8080,并在 home 页开启 example dag 运行这些命令后,Airflow 将创建$AIRFLOW_HOME 文件夹,并放置一个 airflow.cfg 文件,其默认值可以 让您快速上手。您可以通过查看$AIRFLOW_HOME/airflow.cfg 文件或者在 UI 的 Admin->Configuration 菜 单 中 检 查 相 关 配 置 。 如 果 由 systemd 启 动 , 则 webserver 的 PID 文 件 将 存 储 在 $AIRFLOW_HOME/airflow-webserver.pid 或/run/airflow/webserver.pid。 开箱即用,Airflow 使用 sqlite 数据库,由于使用此数据库后端无法进行并行化,因此您应该迅速替换 它。它与 SequentialExecutor 一起使用,但仅能按顺序运行任务实例。虽然这是非常有限的,但它允许您 快速启动和运行并浏览 UI 和命令行实用程序。
www.aboutyun.com 活到老学到老 以下是一些将触发一些任务实例的命令。在运行以下命令时,您应该能够在 example_bash_operatorDAG 中 看到任务的状态发生变化。 # 运行第一个任务实例 airflow run example_bash_operator runme_0 2015-01-01 # 运行两天的任务回填 airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02 安装 1.获取 Airflow: 安装最新稳定版 Airflow 的最简单方法是使用 pip : pip install apache-airflow 您还可以安装 Airflow 的一些别的支持功能组件,例如 gcp_api 或者 postgres: pip install apache-airflow[postgres,gcp_api] 2.额外的扩展包: 通过 PyPI 的 apache-airflow 命令下载的基本包只含有启动的基础部分内容。您可以根据您环境的需要 下载您的扩展包。例如,如果您不需要连接 Postgres,那么您就不需要使用 yum 命令安装 postgres-devel, 或者在您使用的系统上面安装 postgre 应用,并在安装中的经历一些痛苦过程。 除此之外,Airflow 可以按照需求导入这些扩展包来使用。 如下是列举出来的子包列表和他的功能: 包名 安装命令 说明 all all_dbs async celery cloudant crypto pip install apache-airflow[all] 所有 Airflow 功能 pip install apache-airflow[all_dbs] 所有集成的数据库 pip install Gunicorn 的异步 worker apache-airflow[async] classes pip install apache-airflow[celery] CeleryExecutor pip install apache-airflow[cloudant] Cloudant hook pip install apache-airflow[crypto] 加密元数据 db 中的连接密码
www.aboutyun.com 活到老学到老 devel pip install apache-airflow[devel] 最小开发工具要求 devel_hadoo p druid pip install apache-airflow[devel_hado Airflow + Hadoop stack 的依赖 op] pip install Druid.io 相关的 operators 和 apache-airflow[druid] hooks gcp_api pip install apache-airflow[gcp_api] Google 云平台 hooks 和 operators(使用 google-api-python-c lient ) github_enter prise pip install apache-airflow[github_ent Github 企业版身份认证 erprise] pip install google_auth apache-airflow[google_aut Google 身份认证 hdfs hive jdbc h] pip install apache-airflow[hdfs] pip install apache-airflow[hive] pip install apache-airflow[jdbc] HDFS hooks 和 operators 所有 Hive 相关的 operators JDBC hooks 和 operators kerberos pip install Kerberos 集成 Kerberized apache-airflow[kerberos] Hadoop pip install kubernetes apache-airflow[kubernetes ] Kubernetes Executor 以及 operator ldap mssql mysql pip install apache-airflow[ldap] 用户的 LDAP 身份验证 pip install apache-airflow[mssql] Microsoft SQL Server operators 和 hook,作为 Airflow 后端支持 pip install apache-airflow[mysql] MySQL operators 和 hook,支 持作为 Airflow 后端。 MySQL 服务器的版本必须是 5.6.4+。 确切的版本上限取决于 mysqlclient 包的版本。 例
www.aboutyun.com 活到老学到老 如, mysqlclient 1.3.12 只能与 MySQL 服务器 5.6.4 到 5.7 一起使用。 pip install apache-airflow[password] 用户密码验证 pip install apache-airflow[postgres] Postgres operators 和 hook,作 为 Airflow 后端支持 pip install apache-airflow[qds] 启用 QDS(Qubole 数据服务) 支持 pip install apache-airflow[rabbitmq] rabbitmq 作为 Celery 后端支 持 pip install apache-airflow[redis] Redis hooks 和 sensors pip install S3KeySensor, apache-airflow[s3] S3PrefixSensor pip install apache-airflow[samba] Hive2SambaOperator pip install SlackAPIPostOperato apache-airflow[slack] r pip install apache-airflow[ssh] SSH hooks 及 Operator pip install apache-airflow[vertica] 做为 Airflow 后端的 Vertica hook 支持 password postgres qds rabbitmq redis s3 samba slack ssh vertica 3.初始化 Airflow 数据库: 在您运行任务之前,Airflow 需要初始化数据库。 如果您只是在试验和学习 Airflow,您可以坚持使用默 认的 SQLite 选项。 如果您不想使用 SQLite,请查看初始化数据库后端以设置其他数据库。 配置完成后,若您想要运行任务,需要先初始化数据库: airflow initdb 操作指南 1.设置配置选项: 第一次运行 Airflow 时,它会在$AIRFLOW_HOME 目录中创建一个名为 airflow.cfg 的文件(默认情况下为 ~/airflow)。此文件包含 Airflow 的配置,您可以对其进行编辑以更改任何设置。您还可以使用以下格式 设置带有环境变量的选项:$AIRFLOW__{SECTION}__{KEY} (注意使用双下划线)。
www.aboutyun.com 活到老学到老 例如,元数据库连接字符串可以在 airflow.cfg 设置,如下所示: [core] sql_alchemy_conn = my_conn_string 或者通过创建相应的环境变量: AIRFLOW__CORE__SQL_ALCHEMY_CONN = my_conn_string 您还可以通过将_cmd 附加到键来在运行时派生连接字符串,如下所示: [core] sql_alchemy_conn_cmd = bash_command_to_run 下列配置选项支持这种_cmd 配置办法         [core]部分中的 sql_alchemy_conn [core]部分中的 fernet_key [celery]部分中的 broker_url [celery]部分中的 result_backend [atlas]部分中的 password [smtp]部分中的 smtp_password [ldap]部分中的 bind_password [kubernetes]部分中的 git_password 这背后的想法是不将密码存储在纯文本文件的框中。 所有配置选项的优先顺序如下 - 1. 环境变量 2. airflow.cfg 中的配置 3. airflow.cfg 中的命令 4. 默认 2.初始化数据库后端: 如果您想对 Airflow 进行真正的试使用,您应该考虑设置一个真正的数据库后端并切换到 LocalExecutor。 由 于 Airflow 是使 用 优 秀 的 SqlAlchemy 库 与 其 元 数 据进 行 交 互 而 构 建 的, 因 此 您 可 以 使 用 任何 SqlAlchemy 所支持的数据库作为后端数据库。我们推荐使用 MySQL 或 Postgres。 注意:我们依赖更严格的 MySQL SQL 设置来获得合理的默认值。确保在[mysqld]下的 my.cnf 中指定了 explicit_defaults_for_timestamp = 1; 注意:如果您决定使用 Postgres,我们建议您使用 psycopg2 驱动程序并在 SqlAlchemy 连接字符串中指 定它。另请注意,由于 SqlAlchemy 没有公开在 Postgres 连接 URI 中定位特定模式的方法,因此您可能 需要使用类似于 ALTER ROLE username SET search_path = airflow, foobar;的命令为您的角色设置默认 模式
www.aboutyun.com 活到老学到老 一旦您设置好管理 Airflow 的数据库以后,您需要更改 配置文件$AIRFLOW_HOME/airflow.cfg 中的 SqlAlchemy 连接字符串。然后,您还应该将“executor”设置更改为使用“LocalExecutor”,这是一个 可以在本地并行化任务实例的执行程序。 # 初始化数据库 airflow initdb 3.使用 Operators(执行器): operator(执行器)代表一个理想情况下是幂等的任务。operator(执行器)决定了 DAG 运行时实际执行 的内容。 (1)BashOperator 使用 BashOperator 在 Bash shell 中执行命令。 run_this = BashOperator( task_id='run_after_loop', bash_command='echo 1', dag=dag) 模板 您可以使用 Jinja 模板来参数化 bash_command 参数。 also_run_this = BashOperator( task_id='also_run_this', bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', dag=dag, ) 故障排除 (a)找不到 Jinja 模板 在使用 bash_command 参数直接调用 Bash 脚本时,需要在脚本名称后添加空格。这是因为 Airflow 尝试 将 Jinja 模板应用于一个失败的脚本。 t2 = BashOperator( task_id='bash_example', # 这将会出现`Jinja template not found`的错误 # bash_command="/home/batcher/test.sh", # 在加了空格之后,这会正常工作 bash_command="/home/batcher/test.sh ",
www.aboutyun.com 活到老学到老 dag=dag) (2)PythonOperator 使用 PythonOperator 执行 Python 回调。 def print_context ( ds , ** kwargs ): pprint ( kwargs ) print ( ds ) return 'Whatever you return gets printed in the logs' run_this = PythonOperator ( task_id = 'print_the_context' , provide_context = True , python_callable = print_context , dag = dag ) 传递参数 使用 op_args 和 op_kwargs 参数将额外参数传递给 Python 的回调函数。 def my_sleeping_function(random_base): """这是一个将在 DAG 执行体中运行的函数""" time.sleep(random_base) # Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively for i in range(5): task = PythonOperator( task_id='sleep_for_' + str(i), python_callable=my_sleeping_function, op_kwargs={'random_base': float(i) / 10}, dag=dag, ) run_this >> task 模板 当您将 provide_context 参数设置为 True,Airflow 会传入一组额外的关键字参数:一个用于每个 Jinja 模 板变量和一个 templates_dict 参数。 templates_dict 参数是模板化的,因此字典中的每个值都被评估为 Jinja 模板。
分享到:
收藏