Running Spark Jobs using Apache Airflow™ on IBM Analytics Engine
 
IBM Analytics Engine provides Apache Spark environments a service that decouples the compute and storage tiers to control costs, and achieve analytics at scale. Instead of a permanent cluster formed of dual-purpose nodes, IBM Analytics Engine enables users to store data in an object storage layer such as IBM Cloud Object Storage and spins up clusters of compute notes when needed.
Apache Airflow™
Apache Airflow is an open-source workflow management platform for data engineering pipelines.
Usecase: Spark Sequential Jobs using IAE
Consider the scenario of Extract, Transform, and Load (ETL) operations, which often entail executing several Spark jobs either sequentially or concurrently. With Airflow, we can craft a DAG to facilitate this process.
Today, our focus is on constructing a DAG that orchestrates a series of Spark jobs. Initially, it will execute one Spark job and pause until its completion. Subsequently, it will concurrently trigger two additional jobs and monitoring all of them to ensure they finish successfully. While the implementation currently leverages IBM Analytics Engine on Cloud Pak for Data to execute the Spark jobs, you have the flexibility to modify the code to operate on any supported environments.
In order to start working on Apache Airflow™, make sure it is installed, refer to this document to install airflow.
In order to start airflow in standalone mode, you can run -
airflow standalone
Let’s create our DAG file called “spark_pipeline.py” in $AIRFLOW_HOME/dags/ directory. (Default path for $AIRFLOW_HOME is set to ~/airflow)
vi $AIRFLOW_HOME/dags/spark_pipeline.py
Add the following code in the file -
import base64
from datetime import timedelta, datetime
import json
from time import sleep
import os
# The DAG object
from airflow import DAG
# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
import requests
# initializing the default arguments
default_args = {
    'owner': 'IAE',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'cpd_endpoint': 'https://cpd-cpd-instance.apps.your-cluster.com', # CPD Endpoint
    'cpd_username': 'testuser', # CPD Username
    'cpd_api_key': 'set-your-api-key-here', # CPD User's ApiKey
    'iae_instance_id': '1585c6d5-7130-4ab6-9cbc-82d9150edbc2' # Spark Instance Id from Service Instance Details
}
# Instantiate a DAG object
spark_pipeline_dag = DAG('spark_pipeline_job',
		default_args=default_args,
		description='Spark Sequential/Parallel Job',
		schedule_interval='*/5 * * * *', 
		catchup=False,
		tags=['iae, spark']
)
# Python requests package workaround
os.environ['NO_PROXY'] = '*'
def submit_spark_job():
    try:
        print('submit_spark_job')
        url = f"{default_args['cpd_endpoint']}/v4/analytics_engines/{default_args['iae_instance_id']}/spark_applications"
        
        token = f"{default_args['cpd_username']}:{default_args['cpd_api_key']}".encode('ascii')
        zenApiToken= base64.b64encode(token).decode('ascii')
        headers = {'Content-type': 'application/json', 'Authorization': f'ZenApiKey {zenApiToken}'}
        
        response = requests.post(url, None, {
            "application_details": {
                "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
                "application_arguments": ["/opt/ibm/spark/examples/src/main/resources/people.txt"],
                "conf": {
                    "ae.spark.driver.log.level":"ERROR",
                    "ae.spark.executor.log.level":"ERROR",
                }
            }
        } , headers=headers, verify=False)
        
        print("Response", response.content)
        return response.json()['application_id']
    except Exception as inst:
        print('Error')
        exit
        print(inst)
def wait_till_all_jobs_completed():
    try:
        print('wait_till_all_spark_job_completes')
        
        while True:
            url = f"{default_args['cpd_endpoint']}/v4/analytics_engines/{default_args['iae_instance_id']}/spark_applications?state=ACCEPTED,STARTING,RUNNING,WAITING"
            
            token = f"{default_args['cpd_username']}:{default_args['cpd_api_key']}".encode('ascii')
            zenApiToken= base64.b64encode(token).decode('ascii')
            headers = {'Content-type': 'application/json', 'Authorization': f'ZenApiKey {zenApiToken}'}
            
            response = requests.get(url, headers=headers, verify=False)
            print(response.content)
            if len(response.json()['applications']) == 0:
                break
            print('Job is not completed, sleeping for 10secs')
            sleep(10)
    except Exception as inst:
        print(inst)
def spark_job_task():
    appId = submit_spark_job()
def wait_for_complete():
    wait_till_all_jobs_completed()
# Creating first task
spark_job_1 = PythonOperator(task_id='spark_job_1', python_callable=spark_job_task, dag=spark_pipeline_dag)
# Creating second task
wait_till_all_jobs_completed_1 = PythonOperator(task_id='wait_till_all_jobs_completed', python_callable=wait_for_complete, dag=spark_pipeline_dag)
spark_job_2 = PythonOperator(task_id='spark_job_2', python_callable=spark_job_task, dag=spark_pipeline_dag)
spark_job_3 = PythonOperator(task_id='spark_job_3', python_callable=spark_job_task, dag=spark_pipeline_dag)
wait_till_all_jobs_completed_2 = PythonOperator(task_id='wait_till_all_jobs_completed_2', python_callable=wait_for_complete, dag=spark_pipeline_dag)
# Set the order of execution of tasks. 
spark_job_1 >> wait_till_all_jobs_completed_1 >> [spark_job_2, spark_job_3] >> wait_till_all_jobs_completed_2
Note that the default_args needs parameters from your IBM Analytics Engine instance and user credentials of Cloud Pak for Data.
This code sets up an Airflow DAG (Directed Acyclic Graph) named spark_pipeline_job which orchestrates a Spark job sequence. Here’s a summary of what it does:
- 
    It imports necessary libraries/modules such as base64, datetime, json, requests, and Airflow components like DAG, DummyOperator, and PythonOperator. 
- 
    It defines default arguments including owner, start date, retries, retry delay, CPD (Cloud Pak for Data) endpoint, CPD username, CPD API key, and IAE (IBM Analytics Engine) instance ID. 
- 
    The DAG is instantiated with specific parameters like the name, default arguments, description, schedule interval, catchup, and tags. 
Two Python functions are defined:
- 
    submit_spark_job(): This function submits a Spark job to the specified endpoint with provided details such as application, application arguments, and configuration. It returns the application ID.wait_till_all_jobs_completed(): This function continuously checks the status of Spark jobs until all jobs are completed. Four Python tasks are created using PythonOperator:
- spark_job_1: Submits the first Spark job.
- wait_till_all_jobs_completed_1: Waits for the completion of all Spark jobs after the first job.
- spark_job_2and- spark_job_3: Submit subsequent Spark jobs.
- wait_till_all_jobs_completed_2: Waits for the completion of all Spark jobs after the second set of jobs.
This DAG orchestrates the execution of Spark jobs in sequence, waiting for each job to complete before starting the next set of jobs.
Finally, we just need to enable this DAG from Apache Airflow, head over to your airflow console and search for “spark_pipeline_job”, open it and enable -
 
Conclusion
Finally, the DAG will being to execute every 5 mins. A green run should like the one above, you can click on each task and switch to logs directory to view the logs.
This DAG can also be exeecuted for other environments of IBM Analytics Engine with authentication & API related code changes. Let me know if you are looking for other examples and I hope you learnt something new today. Thanks for reading.
