Getting started with Apache Airflow
(Header image credit: Airflow official site)
In this post, I am going to discuss Apache Airflow, a workflow
management system developed by Airbnb.
Earlier I had discussed writing basic ETL pipelines in Bonobo.
Bonobo is cool for writing ETL pipelines, but the world is not all
about writing ETL pipelines to automate things. There are other
use cases in which you have to perform tasks in a certain order,
once or periodically. For instance:
Monitoring cron jobs.
Transferring data from one place to another.
Automating your DevOps operations.
Periodically fetching data from websites and updating the
database for your awesome price comparison system.
Data processing for recommendation-based systems.
Machine learning pipelines.
Possibilities are endless.
Before we move on to implementing Airflow in our systems,
let’s discuss what Airflow actually is and its terminology.
What is Airflow?
From the Website:
Airflow is a platform to programmatically author, schedule
and monitor workflows.
Use airflow to author workflows as directed acyclic graphs
(DAGs) of tasks. The airflow scheduler executes your tasks on
an array of workers while following the specified
dependencies. Rich command line utilities make performing
complex surgeries on DAGs a snap. The rich user interface
makes it easy to visualize pipelines running in production,
monitor progress, and troubleshoot issues when needed.
Basically, it helps to automate scripts in order to perform tasks.
Airflow is Python-based, but you can execute a program with it
irrespective of the language. For instance, the first stage of your
workflow might have to execute a C++-based program to perform image
analysis and then a Python-based program to transfer that
information to S3. Possibilities are endless.
What is a DAG?
From Wikipedia:
In mathematics and computer science, a directed acyclic graph
(DAG /ˈdæɡ/) is a finite directed graph with no directed cycles.
That is, it consists of finitely many vertices and edges, with
each edge directed from one vertex to another, such that there is
no way to start at any vertex v and follow a consistently-directed
sequence of edges that eventually loops back to v again.
Equivalently, a DAG is a directed graph that has a topological
ordering, a sequence of the vertices such that every edge is
directed from earlier to later in the sequence.
Let me try to explain it in simple words: you can only be a son of
your father, but not vice versa. OK, it’s a lame or weird example,
but I could not find a better one to explain a directed cycle.
Airflow DAG (credit: Apache Airflow)
In Airflow, all workflows are DAGs. A DAG consists of operators.
An operator defines an individual task that needs to be performed.
There are different types of operators available (as given on the
Airflow website):
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
SimpleHttpOperator - sends an HTTP request
MySqlOperator, SqliteOperator, PostgresOperator,
MsSqlOperator, OracleOperator, JdbcOperator, etc. -
executes a SQL command
Sensor - waits for a certain time, file, database row,
S3 key, etc…
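To make the DAG and operator concepts concrete, here is a minimal sketch of a DAG file, written against Airflow 1.10. The dag_id, task ids, and the echo placeholder are mine, purely for illustration: the first task stands in for the earlier idea of running any external program (for example a C++ binary) via BashOperator, followed by a PythonOperator that hands the results on.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 9, 1),
}

# One DAG, scheduled to run once a day.
dag = DAG('image_pipeline', default_args=default_args,
          schedule_interval='@daily')

def upload_results():
    # Placeholder for the Python step, e.g. pushing results to S3.
    print('uploading results...')

# First task: run any external program, regardless of its language.
analyze = BashOperator(
    task_id='analyze_images',
    bash_command='echo "run your image analysis binary here"',
    dag=dag)

# Second task: a plain Python callable.
upload = PythonOperator(
    task_id='upload_results',
    python_callable=upload_results,
    dag=dag)

# analyze must finish before upload starts.
analyze >> upload

A file like this goes into the dags folder that we will create in the next section.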
You can also come up with a custom operator as per your need.
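For instance, a bare-bones custom operator is just a subclass of BaseOperator with an execute method. Everything below (the class name, the name parameter, the log message) is a made-up sketch against Airflow 1.10, not an official example:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class GreetOperator(BaseOperator):
    """A toy operator that only logs a greeting when it runs."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(GreetOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what Airflow calls when the task instance runs.
        self.log.info('Hello, %s!', self.name)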
Installation and Setup
Airflow is Python-based. The best way to install it is via the pip tool.
pip install apache-airflow
To verify whether it got installed, run the command airflow
version; it should print something like:
[2018-09-22 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0
You will need to install mysqlclient as well to incorporate
MySQL into your workflows. It is optional, though.
pip install mysqlclient
Before you start anything, create a folder and set it as
AIRFLOW_HOME. In my case it is airflow_home. Once it is created, you
will call the export command to set it in the path.
export AIRFLOW_HOME="$(pwd)/airflow_home"
Make sure you are one folder above airflow_home before running
the export command. Within airflow_home you will create
another folder to keep DAGs. Call it dags.
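Put together, and assuming a bash-like shell, the setup so far looks like this when run from the folder that will contain airflow_home:
mkdir airflow_home
export AIRFLOW_HOME="$(pwd)/airflow_home"
mkdir airflow_home/dags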
If you set load_examples=False in airflow.cfg, it will not load the
default examples on the web interface.
Now you have to call airflow initdb within the airflow_home folder.
Once it’s done, it creates airflow.cfg and unittests.cfg, along with
airflow.db.
airflow.db is an SQLite database file that stores all the metadata
related to running your workflows. airflow.cfg keeps all the initial
settings needed to keep things running.
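At this point, assuming you already created the dags folder, listing the contents of airflow_home should show something like:
airflow.cfg
airflow.db
dags
unittests.cfg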
In airflow.cfg, you can see the sql_alchemy_conn parameter with the
value ../airflow_home/airflow.db
You can use MySQL if you want. For now, just stick with the basic
settings.
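For reference, these settings live under the [core] section of airflow.cfg and look roughly like the following; the paths and the commented-out MySQL connection string are illustrative, not something you need to change right now:
[core]
dags_folder = /path/to/airflow_home/dags
load_examples = False
sql_alchemy_conn = sqlite:////path/to/airflow_home/airflow.db
# To switch to MySQL, the connection string would instead look like:
# sql_alchemy_conn = mysql://user:password@localhost:3306/airflow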
So far so good. Now, without wasting any time, let’s start the web
server.
airflow webserver
When it starts, it shows a screen like:
[2018-09-20 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor
/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: DeprecationWarning: The celeryd_concurrency option in [celery] has been renamed to worker_concurrency - the old setting has been used, but please update your config.
  default=conf.get('celery', 'worker_concurrency')),
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0
[2018-09-19 14:21:42,340] {__init__.py:57} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2018-09-19 14:21:43,119] [48995] {models.py:167} INFO - Filling up the DagBag from /Development/airflow_home/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Now when you visit 0.0.0.0:8080, it shows a screen like this: