Saturday, March 4, 2023

REST API to trigger Airflow/Composer DAG

REST APIs are a popular way for applications to communicate with each other, allowing for flexible and scalable interactions. In this blog post, we'll explore how to use Google Cloud Run to trigger an Airflow/Composer DAG through a REST API.


Google Cloud Run is a fully managed compute platform for running stateless containers that scale automatically with incoming traffic. Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Cloud Composer is Google Cloud's managed Airflow service.


Before diving into the implementation, let's first understand what a DAG is. A DAG (Directed Acyclic Graph) is a collection of tasks that are dependent on each other, with a defined order of execution. Airflow uses DAGs to define workflows and their dependencies.


To trigger an Airflow/Composer DAG through a REST API, we can create a simple Python Flask app that forwards a request to the Airflow REST API. Airflow's stable REST API exposes a dagRuns endpoint that triggers a run of a given DAG: the DAG ID is part of the endpoint URL, and the JSON payload carries the run configuration and any other necessary parameters.
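The request body for the stable Airflow 2 REST API is a JSON object whose `conf` field carries the parameters for the run. A small helper to build it might look like this (the helper name is ours, not part of any API):

```python
import json

def build_dag_run_payload(conf=None, dag_run_id=None):
    """Build the JSON body for POST /api/v1/dags/{dag_id}/dagRuns
    (Airflow 2 stable REST API)."""
    payload = {"conf": conf or {}}
    if dag_run_id is not None:
        # Optional: choose your own run ID; otherwise Airflow generates one.
        payload["dag_run_id"] = dag_run_id
    return json.dumps(payload)

body = build_dag_run_payload(conf={"date": "2023-03-04"})
```

Whatever you place in `conf` becomes available to the DAG's tasks at run time, which is how the REST caller passes parameters into the workflow.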


Here's an example of how to create a Flask app to trigger a DAG using the Airflow REST API:

from flask import Flask, request
import os
import requests

app = Flask(__name__)

# Airflow 2 stable REST API endpoint (the DAG ID is part of the URL).
# Replace <airflow-web-server-url> with your Composer environment's
# Airflow web server URL.
AIRFLOW_ENDPOINT = 'https://<airflow-web-server-url>/api/v1/dags/<dag_id>/dagRuns'

# Service account credentials
HEADERS = {
    'Authorization': 'Bearer <access_token>',
    'Content-Type': 'application/json'
}

@app.route('/trigger-dag', methods=['POST'])
def trigger_dag():
    # Get payload data from the request body
    payload = request.get_json(silent=True) or {}

    # Build the DAG run request data; 'conf' carries the run parameters
    data = {
        'conf': payload.get('conf', {})
    }

    # Send the request to the Airflow REST API
    response = requests.post(AIRFLOW_ENDPOINT, json=data, headers=HEADERS)

    if response.status_code == 200:
        return 'DAG triggered successfully', 200
    else:
        return 'Failed to trigger DAG', response.status_code

if __name__ == '__main__':
    # Cloud Run supplies the listening port via the PORT environment variable
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

In this example, we have defined a route '/trigger-dag' that accepts a POST request with a JSON payload. The payload contains any necessary parameters for the DAG run. The Flask app sends a request to the Airflow/Composer REST API endpoint using the requests library.

The HEADERS variable contains the credentials required to authenticate with the Airflow/Composer REST API. Replace <access_token> with an access token for a service account that has the necessary permissions to trigger DAGs; for a quick test you can generate one with gcloud auth print-access-token. Note that access tokens are short-lived, so in production you would fetch a fresh token at request time (for example with the google-auth library) rather than hard-coding it.

Replace <dag_id> with the actual ID of the DAG that you want to trigger. You can find the DAG ID in the Airflow/Composer UI or by running the command airflow dags list (airflow list_dags on Airflow 1.x) in the Cloud Shell.

Once you have created the Flask app, build and push its container image (for example with gcloud builds submit --tag gcr.io/<project_id>/<image_name>), then deploy it to Google Cloud Run using the following command:

gcloud run deploy <service_name> --image gcr.io/<project_id>/<image_name> --platform managed


Replace <service_name> with the desired name for the Cloud Run service, <project_id> with your Google Cloud project ID, and <image_name> with the name of the container image that contains the Flask app.

Finally, you can test the REST API by sending a POST request to the Cloud Run service URL with the JSON payload. The DAG should trigger successfully, and you can monitor the status of the DAG run in the Airflow/Composer UI.
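To see what that test request looks like from the client side, here is a sketch using the requests library. The service URL below is a hypothetical placeholder for your actual Cloud Run URL; the request is built without being sent, so you can inspect it first:

```python
import requests

# Hypothetical Cloud Run service URL; substitute your own.
SERVICE_URL = "https://my-service-abc123-uc.a.run.app"

def prepare_trigger_request(service_url, conf):
    """Prepare (but do not send) the POST that triggers the DAG."""
    req = requests.Request(
        "POST", f"{service_url}/trigger-dag", json={"conf": conf}
    )
    return req.prepare()

prepared = prepare_trigger_request(SERVICE_URL, {"date": "2023-03-04"})

# To actually send it once the service is deployed:
#     response = requests.Session().send(prepared)
#     print(response.status_code, response.text)
```

Anything you place under "conf" in the payload is forwarded by the Flask app to Airflow as the DAG run configuration.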

In conclusion, using Google Cloud Run to trigger an Airflow/Composer DAG through a REST API is a simple and scalable solution for connecting other applications to your workflows.

You may check this code repository for a working solution.