Sunday, March 17, 2024

Trigger any Google Service based on Google Cloud Storage Events

Recently, I encountered a requirement that involved triggering a Google Cloud Composer DAG based on events occurring in Google Cloud Storage (GCS).

To illustrate this scenario, let's consider an example: Suppose I have a Composer DAG designed to process data files stored in a GCS bucket. These files may arrive at any time throughout the day, and it's crucial to process them promptly upon arrival since they contain time-sensitive information.

Now, let's explore several solutions to address this requirement:

Option 1: Scheduled DAG Runs

Scheduling a DAG to run at regular intervals throughout the day is a straightforward approach. However, it suffers from several drawbacks:

  1. Potential Overkill: Running the DAG every few minutes, regardless of whether new files are available, can lead to unnecessary resource consumption and costs, especially if the frequency far exceeds the actual arrival rate of files.

  2. Processing Delay: If a file arrives just after a DAG run completes, it will have to wait until the next scheduled run, potentially introducing processing delays of up to the interval period.

For scenarios where files arrive sporadically or with varying frequencies, Option 1 may not be the most efficient solution.
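
Some quick arithmetic makes this trade-off concrete. The sketch below assumes a hypothetical 5-minute schedule and 20 file arrivals per day; both numbers are purely illustrative and not taken from a real workload.

```python
def polling_cost(interval_minutes: int, files_per_day: int) -> dict:
    """Estimate what a fixed schedule costs (illustrative numbers only)."""
    runs_per_day = (24 * 60) // interval_minutes
    # At most one run per arriving file can find new work, so this is a
    # lower bound on the number of runs that find nothing to process.
    empty_runs = max(runs_per_day - files_per_day, 0)
    return {
        "runs_per_day": runs_per_day,
        "empty_runs_at_least": empty_runs,
        # A file that lands right after a run starts waits a full interval.
        "worst_case_delay_minutes": interval_minutes,
    }


print(polling_cost(interval_minutes=5, files_per_day=20))
```

Shortening the interval reduces the worst-case delay but increases the number of empty runs, which is why a fixed schedule is a poor fit for sporadic arrivals.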

Option 2: Cloud Function with GCS Trigger

Creating a Cloud Function triggered by GCS events offers a more responsive and efficient solution:

  1. Real-Time Processing: By triggering the Cloud Function upon file creation or finalization in GCS, we ensure immediate processing of incoming files, eliminating unnecessary delays.

  2. Dynamic DAG Submission: The Cloud Function can dynamically determine the appropriate DAG to submit based on the incoming file's metadata, allowing for flexibility and automation.

However, this approach has a notable drawback:

  1. Potential Overhead: While the Cloud Function provides real-time processing capabilities, it triggers for every file that arrives in the designated bucket, including files in folders the DAG does not care about. The function must then discard those events itself, which leads to unnecessary invocations and associated costs.
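
The routing step inside such a function might look like the minimal sketch below. The prefix-to-DAG mapping and the DAG IDs are hypothetical names introduced for illustration only; a real function would use whatever naming convention the bucket follows.

```python
from typing import Optional

# Hypothetical mapping from object-name prefix to Composer DAG ID.
DAG_BY_PREFIX = {
    "stg/orders/": "process_orders_dag",
    "stg/customers/": "process_customers_dag",
}


def select_dag(object_name: str) -> Optional[str]:
    """Return the DAG ID for an object name, or None if no DAG applies."""
    # Check longer prefixes first so the most specific match wins.
    for prefix in sorted(DAG_BY_PREFIX, key=len, reverse=True):
        if object_name.startswith(prefix):
            return DAG_BY_PREFIX[prefix]
    return None


print(select_dag("stg/orders/2024-03-17.csv"))
print(select_dag("tmp/scratch.txt"))
```

The second call returns None: the function was still invoked for an object it has no use for, which is exactly the overhead described above.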

Option 3: Pub/Sub Notifications for GCS

Utilizing Pub/Sub notifications for GCS object changes introduces a more refined and scalable solution:

  1. Fine-Grained Triggering: By subscribing a Cloud Function to a Pub/Sub topic associated with specific GCS events (e.g., object finalization in a designated folder), we can ensure that the function only triggers when relevant files are created or modified.

  2. Efficient Resource Utilization: With Pub/Sub notifications, we avoid both continuous polling of GCS and function invocations for irrelevant files, leading to more efficient resource utilization and cost savings.

  3. Real-Time Processing with Reduced Overhead: This approach combines the benefits of real-time file processing with reduced overhead, making it an optimal choice for scenarios where responsiveness and efficiency are paramount.

To set this up:

i). Create a Pub/Sub notification on the GCS bucket, limited to the folder of interest (ex: gs://gc-us-gcs-xyz-bkt/stg/).

Ref: https://cloud.google.com/storage/docs/gsutil/commands/notification

Sample Code:

    gsutil notification create -t tpc_xyz_file_stg -f json -e OBJECT_FINALIZE \
      -p stg/ gs://gc-us-gcs-xyz-bkt

ii). Deploy the Cloud Function and associate it with the topic created above. Refer to the Cloud Function creation documentation for details.

Sample Code:

    gcloud functions deploy xyz_trigger_cf_gcs_events --runtime python312 \
      --region us-east4 \
      --ingress-settings=internal-and-gclb --entry-point xyz_trigger_cf_on_gcs_events \
      --trigger-topic=tpc_xyz_file_stg --source .
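
For completeness, here is a minimal sketch of what the entry point xyz_trigger_cf_on_gcs_events could contain. It assumes a 1st gen, Pub/Sub-triggered function that receives (event, context); a 2nd gen function would receive a CloudEvent instead. It also assumes the DAG is triggered through the Airflow 2 stable REST API. The web server URL and DAG ID are placeholders I made up, and the request is only built, not sent, because a real call needs credentials authorized for the Composer environment.

```python
import base64
import json
import urllib.request

# Assumptions (not from the setup above): placeholder web server URL and DAG ID.
AIRFLOW_WEB_SERVER_URL = "https://example-dot-us-east4.composer.googleusercontent.com"
DAG_ID = "xyz_process_stg_files"


def build_dag_run_request(bucket, object_name):
    """Build (but do not send) a request that creates a DAG run."""
    url = f"{AIRFLOW_WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns"
    # The file location is passed to the DAG through the run's conf.
    body = json.dumps({"conf": {"bucket": bucket, "object": object_name}})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def xyz_trigger_cf_on_gcs_events(event, context):
    """Entry point: turn a GCS notification into a DAG-run request."""
    attributes = event.get("attributes", {})
    # The notification was created with -e OBJECT_FINALIZE; check anyway.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    # With -f json, the message data is the base64-encoded object resource.
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    request = build_dag_run_request(payload["bucket"], payload["name"])
    print(f"Would POST {request.full_url} for gs://{payload['bucket']}/{payload['name']}")
    return request


# Local usage example with a hand-built event shaped like a GCS notification.
sample_event = {
    "attributes": {"eventType": "OBJECT_FINALIZE"},
    "data": base64.b64encode(
        json.dumps({"bucket": "gc-us-gcs-xyz-bkt", "name": "stg/data.csv"}).encode("utf-8")
    ).decode("ascii"),
}
xyz_trigger_cf_on_gcs_events(sample_event, None)
```

Keeping the request construction in its own function makes the handler easy to test locally before adding authentication and the actual HTTP call.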

Conclusion:

While each option has its merits, Option 3 stands out as the most appropriate solution for triggering Composer DAGs based on GCS events. By leveraging Pub/Sub notifications, we achieve real-time processing with minimal overhead and optimal resource utilization, ensuring timely and efficient handling of time-sensitive data files.