Sunday, March 17, 2024

Trigger any Google Service based on Google Cloud Storage Events

Recently, I encountered a requirement that involved triggering a Google Cloud Composer DAG based on events occurring in Google Cloud Storage (GCS).

To illustrate this scenario, let's consider an example: Suppose I have a Composer DAG designed to process data files stored in a GCS bucket. These files may arrive at any time throughout the day, and it's crucial to process them promptly upon arrival since they contain time-sensitive information.

Now, let's explore several solutions to address this requirement:

Option 1: Scheduled DAG Runs

Scheduling a DAG to run at regular intervals throughout the day is a straightforward approach. However, it suffers from several drawbacks:

  1. Potential Overkill: Running the DAG every few minutes, regardless of whether new files are available, can lead to unnecessary resource consumption and costs, especially if the frequency far exceeds the actual arrival rate of files.

  2. Processing Delay: If a file arrives just after a DAG run completes, it will have to wait until the next scheduled run, potentially introducing processing delays of up to the interval period.

For scenarios where files arrive sporadically or with varying frequencies, Option 1 may not be the most efficient solution.
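To put rough numbers on these two drawbacks, here is a minimal back-of-the-envelope sketch. The function name, the 5-minute interval and the 10 files per day are assumptions chosen for illustration only.

```python
def polling_cost(interval_minutes: int, files_per_day: int) -> dict:
    """Estimate the daily overhead of a fixed-interval schedule (illustrative only)."""
    runs_per_day = (24 * 60) // interval_minutes
    # At most one run per file finds new work; every other run finds nothing.
    empty_runs = max(runs_per_day - files_per_day, 0)
    return {
        "runs_per_day": runs_per_day,
        "empty_runs_at_least": empty_runs,
        # A file landing right after a run waits for the next one.
        "worst_case_delay_minutes": interval_minutes,
    }


print(polling_cost(interval_minutes=5, files_per_day=10))
```

With these assumed numbers, the DAG runs 288 times a day and at least 278 of those runs have nothing to process.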

Option 2: Cloud Function with GCS Trigger

Creating a Cloud Function triggered by GCS events offers a more responsive and efficient solution:

  1. Real-Time Processing: By triggering the Cloud Function upon file creation or finalization in GCS, we ensure immediate processing of incoming files, eliminating unnecessary delays.

  2. Dynamic DAG Submission: The Cloud Function can dynamically determine the appropriate DAG to submit based on the incoming file's metadata, allowing for flexibility and automation.

However, this approach has a notable drawback:

  1. Potential Overhead: While the Cloud Function provides real-time processing capabilities, it triggers for every file that arrives in the designated bucket, potentially leading to unnecessary function invocations and associated costs.
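A GCS-triggered function of this kind might look roughly like the sketch below. It assumes a background-style entry point that receives the object metadata as a dict with "bucket" and "name" keys. The prefixes, DAG IDs and function names are hypothetical, and the actual DAG submission is left out.

```python
# Hypothetical mapping from object-name prefix to the DAG that should process it.
DAG_BY_PREFIX = {
    "stg/orders/": "process_orders_dag",
    "stg/invoices/": "process_invoices_dag",
}


def choose_dag(object_name):
    """Return the DAG ID for an object name, or None if no prefix matches."""
    for prefix, dag_id in DAG_BY_PREFIX.items():
        if object_name.startswith(prefix):
            return dag_id
    return None


def on_gcs_finalize(event, context=None):
    """Entry point sketch: invoked once for every object created in the bucket."""
    dag_id = choose_dag(event["name"])
    if dag_id is None:
        # The invocation still happened (and is billed) even though nothing is done.
        print(f"Ignoring gs://{event['bucket']}/{event['name']}")
        return None
    print(f"Would trigger {dag_id} for gs://{event['bucket']}/{event['name']}")
    # The call to the Airflow/Composer REST API would go here.
    return dag_id
```

The "ignoring" branch illustrates the drawback: the filter runs inside the function, so irrelevant files still cause an invocation.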

Option 3: Pub/Sub Notifications for GCS

Utilizing Pub/Sub notifications for GCS object changes introduces a more refined and scalable solution:

  1. Fine-Grained Triggering: By subscribing a Cloud Function to a Pub/Sub topic associated with specific GCS events (e.g., object finalization in a designated folder), we can ensure that the function only triggers when relevant files are created or modified.

  2. Efficient Resource Utilization: With Pub/Sub notifications, we avoid the overhead of continuously polling GCS for changes and running unnecessary function invocations, leading to more efficient resource utilization and cost savings.

  3. Real-Time Processing with Reduced Overhead: This approach combines the benefits of real-time file processing with reduced overhead, making it an optimal choice for scenarios where responsiveness and efficiency are paramount.

i). Create a Pub/Sub notification on the GCS bucket, restricted to a folder (ex: gs://gc-us-gcs-xyz-bkt/stg/).
Ref: https://cloud.google.com/storage/docs/gsutil/commands/notification
Sample Code:
gsutil notification create -t tpc_xyz_file_stg -f json -e OBJECT_FINALIZE -p stg/ gs://gc-us-gcs-xyz-bkt

ii). Deploy the Cloud Function and associate it with the topic created above. Refer to the Cloud Functions documentation for details on creating a Cloud Function.
Sample Code:
gcloud functions deploy xyz_trigger_cf_gcs_events --runtime python312 \
  --region us-east4 \
  --ingress-settings=internal-and-gclb --entry-point xyz_trigger_cf_on_gcs_events \
  --trigger-topic=tpc_xyz_file_stg --source .
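The entry point named in the deploy command could be sketched as follows. This is only an outline under assumptions: it assumes a background-style Pub/Sub function whose event carries base64-encoded "data" plus "attributes", and that the notification was created with the JSON payload format so the data holds the object metadata. Triggering the DAG itself is omitted.

```python
import base64
import json


def xyz_trigger_cf_on_gcs_events(event, context=None):
    """Sketch: turn a GCS Pub/Sub notification into a conf dict for a DAG run."""
    attributes = event.get("attributes") or {}
    # Defensive check; the notification is already limited to OBJECT_FINALIZE.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None

    # The message data is the object's metadata, base64-encoded JSON.
    metadata = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    conf = {"bucket": metadata["bucket"], "name": metadata["name"]}
    print(f"Would trigger the DAG with conf={conf}")
    # The call to the Airflow/Composer REST API would go here.
    return conf


# Local check with a hand-built message (values are made up).
sample_metadata = {"bucket": "gc-us-gcs-xyz-bkt", "name": "stg/sample.csv"}
sample_event = {
    "attributes": {"eventType": "OBJECT_FINALIZE"},
    "data": base64.b64encode(json.dumps(sample_metadata).encode("utf-8")).decode("ascii"),
}
xyz_trigger_cf_on_gcs_events(sample_event)
```

Because the prefix filter is applied by the notification itself, files outside stg/ never reach the function.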

Conclusion:

While each option has its merits, Option 3 stands out as the most appropriate solution for triggering Composer DAGs based on GCS events. By leveraging Pub/Sub notifications, we achieve real-time processing with minimal overhead and optimal resource utilization, ensuring timely and efficient handling of time-sensitive data files.


Saturday, March 4, 2023

REST API to trigger Airflow/Composer DAG

REST APIs are a popular way for applications to communicate with each other, allowing for flexible and scalable interactions. In this blog post, we'll explore how to use Google Cloud Run to trigger an Airflow/Composer DAG through a REST API.


Google Cloud Run is a fully managed compute platform that allows you to run stateless containers that are automatically scaled to meet incoming traffic. Airflow is an open-source platform to programmatically author, schedule and monitor workflows. Composer is a managed version of Airflow by Google Cloud.


Before diving into the implementation, let's first understand what a DAG is. A DAG (Directed Acyclic Graph) is a collection of tasks that are dependent on each other, with a defined order of execution. Airflow uses DAGs to define workflows and their dependencies.


To trigger an Airflow/Composer DAG using a REST API, we can create a simple Python Flask app that sends a request to the Airflow/Composer REST API endpoint. The endpoint identifies the DAG by its ID in the URL and triggers a DAG run when it receives a POST request with a JSON payload containing any necessary parameters.


Here's an example of how to create a Flask app to trigger a DAG using the Airflow REST API:

from flask import Flask, request
import requests

app = Flask(__name__)

# Airflow/Composer REST API endpoint (Airflow 2 stable REST API).
# <airflow_web_server_url> is the Airflow web server URL of the Composer environment.
AIRFLOW_ENDPOINT = '<airflow_web_server_url>/api/v1/dags/<dag_id>/dagRuns'

# Service account credentials
HEADERS = {
    'Authorization': 'Bearer <access_token>',
    'Content-Type': 'application/json'
}


@app.route('/trigger-dag', methods=['POST'])
def trigger_dag():
    # Get payload data from request body (empty dict if the body is missing or not JSON)
    payload = request.get_json(silent=True) or {}

    # Build request data; the stable REST API rejects unknown fields,
    # so only the run configuration is sent
    data = {
        'conf': payload.get('conf', {})
    }

    # Send request to Airflow REST API
    response = requests.post(AIRFLOW_ENDPOINT, json=data, headers=HEADERS, timeout=30)

    if response.status_code == 200:
        return 'DAG triggered successfully', 200
    else:
        return 'Failed to trigger DAG', response.status_code

In this example, we have defined a route '/trigger-dag' that accepts a POST request with a JSON payload. The payload contains any necessary parameters for the DAG run. The Flask app sends a request to the Airflow/Composer REST API endpoint using the requests library.

The HEADERS variable contains the service account credentials required to authenticate with the Airflow/Composer REST API. Replace <access_token> with the actual access token for the service account that has the necessary permissions to trigger DAGs.

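Replace <dag_id> with the actual ID of the DAG that you want to trigger. You can find the DAG ID in the Airflow/Composer UI or by running the command gcloud composer environments run <environment_name> --location <location> dags list in the Cloud Shell (on Airflow 1 the subcommand is list_dags).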

Once you have created the Flask app, you can deploy it to Google Cloud Run using the following command:

gcloud run deploy <service_name> --image gcr.io/<project_id>/<image_name> --platform managed


Replace <service_name> with the desired name for the Cloud Run service, <project_id> with your Google Cloud project ID, and <image_name> with the name of the container image that contains the Flask app.

Finally, you can test the REST API by sending a POST request to the Cloud Run service URL with the JSON payload. The DAG should trigger successfully, and you can monitor the status of the DAG run in the Airflow/Composer UI.
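As a rough illustration of such a test call, the sketch below builds the POST request with the standard library only. The service URL and the conf values are placeholders invented for this example, and the network call is left commented out. If the Cloud Run service requires authentication, an Authorization header would be needed as well.

```python
import json
import urllib.request


def build_trigger_request(service_url, conf):
    """Build (but do not send) the POST request for the /trigger-dag route."""
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url=service_url.rstrip("/") + "/trigger-dag",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical Cloud Run URL and payload.
req = build_trigger_request("https://example-service.a.run.app", {"file": "stg/sample.csv"})
print(req.get_method(), req.full_url)

# To actually send it:
# with urllib.request.urlopen(req, timeout=30) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```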

In conclusion, using Google Cloud Run to trigger an Airflow/Composer DAG through a REST API is a simple and scalable solution.

You may check this code repository for a working solution.



Thursday, December 10, 2020

PL/SQL REST API Call to receive JSON Payload

declare
  req          utl_http.req;
  res          utl_http.resp;
  lv_url       varchar2(4000) := 'http://<host>:<port>/<something>/';
  buffer       varchar2(32767);
  lv_response  clob;
  lj_json_obj  json_object_t;
begin
  dbms_output.put_line(lv_url);
  req := utl_http.begin_request(lv_url, 'POST', 'HTTP/1.1');
  utl_http.set_header(req, 'Content-type', 'application/json');
  utl_http.set_header(req, 'Accept', 'application/json');
  res := utl_http.get_response(req);

  -- process the response from the HTTP call:
  -- read the body in chunks until end_of_body is raised
  begin
    loop
      utl_http.read_text(res, buffer, 8000);
      lv_response := lv_response || buffer;
    end loop;
  exception
    when utl_http.end_of_body then
      utl_http.end_response(res);
  end;

  -- parse the received JSON payload
  lj_json_obj := json_object_t(lv_response);
end;
/

Tuesday, October 8, 2019

Python- Pandas

Read individual field values from a data frame

import pandas as pd

# ebsconn is an existing database connection (created elsewhere)
db_df = pd.read_sql(
    "select brms_file_check_notify_pkg.decrypt(a.login_id) password_dr, a.* "
    "from file_check_notification a "
    "where sysdate > next_run and file_check_id = 3",
    ebsconn)
ebsconn.close()
print('DB is queried for tab1..')

# read individual field values from the first row
hostname = db_df.loc[0].at['host']
username = db_df.loc[0].at['login_id']
password = db_df.loc[0].at['password_dr']
location = db_df.loc[0].at['location']
subject  = db_df.loc[0].at['email_subject']
protocol = db_df.loc[0].at['protocol_type']



Thursday, January 26, 2017

Oracle Analytic Functions - Difference between ORDER BY and PARTITION BY

ORDER BY - returns a running total/sum, accumulated row by row in the order of the given column(s)
PARTITION BY - returns the group total for each distinct value of the given column(s)

Consider the following simple table.

This example contains 2 projects - ABC and CAB.
Each project contains 2 tasks, each with its own cost.




By using the same analytic function with two different windows, we can get both "running totals" and "group totals".

select project_id, project_name, task_no, cost
,sum(cost) over(order by project_id,task_no) running_total_by_project_task
,sum(cost) over(order by project_id) running_total_by_project
,sum(cost) over(partition by project_id) total_cost_by_project
,sum(cost) over(partition by task_no) total_cost_by_task
from temp_project_details;

Output below..
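The same query can be tried out on made-up data with the sketch below. It uses Python's built-in sqlite3 module (SQLite 3.25 or later is assumed for window function support) rather than Oracle, and the project IDs and costs are invented for illustration; they are not the values from the original table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table temp_project_details "
    "(project_id int, project_name text, task_no int, cost int)"
)
# Hypothetical rows: 2 projects, 2 tasks each.
conn.executemany(
    "insert into temp_project_details values (?, ?, ?, ?)",
    [(1, "ABC", 1, 100), (1, "ABC", 2, 200), (2, "CAB", 1, 300), (2, "CAB", 2, 400)],
)

rows = conn.execute("""
select project_id, project_name, task_no, cost
,sum(cost) over(order by project_id,task_no) running_total_by_project_task
,sum(cost) over(order by project_id) running_total_by_project
,sum(cost) over(partition by project_id) total_cost_by_project
,sum(cost) over(partition by task_no) total_cost_by_task
from temp_project_details
order by project_id, task_no
""").fetchall()

for row in rows:
    print(row)
# (1, 'ABC', 1, 100, 100, 300, 300, 400)
# (1, 'ABC', 2, 200, 300, 300, 300, 600)
# (2, 'CAB', 1, 300, 600, 1000, 700, 400)
# (2, 'CAB', 2, 400, 1000, 1000, 700, 600)
```

Note how ordering by project_id alone treats both tasks of a project as ties, so the running total jumps once per project, while PARTITION BY repeats the group total on every row of the group.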

Thursday, October 29, 2015

Getting Concurrent Request Id within Sql Loader based Concurrent Programs

Neither of the following works in SQL*Loader:
- fnd_global.conc_request_id
- fnd_profile.value('conc_request_id')

The first sets request_id to -1 and the second sets it to blank/NULL.

You may consider the approach below; it works for me.

- Create a function, something like below...
===
CREATE OR REPLACE PACKAGE BODY <package_name>

IS
-- define global variable..
   gn_sqlldr_req_id     NUMBER ;


FUNCTION get_request_id RETURN NUMBER
IS
BEGIN
   -- look up the request id once and cache it in the package variable
   IF gn_sqlldr_req_id IS NULL THEN
      select max(request_id)
      INTO gn_sqlldr_req_id
      from fnd_concurrent_requests where concurrent_program_id in (
      select concurrent_program_id from fnd_concurrent_programs where concurrent_program_name = 'YOUR_PROG_SHORT_NAME') ;
   END IF ;

   RETURN gn_sqlldr_req_id ;
END get_request_id ;

END ;
/

====
Your SQL Loader control file..
==
load data
INFILE '/tmp/XMITINVD.TXT'
INTO TABLE xxinv_TABLE_stg
APPEND
FIELDS TERMINATED BY '|'
TRAILING NULLCOLS

(
---
---
---
request_id "<package_name>.get_request_id()" -- your function here
--
--
--
)

This worked for me.