logo资料库

airflow指南.pdf

第1页 / 共21页
第2页 / 共21页
第3页 / 共21页
第4页 / 共21页
第5页 / 共21页
第6页 / 共21页
第7页 / 共21页
第8页 / 共21页
资料共21页,剩余部分请下载后查看
Getting started with  Apache Airflow Credit Airflow Official Site In this post, I am going to discuss Apache Airflow, a workflow  management system developed by Airbnb.
Earlier I had discussed writing basic ETL pipelines in Bonobo.  Bonobo is cool for write ETL pipelines but the world is not all  about writing ETL pipelines to automate things. There are other  use cases in which you have to perform tasks in a certain order  once or periodically. For instance: Monitoring Cron jobs transferring data from one place to other. Automating your DevOps operations. Periodically fetching data from websites and  update the database for your awesome price  comparison system. Data processing for recommendation based  systems. Machine Learning Pipelines. Possibilities are endless. Before we move on further to implement Airflow in our systems,  let’s discuss what actually is Airflow and it’s terminologies. What is Airflow? From the Website: Airflow is a platform to programmatically author, schedule  and monitor workflows.
Use airflow to author workflows as directed acyclic graphs  (DAGs) of tasks. The airflow scheduler executes your tasks on  an array of workers while following the specified  dependencies. Rich command line utilities make performing  complex surgeries on DAGs a snap. The rich user interface  makes it easy to visualize pipelines running in production,  monitor progress, and troubleshoot issues when needed. Basically, it helps to automate scripts in order to perform tasks.  Airflow is Python­based but you can execute a program  irrespective of the language. For instance, the first stage of your  workflow has to execute a C++ based program to perform image  analysis and then a Python­based program to transfer that  information to S3. Possibilities are endless. What is Dag? From Wikipedia In mathematics and computer science, a directed acyclic graph  (DAG /ˈdæɡ/ (About this sound listen)), is a finite directed  graph with no directed cycles. That is, it consists of finitely  many vertices and edges, with each edge directed from one  vertex to another, such that there is no way to start at any  vertex v and follow a consistently­directed sequence of edges  that eventually loops back to v again. Equivalently, a DAG is a 
directed graph that has a topological ordering, a sequence of  the vertices such that every edge is directed from earlier to  later in the sequence. Let me try to explain in simple words: You can only be a son of  your father but not vice versa. OK, it’s lame or weird but could  not find a better example to explain a directed cycle. Airflow DAG(Credit: Apache Airflow) In Airflow all workflows are DAGs. A Dag consists of operators.  An operator defines an individual task that needs to be performed.  There are different types of operators available( As given on  Airflow Website):
BashOperator - executes a bash command PythonOperator - calls an arbitrary Python function EmailOperator - sends an email SimpleHttpOperator - sends an HTTP request MySqlOperator, SqliteOperator, PostgresOperator,  MsSqlOperator, OracleOperator, JdbcOperator, etc. -  executes a SQL command Sensor - waits for a certain time, file, database row,  S3 key, etc… You can also come up with a custom operator as per your need. Installation and Setup Airflow is Python based. The best way to install it is via pip tool. pip install apache­airflow To verify whether it got installed, run the command: airflow  version and it should print something like: [2018­09­22 15:59:23,880] {__init__.py:51} INFO ­ Using  executor SequentialExecutor ____________       _____________ ____    |__( )_________  __/__  /________      __ ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / / ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/ v1.10.0 You will need to install mysqlclient as well to incorporate  MySQL in your workflows. It is optional though. pip install mysqlclient Before you start anything, create a folder and set it as  AIRFLOW_HOME. In my case it is airflow_home. Once created you  will call export command to set it in the path. export AIRFLOW_HOME='pwd' airflow_home Make sure you are a folder above of airflow_home before running  the export command. Within airflow_home you will create  another folder to keep DAGs. Call it dags If you set load_examples=False it will not load default examples  on the Web interface. Now you have to call airflow initdb within airflow_home folder.  Once it’s done it creates airflow.cfgand unitests.cfg airflow.db is an SQLite file to store all configuration related to  run workflows. airflow.cfg is to keep all initial settings to keep  things running.
In this file, you can see sql_alchemy_conn parameter with the  value ../airflow_home/airflow.db You can use MySQL if you want. For now, just stick with basic  settings. So far so good, now without wasting any time let’s start the web  server. airflow webserver When starts it shows the screen like: 2018­09­20 22:36:24,943] {__init__.py:51} INFO ­ Using  executor SequentialExecutor /anaconda3/anaconda/lib/python3.6/site­ packages/airflow/bin/cli.py:1595: DeprecationWarning: The  celeryd_concurrency option in [celery] has been renamed to  worker_concurrency ­ the old setting has been used, but  please update your config. default=conf.get('celery', 'worker_concurrency')), ____________       _____________ ____    |__( )_________  __/__  /________      __ ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / / ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ / _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
v1.10.0 [2018­09­19 14:21:42,340] {__init__.py:57} INFO ­ Using  executor SequentialExecutor ____________       _____________ ____    |__( )_________  __/__  /________      __ ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / / ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ / _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/ /anaconda3/anaconda/lib/python3.6/site­ packages/flask/exthook.py:71: ExtDeprecationWarning:  Importing flask.ext.cache is deprecated, use flask_cache  instead. .format(x=modname), ExtDeprecationWarning [2018­09­19 14:21:43,119] [48995] {models.py:167} INFO ­  Filling up the DagBag from /Development/airflow_home/dags Running the Gunicorn Server with: Workers: 4 sync Host: 0.0.0.0:8080 Now when you visit 0.0.0.0:8080 it shows a screen like:
分享到:
收藏