Apache Airflow is a platform to programmatically author, schedule, and monitor data pipelines. This post shows how to quickly deploy, diagnose, and evaluate your data pipeline tasks in Airflow using the CLI.

Install Airflow

1. Install Airflow

Follow the installation instructions on the Airflow website.

2. Update Airflow Configurations

To configure Airflow to use PostgreSQL rather than the default SQLite database, open airflow.cfg and change the executor setting to LocalExecutor (the default SequentialExecutor is the only executor that works with SQLite):

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor

The LocalExecutor can parallelize task instances locally.
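
You can double-check which executor is active from the CLI. The command below uses the config subcommand listed in the cheat sheet further down (Airflow 2 CLI); on older installs you can simply grep airflow.cfg instead.

airflow config get-value core executor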

Also update the SqlAlchemy connection string to point to the database you are about to create.

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow
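
Note that the psycopg2 driver must be available for this connection string to work. One way to get it (an assumption about your environment, not something airflow.cfg enforces) is to install Airflow's Postgres extra:

pip install 'apache-airflow[postgres]'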

Next, open a PostgreSQL shell:

psql

and create a new database for Airflow:

CREATE DATABASE airflow;

You're now ready to initialize the metadata database. In bash, run:

airflow initdb
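
To confirm that the metadata tables were created, you can list them from the PostgreSQL shell; expect to see tables such as dag and task_instance.

psql airflow -c '\dt'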

Create a DAG

1. Create a DAG folder.

In the console run:

mkdir airflow/dags

2. Add the necessary connections (these can also be created from the CLI, as sketched after the two lists below).

The first connection for my API call:

  • connection type of HTTP.
  • connection identifier of moves_profile.
  • host string of the full API endpoint: https://moves….

The second connection for my project database:

  • connection type of Postgres.
  • connection identifier of users (name of the table).
  • host string of 127.0.0.1.
  • schema string (database name) of kojak.
  • login of postgres (default).
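
If you prefer to script this step, here is a sketch of the equivalent airflow connections add commands (Airflow 2 syntax, listed in the cheat sheet below). The Moves host is left as a placeholder because the full endpoint is not reproduced here, and password flags are omitted.

airflow connections add moves_profile \
    --conn-type http \
    --conn-host 'https://<full-moves-api-endpoint>'

airflow connections add users \
    --conn-type postgres \
    --conn-host 127.0.0.1 \
    --conn-schema kojak \
    --conn-login postgres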

3. Create a DAG definition file in Python.

In the console run:

touch ~/airflow/dags/moves_profile.py

Then add your DAG definition:

"""
DAG pulls a user's profile information from the Moves API.
"""
from airflow import DAG
from airflow.hooks.http_hook import HttpHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import json

def get_profile(ds, **kwargs):
    pg_hook = PostgresHook(postgres_conn_id='users')
    api_hook = HttpHook(http_conn_id='moves_profile', method='GET')

    # Get profile info from Moves API
    resp = api_hook.run('')
    profile = json.loads(resp.content.decode('utf-8'))
    moves_user_id = profile['userId']
    moves_first_date = profile['profile']['firstDate']
    timezone = profile['profile']['currentTimeZone']['id']

    # Insert profile values into Postgres DB
    user_insert = """INSERT INTO users (moves_user_id, moves_start_date, timezone)
                      VALUES (%s, %s, %s);"""

    pg_hook.run(user_insert, parameters=(moves_user_id, moves_first_date, timezone))

default_args = {
    'owner': 'rosiehoyem',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 21),
    'email': ['rosiehoyem@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('moves_profile', default_args=default_args, schedule_interval=timedelta(days=1))


get_profile_task = \
    PythonOperator(task_id='get_profile',
                   provide_context=True,
                   python_callable=get_profile,
                   dag=dag)
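
Before deploying, it is worth checking that the scheduler can see the DAG and that the task runs end to end. The commands below are a sketch using the Airflow 2 CLI from the cheat sheet further down (on 1.x installs the equivalents are airflow list_dags and airflow test); the execution date is just an example.

airflow dags list

airflow tasks test moves_profile get_profile 2017-03-21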

Deploy with Docker

1. Set up an EC2 instance

Be sure you've opened port 8080 in the instance's security group (an AWS CLI sketch follows this list):

  • Custom TCP Rule
  • Port Range: 8080 (the port the Airflow webserver listens on)
  • Source: Anywhere
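
If you use the AWS CLI rather than the console, the same inbound rule can be sketched as below; the security group name is a placeholder for whatever group is attached to your instance.

aws ec2 authorize-security-group-ingress \
    --group-name <your-security-group> \
    --protocol tcp \
    --port 8080 \
    --cidr 0.0.0.0/0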

2. Install Docker on the EC2 instance.

3. Pull and run the docker-airflow image on your EC2 instance.

Instructions for this image can be found on its GitHub page.

docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow
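
To confirm the container is up, and to grab its ID for the docker exec step below, list the running containers:

docker ps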

4. Create an SSH tunnel from your local machine to port 8080 on the EC2 instance. The command below forwards local port 12345, so the Airflow UI will be reachable at http://localhost:12345.

ssh -i ~/.ssh/aws_key_file.pem -NL 12345:localhost:8080 ubuntu@XX.XXX.XXX.XXX

5. Open a shell inside the Airflow container (substitute the container ID reported by docker ps).

docker exec -it b5d116ad83cc bash

6. Create an Airflow admin user.

airflow users create \
    --username airflow \
    --firstname Airflow \
    --lastname Apache \
    --role Admin \
    --email airflow@example.com \
    --password airflow
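
To confirm the account exists, you can list the users from inside the same container (same users subcommand group shown in the cheat sheet below):

airflow users list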

Airflow CLI

Miscellaneous commands

airflow cheat-sheet | Display cheat sheet
airflow info | Show information about current Airflow and environment
airflow kerberos | Start a kerberos ticket renewer
airflow plugins | Dump information about loaded plugins
airflow rotate-fernet-key | Rotate encrypted connection credentials and variables
airflow scheduler | Start a scheduler instance
airflow sync-perm | Update permissions for existing roles and DAGs
airflow version | Show the version
airflow webserver | Start an Airflow webserver instance

Celery components

airflow celery flower | Start a Celery Flower
airflow celery stop | Stop the Celery worker gracefully
airflow celery worker | Start a Celery worker node

View configuration

airflow config get-value | Print the value of the configuration
airflow config list | List options for the configuration

Manage connections

airflow connections add | Add a connection
airflow connections delete | Delete a connection
airflow connections export | Export all connections
airflow connections get | Get a connection
airflow connections list | List connections

Manage DAGs

airflow dags backfill | Run subsections of a DAG for a specified date range
airflow dags delete | Delete all DB records related to the specified DAG
airflow dags list | List all the DAGs
airflow dags list-jobs | List the jobs
airflow dags list-runs | List DAG runs given a DAG id
airflow dags next-execution | Get the next execution datetimes of a DAG
airflow dags pause | Pause a DAG
airflow dags report | Show DagBag loading report
airflow dags show | Displays DAGs tasks with their dependencies
airflow dags state | Get the status of a dag run
airflow dags test | Execute one single DagRun
airflow dags trigger | Trigger a DAG run
airflow dags unpause | Resume a paused DAG

Database operations

airflow db check | Check if the database can be reached
airflow db check-migrations | Check if migrations have finished
airflow db init | Initialize the metadata database
airflow db reset | Burn down and rebuild the metadata database
airflow db shell | Runs a shell to access the database
airflow db upgrade | Upgrade the metadata database to latest version

Tools to help run the KubernetesExecutor

airflow kubernetes cleanup-pods | Clean up Kubernetes pods in evicted/failed/succeeded states
airflow kubernetes generate-dag-yaml | Generate YAML files for all tasks in DAG. Useful for debugging tasks without launching into a cluster

Manage pools

airflow pools delete | Delete pool
airflow pools export | Export all pools
airflow pools get | Get pool size
airflow pools import | Import pools
airflow pools list | List pools
airflow pools set | Configure pool

Display providers

airflow providers behaviours | Get information about registered connection types with custom behaviours
airflow providers get | Get detailed information about a provider
airflow providers hooks | List registered provider hooks
airflow providers links | List extra links registered by the providers
airflow providers list | List installed providers
airflow providers widgets | Get information about registered connection form widgets

Users

airflow users list | List all users and their roles
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin | Create a new user
airflow users delete -u USERNAME | Delete the given user

Tasks

airflow tasks list DAG_ID | List all tasks for the given DAG
airflow tasks test DAG_ID TASK_ID EXECUTION_DATE | Test-run a specific task in a DAG
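
For example, a concrete test invocation (the DAG id, task id, and date here are placeholders matching the render example in the next section) might look like:

airflow tasks test example_dag run_this 2021-01-01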

CLI examples

  • Render all templateable attributes of a given task:

airflow tasks render [dag id] [task id] [desired execution date]

airflow tasks render example_dag run_this 2021-01-01

  • Add a Postgres connection:

airflow connections add \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login postgres \
    --conn-password mysecretpassword \
    my_postgres
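
To double-check the result, the new connection can be read back with connections get (part of the same CLI group listed above):

airflow connections get my_postgres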
