Airflow and XCOM: Inter-Task Communication Use Cases

Airflow is a robust workflow pipeline framework that we’ve used at Precocity with a number of clients with great success. This blog is not geared towards introducing you to Airflow and all that it can do, but rather focuses on a couple of XCOM use cases that may be beneficial to your own projects.

I’m going to walk you through my own introduction to XCOM, which came from Michal Karzynski’s great blog: http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/

Rather than reinvent the wheel, I suggest you read his blog and reference his code where applicable; I’ll call out the changes I made.

The Problem

First, a little bit of background. As part of our Airflow implementations, we’ve developed custom plugins that do a great job of encapsulating common needs: querying databases, storing the results as a CSV file in an S3 or GCS bucket, and then ingesting that data into a Cloud Data Warehouse. Other plugins handle the SFTP transfer of files, GnuPG encryption and decryption tasks, etc. My thoughts on custom plugins vs. writing code in your DAGs can be covered in a separate blog post.

For the SFTP handling, dynamic tasks were written that downloaded the file, decrypted it and then deleted the source file on the SFTP site. The SFTP implementation went through a couple of revisions, primarily due to the chattiness of the DAG. This is what some of the code looked like:

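# Note: everything below runs at the top level of the DAG file, so it executes
# every time the scheduler parses the file, not just when the DAG actually runs.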
directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)
for file_path in directory_list:
    path, file = os.path.split(file_path)
    get_remote_file_task = SFTPToS3Operator(
        task_id='get_remote_file' + '-' + file,
        ssh_conn_id='sftp',
        source_file_path='/' + file,
        destination_file_path='s3://' + bucket_name + '/SFTP/raw/' + file,
        operation=SFTPToS3Operation.GET,
        dag=dag)

    db_load_task = DBLoadOperator(task_id='data_load' + '-' + file,
                                  sf_schema='db',
                                  load_table='tbl',
                                  trunc=False,
                                  dag=dag)

    get_remote_file_task.set_downstream(db_load_task)
    
    delete_remote_file_task = SFTPToS3Operator(
        task_id='delete_remote_file' + '-' + file, 
        ssh_conn_id='sftp',
        source_file_path='/Remote/' + file, 
        destination_file_path=None,
        operation=SFTPToS3Operation.DELETE,
        dag=dag)

    db_load_task.set_downstream(delete_remote_file_task)

It’s not a complicated DAG, and you can see that the task_ids are uniquely named by appending the name of each file found on the remote SFTP site. The problem comes down to one that currently exists in Airflow 1.9.0: the scheduler refreshes the DAG list every second, because it doesn’t maintain DAG state, and the refresh interval isn’t currently configurable. AFAIK, this hasn’t changed in Airflow 1.10.0.

Because the code retrieves a remote file list every second, there’s an excessive amount of chattiness from Airflow to the SFTP server. Additionally, any log entries generated from executing the top-level code essentially create “noise” that is best left unseen.

Prior to trying XCOM, my solution was to create an Airflow variable that tracked actual execution of the DAG. Only if the DAG was actually executing would the LIST command run against the SFTP site and generate the dynamic tasks. While this generated less noise, it added complexity: if one of the upstream tasks failed, the Airflow variable had to be cleared so the DAG would run again on the next scheduled interval.
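For context, the guard looked roughly like this (a simplified reconstruction, not the production code; the variable name and default are made up):

from airflow.models import Variable

# Only hit the SFTP server when the DAG has been flagged as actually executing
# (the variable name is hypothetical).
dag_is_running = Variable.get('sftp_dag_is_running', default_var='false') == 'true'

directory_list = []
if dag_is_running:
    directory_list = sftp_handler('sftp', None, '/Remote/', None, SFTPToS3Operation.LIST)

# The dynamic task creation shown earlier then iterates over directory_list.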

From a debugging standpoint, because an Airflow variable was being read to gate the dynamic task generation, the DAG graph usually showed only the task that tested for the Airflow variable and rarely the fully populated graph with the dynamically rendered tasks. Occasionally, you would see the full graph during execution, only to have it disappear when the DAG completed. This meant digging through the file system logs instead of viewing them through the web UI. Not terribly elegant, especially when combined with the next problem.

Oddly enough, the DAG wouldn’t always register properly in one environment.  Other DAGs following the same pattern were fine. Sometimes a restart of the services helped, sometimes not. More time was spent troubleshooting this type of use case than was warranted.

That led me to finally take a closer look at XCOM and Michal’s post. What I realized would work better was to retrieve the file list once, store it in an XCOM variable, and pass the same file list to each of the downstream tasks, letting each iterate through it. The result would be a simpler DAG.

The Solution Part 1: Flexible Task Parameters

Michal’s sample code was missing a couple of small changes that would help productionize it. Below is my slightly modified code:

MyFirstSensor

from datetime import datetime
import logging as log

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MyFirstSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, xcom_task_id_key, *args, **kwargs):
        super(MyFirstSensor, self).__init__(*args, **kwargs)
        self.xcom_task_id_key = xcom_task_id_key

    def poke(self, context):
        current_minute = datetime.now().minute
        if current_minute % 3 != 0:
            log.info("Current minute (%s) not is divisible by 3, sensor will retry.", current_minute)
            return False

        log.info("Current minute (%s) is divisible by 3, sensor finishing.", current_minute)
        task_instance = context['task_instance']
        task_instance.xcom_push(self.xcom_task_id_key, current_minute)
        return True

The first change to the operator was to accept the key (xcom_task_id_key) under which to store the value. This allows you to provide the key as a parameter to your task and re-use the operator as needed.

The second change references the same key when pushing the value into XCOM. Nothing here is earth-shattering, and it’s pretty obvious, but it was the first step toward my solution.

MyFirstOperator

import logging as log

from airflow.operators import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class MyFirstOperator(BaseOperator):

    @apply_defaults
    def __init__(self, xcom_task_id, xcom_task_id_key, *args, **kwargs):
        super(MyFirstOperator, self).__init__(*args, **kwargs)
        self.xcom_task_id = xcom_task_id
        self.xcom_task_id_key = xcom_task_id_key

    def execute(self, context):
        log.info("Hello World!")
        task_instance = context['task_instance']
        sensors_minute = task_instance.xcom_pull(self.xcom_task_id, key=self.xcom_task_id_key)
        log.info('Valid minute as determined by sensor: %s', sensors_minute)


class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

For MyFirstOperator, we now accept the id of the task that pushed the data (xcom_task_id), plus the key (xcom_task_id_key). Several tasks can now use this operator and maintain unique values, since the operator hard-codes none of the XCOM information.

Closing the Loop

Here’s the final DAG, passing in the new parameters needed by XCOM:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from MyFirstOperator import MyFirstOperator
from MyFirstSensor import MyFirstSensor

dag = DAG('my_test_dag', description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_task = DummyOperator(task_id='dummy_task', dag=dag)

sensor_task = MyFirstSensor(task_id='my_sensor_task', xcom_task_id_key='sensors_minute', poke_interval=30, dag=dag)

operator_task = MyFirstOperator(task_id='my_first_operator_task', xcom_task_id='my_sensor_task',
                                xcom_task_id_key='sensors_minute', dag=dag)

dummy_task >> sensor_task >> operator_task

Note that the MyFirstSensor operator doesn’t need an “xcom_task_id” parameter, since its own task_id is inherently used for that. When referencing the XCOM data, however, MyFirstOperator passes the same value for “xcom_task_id” that is used for MyFirstSensor’s “task_id”. Hopefully, this closes the loop for everyone nicely.
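Because nothing is hard-coded, a second sensor/operator pair could reuse the same classes in this DAG with its own key; the task ids and key below are hypothetical:

second_sensor_task = MyFirstSensor(task_id='my_second_sensor_task', xcom_task_id_key='another_minute',
                                   poke_interval=30, dag=dag)

second_operator_task = MyFirstOperator(task_id='my_second_operator_task', xcom_task_id='my_second_sensor_task',
                                       xcom_task_id_key='another_minute', dag=dag)

dummy_task >> second_sensor_task >> second_operator_task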

The Solution Part 2: Custom SFTP Operator

In general, it’s recommended that your custom plugin code be lightweight and essentially a wrapper around the underlying Python code that handles all the real work. I’ve followed this practice, and it turned out to be very helpful when converting to XCOM, as all my changes were at the custom plugin level.

The sample code you see is a custom SFTP operator that was developed for a client using AWS and S3 buckets for their data lake. I’m not going to cover all the nuts and bolts of the operator here, as that shouldn’t be necessary for applying the pattern to your own code.

Original Code

    @apply_defaults
    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 source_file_path=None,
                 destination_file_path=None,
                 operation=None,
                 *args,
                 **kwargs):
        super(SFTPToS3Operator, self).__init__(*args, **kwargs)
        self.ssh_conn_id = ssh_conn_id
        self.remote_host = remote_host
        self.source_file_path = source_file_path
        self.destination_file_path = destination_file_path
        self.operation = operation.lower()
        if not (self.operation == SFTPToS3Operation.GET or self.operation == SFTPToS3Operation.PUT or
                self.operation == SFTPToS3Operation.DELETE or self.operation == SFTPToS3Operation.LIST):
            raise TypeError("unsupported operation value {0}, expected {1} or {2} or {3} or {4}"
                            .format(self.operation, SFTPToS3Operation.GET, SFTPToS3Operation.PUT,
                                    SFTPToS3Operation.DELETE, SFTPToS3Operation.LIST))

    def execute(self, context):
        return sftp_handler(self.ssh_conn_id, self.remote_host, self.source_file_path,
                            self.destination_file_path, self.operation)


class SFTPToS3Plugin(AirflowPlugin):
    name = "SFTPToS3Operator"
    operators = [SFTPToS3Operator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

The original code was designed to operate on a single file that was passed in as the self.source_file_path parameter. Because this operator handles both the LIST and the GET/PUT/DELETE operations, a little refactoring was in order:

    @apply_defaults
    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 source_file_path=None,
                 destination_file_path=None,
                 operation=None,
                 xcom_task_id=None,
                 xcom_task_id_key=None,
                 *args,
                 **kwargs):
        super(SFTPToS3Operator, self).__init__(*args, **kwargs)
        self.ssh_conn_id = ssh_conn_id
        self.remote_host = remote_host
        self.source_file_path = source_file_path
        self.destination_file_path = destination_file_path
        self.operation = operation.lower()
        self.xcom_task_id = xcom_task_id
        self.xcom_task_id_key = xcom_task_id_key
        if not (self.operation == SFTPToS3Operation.GET or self.operation == SFTPToS3Operation.PUT or
                self.operation == SFTPToS3Operation.DELETE or self.operation == SFTPToS3Operation.LIST):
            raise TypeError("unsupported operation value {0}, expected {1} or {2} or {3} or {4}"
                            .format(self.operation, SFTPToS3Operation.GET, SFTPToS3Operation.PUT,
                                    SFTPToS3Operation.DELETE, SFTPToS3Operation.LIST))

Above is the __init__ method, which now accepts optional xcom_task_id and xcom_task_id_key parameters. Aside from those two additional parameters, the __init__ code is identical to before. The execute method is where all the changes occur:

    def execute(self, context):
        if self.operation == SFTPToS3Operation.LIST:
            file_list = sftp_handler(self.ssh_conn_id, self.remote_host, self.source_file_path,
                                     self.destination_file_path, self.operation)
            log.info('file_list: %s', file_list)
            task_instance = context['task_instance']
            task_instance.xcom_push(self.xcom_task_id_key, file_list)
        else:
            task_instance = context['task_instance']
            file_list = task_instance.xcom_pull(self.xcom_task_id, key=self.xcom_task_id_key)

            # If no xcom is found, treat it as if the file info were being passed from the DAG
            if isinstance(file_list, list):
                for file_path in file_list:
                    path, file = os.path.split(file_path)
                    log.info('path: %s', path)
                    log.info('file: %s', file)
                    log.info('source file_path: %s', self.source_file_path + file)
                    if self.destination_file_path is not None:
                        log.info('destination file_path: %s', self.destination_file_path + file)
                        sftp_handler(self.ssh_conn_id, self.remote_host, self.source_file_path + file,
                                     self.destination_file_path + file, self.operation)
                    else:
                        sftp_handler(self.ssh_conn_id, self.remote_host, self.source_file_path + file,
                                     self.destination_file_path, self.operation)
            else:
                log.info("No XCOM found in task_instance.")
                sftp_handler(self.ssh_conn_id, self.remote_host, self.source_file_path,
                             self.destination_file_path, self.operation)

Unlike MyFirstSensor and MyFirstOperator, this operator both reads and writes XCOM, depending on whether or not the LIST operation is passed. When listing, it writes the file list under the xcom_task_id_key; for the other operations, it reads from that key along with the xcom_task_id value.

By way of explanation, I have one operator that overloads the LIST command to read either from a remote SFTP file system or from an S3 bucket. That should explain the inner if/else block.

Also note that, for backward compatibility, the final else branch falls back to the original single-file SFTP handling. If anyone needs the original functionality, or prefers not to use XCOM for some reason, that capability is still available. Remember, the XCOM parameters passed in from the DAG are optional.
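For example, the operator can still be invoked the original way, one file at a time, simply by omitting the XCOM parameters (the file name below is made up):

# No xcom_task_id / xcom_task_id_key supplied, so the operator falls through to
# the original single-file behavior.
get_single_file_task = SFTPToS3Operator(
    task_id='get_single_file',
    ssh_conn_id='sftp',
    source_file_path='/Remote/some_file.csv',
    destination_file_path='s3://' + bucket_name + '/SFTP/raw/some_file.csv',
    operation=SFTPToS3Operation.GET,
    dag=dag)

With XCOM in play, however, the tasks in the new DAG are configured like this: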

list_remote_files_task = SFTPToS3Operator(
    task_id='list_remote_files',
    ssh_conn_id='sftp',
    source_file_path='/',
    destination_file_path=None,
    operation=SFTPToS3Operation.LIST,
    xcom_task_id_key='sftp_file_list',
    owner='ubuntu',
    dag=dag)

get_remote_file_task = SFTPToS3Operator(
    task_id='get_remote_files',
    ssh_conn_id='sftp',
    source_file_path='/',
    destination_file_path='s3://' + bucket_name + '/' + s3_folder_prefix + '/raw/',
    operation=SFTPToS3Operation.GET,
    xcom_task_id='list_remote_files',
    xcom_task_id_key='sftp_file_list',
    owner='ubuntu',
    dag=dag)

decrypt_gpg_task = GPGDecryptOperator(
    task_id='decrypt_gpg',
    bucket_name=bucket_name,
    encrypted_file_name=s3_folder_prefix + '/raw/',
    passphrase=secret_key,
    decrypted_file_name=s3_folder_prefix + '/converted/',
    xcom_task_id='list_remote_files',
    xcom_task_id_key='sftp_file_list',
    owner='ubuntu',
    dag=dag)

The new DAG is now simplified. The list_remote_files_task pushes the file list under the key named “sftp_file_list”, and since the task_id that pushed the XCOM value is “list_remote_files”, that is the value passed as xcom_task_id to both get_remote_file_task and decrypt_gpg_task.
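For completeness, the three tasks are then wired together in the usual way (the ordering here is assumed, since the dependencies aren’t shown in the snippet above):

list_remote_files_task >> get_remote_file_task >> decrypt_gpg_task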

In this way, we avoid the non-performant listing of remote files every second, which not only wastes bandwidth but also fills your Airflow logs with unnecessary entries.

Another Use Case

A second use case came about for the other plugin I mentioned at the start. We have a plugin that accepts a fair number of parameters and can retrieve data from a database, store the results as a CSV file in one or more locations in an S3 bucket, and then load the data into a Cloud Data Warehouse. By wrapping that code inside a custom plugin, we avoid having 50+ DAGs to modify whenever the functionality needs to change. From the start, adding a new table to the data warehouse has simply been a matter of providing the connection and an external query (which provides a level of abstraction), then copying an existing DAG and tuning a few parameters.

This pattern has worked beautifully for the past year. Recently, we were asked to ingest a new data source that didn’t follow the same conventions as the other systems we were ingesting. Basically, not every table we needed had a last-update timestamp we could count on. Instead, there was a group of tables that required sub-selects and joins to get to the data that mattered.

The challenge was that we had reserved parameters in the SQL that would be substituted with the load timestamp and the last-run timestamp, which we used for loading incremental data. The catch was that we now had to identify start and end timestamps based on a header record and make sure that each sub-table load pulled in ONLY the data that matched the initial header record entries. XCOM to the rescue again.
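To make that concrete, an external query for one of the sub-tables looks roughly like the sketch below. The table names and the {{min_date}}/{{max_date}} placeholders are illustrative, not the actual reserved parameter names:

# Simplified sketch of an external query template. The plugin substitutes the
# placeholders with the start/end timestamps pulled from XCOM before running it.
query_template = """
SELECT d.*
FROM   document_detail d
JOIN   document_header h ON h.doc_id = d.doc_id
WHERE  h.last_update_ts >  '{{min_date}}'
  AND  h.last_update_ts <= '{{max_date}}'
"""

query = (query_template
         .replace('{{min_date}}', min_date)   # value pulled from XCOM
         .replace('{{max_date}}', max_date))  # value pulled from XCOM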

I’ll describe the overall process:

  1. Create a custom plugin that retrieves the last run timestamp for a particular entry in a secondary table we use to track incremental loading (a rough sketch of such an operator follows this list).
  2. Accept as a parameter the number of delta days forward to retrieve. This gives us the flexibility to perform both historical and incremental loading with the same DAG. Previously, we had some historical load DAGs that were essentially one-offs, and we wanted to get away from that.
  3. Persist the start/stop timestamps in XCOM as separate keys. I called mine min_date and max_date, passed in via the xcom_task_id_key_min and xcom_task_id_key_max parameters.
  4. The downstream plugin operators read the same key values that were initially set, guaranteeing that if new records were added during the ingest, we’d skip them until the next scheduled interval, providing consistency of the ingested data.
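Here is a rough sketch of what steps 1 through 3 might look like inside the custom operator. The class mirrors the one used in the DAG below, but the body is a simplified reconstruction and get_last_run_timestamp is a hypothetical helper for the tracking-table lookup:

from datetime import timedelta

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SourceDataDatesRangeOperator(BaseOperator):
    """Pushes a start/end timestamp window to XCOM for downstream load tasks."""

    @apply_defaults
    def __init__(self, data_dates_name, xcom_task_id_key_min, xcom_task_id_key_max,
                 max_date_delta_days=1, *args, **kwargs):
        super(SourceDataDatesRangeOperator, self).__init__(*args, **kwargs)
        self.data_dates_name = data_dates_name
        self.xcom_task_id_key_min = xcom_task_id_key_min
        self.xcom_task_id_key_max = xcom_task_id_key_max
        self.max_date_delta_days = max_date_delta_days

    def execute(self, context):
        # get_last_run_timestamp is a placeholder for the query against the
        # secondary tracking table mentioned in step 1.
        min_date = get_last_run_timestamp(self.data_dates_name)
        max_date = min_date + timedelta(days=self.max_date_delta_days)

        task_instance = context['task_instance']
        task_instance.xcom_push(self.xcom_task_id_key_min, min_date.isoformat())
        task_instance.xcom_push(self.xcom_task_id_key_max, max_date.isoformat())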

Below is a snippet of the DAG:

get_date_range = SourceDataDatesRangeOperator(
    task_id='get_date_range',
    data_dates_name='DOCUMENT',
    xcom_task_id_key_min='min_date',
    xcom_task_id_key_max='max_date',
    max_date_delta_days=30,
    owner='ubuntu',
    dag=dag)

data_load = SourceToS3Operator(
    task_id='data_load',
    cnx_name='data',
    source_data_dates_name='DOCUMENT',
    script_name=airflow_home + '/sql_scripts/data_query.sql',
    bucket_name=bucket_name,
    data_lake_folder_name=s3_folder_prefix + '/Data/raw',
    staging_folder_name=None,
    file_prefix='Data.csv.',
    xcom_task_id='get_date_range',
    xcom_task_id_key_min='min_date',
    xcom_task_id_key_max='max_date',
    owner='ubuntu',
    dag=dag)

Every follow-on table is represented by a different SourceToS3Operator task and can leverage the XCOM variables set by the SourceDataDatesRangeOperator.
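For instance, a follow-on table load would look almost identical, pointing at its own query script while pulling the same XCOM keys. The names below are illustrative, and the two loads are assumed to fan out from get_date_range:

detail_load = SourceToS3Operator(
    task_id='detail_load',
    cnx_name='data',
    source_data_dates_name='DOCUMENT',
    script_name=airflow_home + '/sql_scripts/detail_query.sql',
    bucket_name=bucket_name,
    data_lake_folder_name=s3_folder_prefix + '/Detail/raw',
    staging_folder_name=None,
    file_prefix='Detail.csv.',
    xcom_task_id='get_date_range',
    xcom_task_id_key_min='min_date',
    xcom_task_id_key_max='max_date',
    owner='ubuntu',
    dag=dag)

# Both table loads depend only on the date-range task and share its XCOM values.
get_date_range >> data_load
get_date_range >> detail_load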

I hope that this has been a helpful follow-on to Michal’s excellent Airflow tutorial and gives you some good ideas for leveraging XCOM in your own tasks.

Conclusion

XCOM is a relatively straightforward concept that can help you simplify workflows that need to maintain some state between tasks. A number of the available Airflow operators already support XCOM. Following the content in this blog and Michal’s, you should be in a good position to not only take advantage of the available XCOM functionality, but also use it with your own custom plugins.


Chris DeBracy

Principal Architect / Director of App Dev