REST APIs are a popular way for applications to communicate with each other, allowing for flexible and scalable interactions. In this blog post, we'll explore how to use Google Cloud Run to trigger an Airflow/Composer DAG through a REST API.
Google Cloud Run is a fully managed compute platform that runs stateless containers and automatically scales them to meet incoming traffic. Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Cloud Composer is Google Cloud's managed Airflow service.
Before diving into the implementation, let's first understand what a DAG is. A DAG (Directed Acyclic Graph) is a collection of tasks that are dependent on each other, with a defined order of execution. Airflow uses DAGs to define workflows and their dependencies.
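To make the ordering idea concrete, here is a minimal sketch using Python's standard-library graphlib (no Airflow required). The task names are hypothetical ETL-style stages; the point is that tasks are nodes, dependencies are edges, and a topological sort yields a valid execution order:

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: 'transform' depends on 'extract',
# and 'load' depends on 'transform'.
dependencies = {
    'transform': {'extract'},
    'load': {'transform'},
}

# A topological sort produces an order that respects every dependency,
# which is exactly how a DAG's defined order of execution works.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['extract', 'transform', 'load']
```

Airflow resolves its DAGs the same way, with the added ability to run independent tasks in parallel.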
To trigger an Airflow/Composer DAG using a REST API, we can create a simple Python Flask app that sends a request to the Airflow REST API. A DAG run is triggered by POSTing a JSON payload to the dagRuns endpoint for a given DAG ID; the payload can carry any configuration the run needs.
Here's an example of how to create a Flask app to trigger a DAG using the Airflow REST API:
from flask import Flask, request
import requests

app = Flask(__name__)

# Airflow REST API endpoint. For Composer, this is your environment's
# Airflow web server URL (not a googleapis.com domain).
AIRFLOW_ENDPOINT = 'https://<airflow-webserver-url>/api/v1/dags/<dag_id>/dagRuns'

# Service account credentials: obtain an OAuth access token via your auth
# flow (for example, `gcloud auth print-access-token`).
HEADERS = {
    'Authorization': 'Bearer <access_token>',
    'Content-Type': 'application/json'
}

@app.route('/trigger-dag', methods=['POST'])
def trigger_dag():
    # Get payload data from the request body
    payload = request.get_json()

    # Build request data: 'conf' is passed through to the DAG run
    data = {
        'conf': payload.get('conf', {})
    }

    # Send the request to the Airflow REST API
    response = requests.post(AIRFLOW_ENDPOINT, json=data, headers=HEADERS)

    if response.status_code == 200:
        return 'DAG triggered successfully', 200
    else:
        return 'Failed to trigger DAG', response.status_code
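Since the Airflow endpoint and access token above are placeholders, a quick way to sanity-check the route locally is Flask's built-in test client, with the outbound HTTP call stubbed out. The sketch below redefines a minimal copy of the route (the endpoint URL is a stand-in) so it runs on its own:

```python
from unittest import mock

from flask import Flask, request
import requests

app = Flask(__name__)

@app.route('/trigger-dag', methods=['POST'])
def trigger_dag():
    # Same shape as the route above, with a stand-in endpoint URL.
    payload = request.get_json()
    data = {'conf': payload.get('conf', {})}
    response = requests.post(
        'https://example.invalid/api/v1/dags/my_dag/dagRuns', json=data)
    if response.status_code == 200:
        return 'DAG triggered successfully', 200
    return 'Failed to trigger DAG', response.status_code

# Exercise the route with Flask's test client, stubbing the Airflow call
# so no real network request is made.
with mock.patch.object(requests, 'post') as fake_post:
    fake_post.return_value.status_code = 200
    client = app.test_client()
    resp = client.post('/trigger-dag', json={'conf': {'run_date': '2023-01-01'}})
    print(resp.status_code, resp.get_data(as_text=True))
```

Once the route behaves as expected locally, the same container can be deployed to Cloud Run and invoked with a plain POST request.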