Efficient ETL Pipeline Using Apache Airflow, PostgreSQL, and Redshift


 

Introduction

 

In this case study, we explore the development of a robust ETL (Extract, Transform, Load) pipeline for a data-driven company. The project leverages Apache Airflow for orchestrating workflows, PostgreSQL for intermediate data storage, and Amazon Redshift for data warehousing. The goal is to streamline the processing of large datasets sourced from multiple APIs, ensuring efficient data transformation and storage.

 

The Challenge

 

The company faced several challenges with its existing data processing solution:

  • Manual Processes: The existing ETL processes were heavily manual, leading to inefficiencies and delays.
  • Scalability Issues: The solution could not handle the growing data volumes, causing frequent slowdowns and failures.
  • Complex Transformations: Increasingly complex business rules made data transformation cumbersome and error-prone.
  • Data Integrity: Ensuring data accuracy and consistency across the pipeline was difficult, leading to potential data quality issues.
  • Maintenance Overhead: The lack of automation and modularity made the system hard to maintain and prone to errors.

Our Solution

 

To address these challenges, we designed and implemented a new ETL pipeline with the following components and features:

 

Technology Stack

  • Apache Airflow: Used for orchestrating ETL workflows, allowing for automation and scheduling.
  • PostgreSQL: Employed for intermediate data storage and transformation tasks.
  • Amazon Redshift: Utilized for the final data warehousing, providing scalable storage and fast query performance.
  • AWS SQS: Managed the message queue, facilitating reliable data processing.
  • AWS S3: Provided temporary data storage during various ETL stages (one possible staging pattern is sketched just below this list).
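
The implementation later in this case study keeps payloads in SQS messages and does not show the S3 step directly. One common way S3 serves as temporary storage in a pipeline like this, and the assumption sketched here, is to stage large API payloads in a bucket and pass only the object key through SQS, since a single SQS message is capped at 256 KB. The bucket name and helper functions below are hypothetical.

Python

import json
import boto3

s3 = boto3.client('s3')
STAGING_BUCKET = 'your-etl-staging-bucket'  # hypothetical bucket name

def stage_payload(key, payload):
    # Write the raw API response to S3 and return the key to send through SQS.
    s3.put_object(Bucket=STAGING_BUCKET, Key=key, Body=json.dumps(payload).encode('utf-8'))
    return key

def fetch_payload(key):
    # Read a staged payload back during the transformation stage.
    obj = s3.get_object(Bucket=STAGING_BUCKET, Key=key)
    return json.loads(obj['Body'].read())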

 

Architecture

 

The ETL pipeline consisted of three main stages:

  1. Data Extraction:
    • Data was extracted from multiple APIs using Airflow DAGs.
    • Each API response was pushed to an AWS SQS queue for further processing.
  2. Data Transformation:
    • Messages from SQS were read and processed.
    • Data was temporarily stored in PostgreSQL where transformations were applied according to business rules.
  3. Data Loading:
    • Transformed data was loaded into Amazon Redshift.
    • Data integrity checks were performed to ensure accuracy.

 

Implementation
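
Each of the DAGs below imports a shared default_args dictionary from dags/utils/default_args.py. That module isn't reproduced in this write-up, so the sketch below shows one plausible version; the owner, retry count, and retry delay are assumptions rather than values from the original project.

Python

from datetime import timedelta

# dags/utils/default_args.py (hypothetical reconstruction)
# Shared defaults applied to every DAG in the pipeline.
default_args = {
    'owner': 'data-engineering',          # assumed owner name
    'depends_on_past': False,
    'retries': 2,                         # assumed retry policy
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': False,
    'email_on_retry': False,
}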

 

Data Extraction:

Python

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from dags.utils.default_args import default_args
from scripts.aws_sqs import AwsSqs

aws_sqs = AwsSqs(queue_url='your_queue_url')

def extract_data_from_api(**kwargs):
    response = call_api()  # Replace with the actual API call
    aws_sqs.push('data_type', response)

with DAG(
    'data_extraction_dag',
    default_args=default_args,
    description='Extract data from APIs and push to SQS',
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id='extract_data_from_api',
        python_callable=extract_data_from_api,
        provide_context=True,
    )

    extract_task
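
Both the extraction and transformation DAGs rely on a small AwsSqs helper imported from scripts/aws_sqs.py. The original helper isn't shown, so the following is a minimal sketch of what it might look like on top of boto3; the method names match how it is called in the DAGs, but the implementation details are assumptions.

Python

import json
import boto3

class AwsSqs:
    """Hypothetical reconstruction of the SQS helper used by the DAGs."""

    def __init__(self, queue_url):
        self.queue_url = queue_url
        self.client = boto3.client('sqs')

    def push(self, data_type, payload):
        # Serialize the API response and tag it with its type.
        self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(payload),
            MessageAttributes={
                'data_type': {'DataType': 'String', 'StringValue': data_type}
            },
        )

    def read_batch(self, max_messages=10):
        # SQS returns at most 10 messages per receive call.
        response = self.client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=5,
        )
        return response.get('Messages', [])

    def delete(self, receipt_handle):
        # Remove a message once it has been processed successfully.
        self.client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle)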

 

Data Transformation:

Python

import json

import psycopg2
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from dags.utils.default_args import default_args
from scripts.aws_sqs import AwsSqs

aws_sqs = AwsSqs(queue_url='your_queue_url')

def transform_data(**kwargs):
    messages = aws_sqs.read_batch(max_messages=10)
    if not messages:
        return

    conn = psycopg2.connect(dbname='your_db', user='your_user', password='your_password', host='your_host')
    cursor = conn.cursor()

    for message in messages:
        data = json.loads(message['Body'])
        transformed_data = transform_logic(data)  # Replace with the actual transformation logic
        insert_query = """
        INSERT INTO intermediate_table (type, json_data)
        VALUES (%s, %s)
        """
        cursor.execute(insert_query, ('data_type', json.dumps(transformed_data)))
        conn.commit()

    cursor.close()
    conn.close()

with DAG(
    'data_transformation_dag',
    default_args=default_args,
    description='Transform data and store in PostgreSQL',
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False,
) as dag:

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        provide_context=True,
    )

    transform_task
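
The transformation and loading DAGs both assume an intermediate_table whose exact schema isn't given in the original write-up. A plausible definition, consistent with the INSERT and SELECT statements used here, is shown below; the created_at column is assumed to be filled by a column default so the load step can forward it to Redshift.

Python

import psycopg2

# Hypothetical schema for intermediate_table (column names follow the queries in the DAGs).
DDL = """
CREATE TABLE IF NOT EXISTS intermediate_table (
    id         BIGSERIAL PRIMARY KEY,
    type       TEXT NOT NULL,
    json_data  JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""

conn = psycopg2.connect(dbname='your_db', user='your_user', password='your_password', host='your_host')
with conn, conn.cursor() as cursor:  # commits on success, rolls back on error
    cursor.execute(DDL)
conn.close()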

 

Data Loading:

Python

import psycopg2
import redshift_connector
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from dags.utils.default_args import default_args

def load_data_to_redshift(**kwargs):
    pg_conn = psycopg2.connect(dbname='your_db', user='your_user', password='your_password', host='your_host')
    pg_cursor = pg_conn.cursor()

    rs_conn = redshift_connector.connect(
        host='your_redshift_host',
        database='your_redshift_db',
        user='your_user',
        password='your_password'
    )
    rs_cursor = rs_conn.cursor()

    # Select the columns explicitly so the positional parameters below stay in sync
    # with the target table, instead of relying on SELECT * ordering.
    select_query = "SELECT type, json_data, created_at FROM intermediate_table"
    pg_cursor.execute(select_query)
    rows = pg_cursor.fetchall()

    insert_query = """
    INSERT INTO final_table (type, json_data, created_at)
    VALUES (%s, %s, %s)
    """
    for row in rows:
        rs_cursor.execute(insert_query, (row[0], row[1], row[2]))

    # Commit once per batch rather than once per row.
    rs_conn.commit()

    pg_cursor.close()
    pg_conn.close()
    rs_cursor.close()
    rs_conn.close()

with DAG(
    'data_loading_dag',
    default_args=default_args,
    description='Load data from PostgreSQL to Redshift',
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False,
) as dag:

    load_task = PythonOperator(
        task_id='load_data_to_redshift',
        python_callable=load_data_to_redshift,
        provide_context=True,
    )

    load_task
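
The pipeline performs data integrity checks after loading, but the checks themselves aren't shown in the original write-up. A simple check of this kind, sketched here under the assumption that matching row counts are a sufficient signal, compares the intermediate table with the warehouse table and fails the task on a mismatch.

Python

import psycopg2
import redshift_connector

def check_row_counts():
    # Compare row counts between the intermediate store and the warehouse table.
    pg_conn = psycopg2.connect(dbname='your_db', user='your_user', password='your_password', host='your_host')
    rs_conn = redshift_connector.connect(
        host='your_redshift_host',
        database='your_redshift_db',
        user='your_user',
        password='your_password'
    )
    try:
        pg_cursor = pg_conn.cursor()
        pg_cursor.execute("SELECT COUNT(*) FROM intermediate_table")
        pg_count = pg_cursor.fetchone()[0]

        rs_cursor = rs_conn.cursor()
        rs_cursor.execute("SELECT COUNT(*) FROM final_table")
        rs_count = rs_cursor.fetchone()[0]

        if pg_count != rs_count:
            raise ValueError(f"Row count mismatch: {pg_count} in PostgreSQL vs {rs_count} in Redshift")
    finally:
        pg_conn.close()
        rs_conn.close()

A function like this can be wrapped in another PythonOperator and scheduled to run after load_data_to_redshift.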

 

Results

 

The implementation of the new ETL pipeline yielded significant improvements:

  • Processing Time: Reduced by 50%, driven by parallel task execution with Airflow's CeleryExecutor.
  • Data Accuracy: Enhanced with automated integrity checks at each stage.
  • Maintenance: Simplified through modular DAG design and clear logging, reducing manual intervention and errors.

 

Conclusion

 

The ETL pipeline project successfully leveraged Apache Airflow, PostgreSQL, and Amazon Redshift to meet the company's data processing needs. This solution not only enhanced performance and scalability but also ensured data accuracy and ease of maintenance, providing a robust foundation for the company’s data-driven decision-making.

 

At Amwhiz, we specialize in developing efficient ETL solutions that streamline data workflows and empower businesses to make informed decisions. Our commitment to innovation and excellence ensures that our clients stay ahead in the competitive data-driven landscape.
