Cloud composer orchestration via cloud build

Google cloud composer is a managed apache airflow service that helps create, schedule, monitor and manage workflows.Cloud Composer automation helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command line tools, so you can focus on your workflows and not your infrastructure.

In this article I will descibe how an engineering team can manage, develop and publish DAGS after running a full CI/CD build pipeline using google cloud build .

Lets imagine a typical engineering scenario:

  1. Engineers break down a big process into series of small logical components or tasks.
  2. Each task is programmed as an operator in the dag.
  3. Enginerring team tests the dag locally and then copy it to composer (airflow) DAG folder.
  4. Composer validates the dag and eventually it is ready to be run.

This generic process can suffer from various issues when multiple engineers are working continuesly speint after sprint, such as :

  1. Deployment issues:

    e.g. Plugins used in development are not available in production

  2. Dependency Errors.

    e.g. Enviroment Variables /connections required for DAG to run is not available in production

  3. Processes errors.

    e.g Logical or programical errors in dag.

  4. Lack of testing and automation.

Dags are code, hence must be treated as such, which means :

  1. They should be backed in repository such as GitHub.

  2. Every PR should trigger automated tests before PR is reviewd by engineers.

  3. Every merge/push to test, uat and prod branch must run a full suit of unit tests.

  4. After successfull test, the dags must be automatically deployed to cloud composer.

Google cloud composer folder structure

Cloud compose uses google cloud storage as its source and looks somethign like this

Dags go inside the dags folder, and same for plugins and env_var.json.

Airflow configuration must go in terraform if used.

Developing a CI/CD Practice for Google Cloud Composer using Cloud Build

The idea here is to use github to back all the dags, use cloud build and preapre a docker image with airflow installed so that tests can be run in docker cotainers.

The Benifits of this process are as follows

  1. Faster development cycles of Airflow DAGs

  2. Uniform repository structure within the team

  3. Reductions in errors when automating DAG deployments

  4. Faster to debug in the event of a failure

  5. Code and configuration stay together in github and is tested and deployed using automation.

Here is an example docker file which creates airflow container to run tests against

FROM python:3.7

COPY requirements.txt ./
RUN pip3 install --no-cache-dir -r requirements.txt
ENV AIRFLOW_HOME=/workspace/airflow

requirements.txt contains plugins and dependencies

werkzeug==0.15.4 # needed for airflow 1.10.3

Git folder structure here resembles the folder structure used by composer in google cloud storage

Here is an example cloudbuild.yaml file

- name: ''
  id: Pull docker cache
  entrypoint: 'bash'
  - '-c'
  - |
   docker pull$PROJECT_ID/airflow-dags-builder:latest || exit 0
- name:
  id: Build Airflow DAGs Builder
  args: [
      '-t', '$PROJECT_ID/airflow-dags-builder',
      '--cache-from', '$PROJECT_ID/airflow-dags-builder:latest',
- name: '$PROJECT_ID/airflow-dags-builder'
  id: Validation Test
  # Validate the integrity of the DAG files.
  entrypoint: python
  - AIRFLOW__CORE__DAGS_FOLDER=/workspace/dags
  - -m
  - unittest
  - tests/

- name:
  # Deploy the DAGs to your composer environment DAGs GCS folder
  id: Deploy DAGs
  - -m
  - rsync
  - -r
  - -c
  - -x
  - .*\.pyc|
  - dags
images: ['$PROJECT_ID/airflow-dags-builder:latest']

The steps describe above are straight forward :

  1. Pull or build airflow docker image using the described docker file.
  2. run validation tests (in python inside the tests folder)
  3. Deploy (rsync) the dags to the dag folder.
  4. Extra steps can be added to rsync env_vars.json and plugins if required.

Sample Dag integrity test

DAG Integrity Tests
import os
import unittest
from airflow.models import DagBag, Variable

class TestDags(unittest.TestCase):
    """DAG Test Case"""


    def setUp(self):
        os.system("airflow initdb")
        for v in ["sample_var",
        self.dagbag = DagBag()

    def test_dags_syntax(self):
        """Assert DAG bag load correctly"""
        for key in self.dagbag.dags:
            f"DAG import errors. Errors: {self.dagbag.import_errors}")

if __name__ == '__main__':
    SUITE = unittest.TestLoader().loadTestsFromTestCase(TestDags)

Running locally using google cloud build

Google cloud build can be used to run this locally if required

Install reps for running cloud build on you local machine

gcloud components install docker-credential-gcr
gcloud auth configure-docker
gcloud components install cloud-build-local

Run Build

mkdir -p /tmp/dags
mkdir -p /tmp/plugins
cloud-build-local \
	 --config=cloudbuild.yaml --dryrun=false \
	 --substitutions=_DEPLOY_DAGS_LOCATION=/tmp .

This will build and test the repo and deploy dags to tmp folder.

Cloud build can eventually be used to act on a git trigger and deploy dags, variables and config to composer.