Thursday, March 7, 2024

Workshop 2 : Stream Processing in SQL with RisingWave

 DTC Data Engineering Zoomcamp 2024

Environments

  1. GitHub Codespaces
  2. GCP VM Instance

Prerequisites

  • Docker and Docker Compose
   @garjita63 ➜ ~/risingwave-data-talks-workshop-2024-03-04 (main) $ docker version
   Client:
      Version:           24.0.9-1
      API version:       1.43
      Go version:        go1.20.13
      Git commit:        293681613032e6d1a39cc88115847d3984195c24
      Built:             Wed Jan 31 20:53:14 UTC 2024
      OS/Arch:           linux/amd64
      Context:           default

   Server:
      Engine:
       Version:          24.0.9-1
       API version:      1.43 (minimum version 1.12)
       Go version:       go1.20.13
       Git commit:       fca702de7f71362c8d103073c7e4a1d0a467fadd
       Built:            Thu Feb  1 00:12:23 2024
       OS/Arch:          linux/amd64
       Experimental:     false
      containerd:
       Version:          1.6.28-1
       GitCommit:        ae07eda36dd25f8a1b98dfbf587313b99c0190bb
      runc:
       Version:          1.1.12-1
       GitCommit:        51d5e94601ceffbbd85688df1c928ecccbfa4685
      docker-init:
       Version:          0.19.0
       GitCommit:        de40ad0
  • Python 3.7 or later
   @garjita63 ➜ ~/risingwave-data-talks-workshop-2024-03-04 (main) $ python --version
   Python 3.10.13
  • pip and virtualenv for Python
   @garjita63 ➜ ~/risingwave-data-talks-workshop-2024-03-04 (main) $ pip --version
   pip 24.0 from /usr/local/python/3.10.13/lib/python3.10/site-packages/pip (python 3.10)
  • psql (I use PostgreSQL-14.9)
   @garjita63 ➜ ~/risingwave-data-talks-workshop-2024-03-04 (main) $ psql -V
   psql (PostgreSQL) 16.2 (Ubuntu 16.2-1.pgdg20.04+1)
  • Clone this repository:
   git clone git@github.com:risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
   cd risingwave-data-talks-workshop-2024-03-04

Or, if you prefer HTTPS:

   git clone https://github.com/risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
   cd risingwave-data-talks-workshop-2024-03-04

Datasets

NYC Taxi & Limousine Commission website

  • yellow_tripdata_2022-01.parquet
  • taxi_zone.csv

The datasets have already been downloaded and are available in the data directory.

The file seed_kafka.py contains the logic to process the data and populate RisingWave.

In this workshop, we will replace the timestamp fields in the trip_data with timestamps close to the current time. That's because yellow_tripdata_2022-01.parquet contains historical data from 2022, and we want to simulate processing real-time data.
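Conceptually, the replacement boils down to shifting every timestamp by a fixed offset so that the newest records land close to the current time. Below is a minimal sketch of that idea in Python (not the actual seed_kafka.py logic, just an illustration using the column names seen later in the workshop):

# Sketch only: shift the 2022 timestamps so the latest dropoff lands near "now".
from datetime import datetime

import pandas as pd

df = pd.read_parquet("data/yellow_tripdata_2022-01.parquet")

# Offset between the current time and the newest dropoff in the file,
# applied to both timestamp columns so their relative spacing is preserved.
offset = datetime.now() - df["tpep_dropoff_datetime"].max()
df["tpep_pickup_datetime"] += offset
df["tpep_dropoff_datetime"] += offset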

Project Structure

@garjita63 ➜ ~/risingwave-data-talks-workshop-2024-03-04 (main) $ tree
.
├── README.md
├── assets
│   ├── mv1_plan.png
│   ├── mv2_plan.png
│   └── project.png
├── clickhouse-sql
│   ├── avg_fare_amt.sql
│   └── demo_table.sql
├── commands.sh
├── data
│   ├── taxi_zone.csv
│   └── yellow_tripdata_2022-01.parquet
├── docker
│   ├── Dockerfile
│   ├── Dockerfile.hdfs
│   ├── README.md
│   ├── aws
│   │   ├── Dockerfile
│   │   └── aws-build.sh
│   ├── aws.env
│   ├── dashboards
│   │   ├── risingwave-dev-dashboard.json
│   │   └── risingwave-user-dashboard.json
│   ├── docker-compose.yml
│   ├── grafana-risedev-dashboard.yml
│   ├── grafana-risedev-datasource.yml
│   ├── grafana.ini
│   ├── hdfs_env.sh
│   ├── multiple_object_storage.env
│   ├── prometheus.yaml
│   └── risingwave.toml
├── homework.md
├── index.html
├── requirements.txt
├── risingwave-sql
│   ├── sink
│   │   ├── avg_fare_amt_sink.sql
│   │   └── demo_clickhouse_sink.sql
│   └── table
│       ├── taxi_zone.sql
│       └── trip_data.sql
├── seed_kafka.py
├── server.py
├── slides.pdf
└── workshop.md

Getting Started

Before getting your hands dirty with the project, we will:

  1. Run some diagnostics.
  2. Start the RisingWave cluster.
  3. Set up our Python environment.
# Check version of psql
psql --version
source commands.sh

# Start the RW cluster
start-cluster

# Setup python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

commands.sh contains several commands to operate the cluster. You may reference it to see what commands are available.

Workshop (Stream Processing in SQL with RisingWave)

Source

image

These two commands should be run every time you open a new terminal.

source .venv/bin/activate
source commands.sh

Start the RisingWave cluster

start-cluster() {
docker-compose -f docker/docker-compose.yml up -d
}

start-cluster
image

Seed trip data from the parquet file

stream-kafka() {
./seed_kafka.py update
}

stream-kafka
image

Now we can let that run in the background.

Let's open another terminal to create the trip_data table:

source commands.sh
psql -f risingwave-sql/table/trip_data.sql
image
psql -c 'SHOW TABLES;'

Stream Processing with Materialized Views in RisingWave

Validating the ingested data

Now we are ready to begin processing the real-time stream being ingested into RisingWave!

The first thing we will do is to check taxi_zone and trip_data, to make sure the data has been ingested correctly.

Let's start a psql session.

source commands.sh
psql
image

First, we verify taxi_zone, since this is static data:

SELECT * FROM taxi_zone;
image

We will also query for recent data, to ensure we are getting real-time data.

SELECT pulocationid, dolocationid, tpep_pickup_datetime, tpep_dropoff_datetime
FROM trip_data WHERE tpep_dropoff_datetime > now() - interval '1 minute';
image

We can join this with taxi_zone to get the names of the zones.

SELECT taxi_zone.Zone as pickup_zone, taxi_zone_1.Zone as dropoff_zone, tpep_pickup_datetime, tpep_dropoff_datetime
 FROM trip_data
 JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
 JOIN taxi_zone as taxi_zone_1 ON trip_data.DOLocationID = taxi_zone_1.location_id
 WHERE tpep_dropoff_datetime > now() - interval '1 minute';
image

And finally make it into an MV so we can constantly query the latest realtime data:

CREATE MATERIALIZED VIEW latest_1min_trip_data AS
 SELECT taxi_zone.Zone as pickup_zone, taxi_zone_1.Zone as dropoff_zone, tpep_pickup_datetime, tpep_dropoff_datetime
 FROM trip_data
 JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
 JOIN taxi_zone as taxi_zone_1 ON trip_data.DOLocationID = taxi_zone_1.location_id
 WHERE tpep_dropoff_datetime > now() - interval '1 minute';
image

We can now query the MV to see the latest data:

SELECT * FROM latest_1min_trip_data order by tpep_dropoff_datetime DESC;
image

Now we can start processing the data with Materialized Views, to provide analysis of the data stream!

Materialized View 1: Total Airport Pickups

The first materialized view we create will be to count the total number of pickups at the airports.

This is rather simple, we just need to filter the PULocationID to the airport IDs.

Recall taxi_zone contains metadata around the taxi zones, so we can use that to figure out the airport zones.

describe taxi_zone;
image

Let's first get the zone names by looking at the taxi_zone table:

SELECT * FROM taxi_zone WHERE Zone LIKE '%Airport';
image

Then we can simply join on their location ids to get all the trips:

    SELECT
        *
    FROM
        trip_data
            JOIN taxi_zone
                 ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport';
image

And finally apply the count(*) aggregation for each airport.

    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                 ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
image

We can now create a Materialized View to constantly query the latest data:

CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
image

We can now query the MV to see the latest data:

SELECT * FROM total_airport_pickups;
image

So what actually happens when we create the MV?

We can examine the query plan to see what's happening:

EXPLAIN CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                 ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
image

Go to your local RisingWave Dashboard to see the query plan.

A simplified version of the query plan is provided here:

image

Materialized View 2: Airport pickups from JFK Airport, 1 hour before the latest pickup

We can adapt the previous MV to create a more specific one. We no longer need the GROUP BY, since we are only interested in 1 taxi zone, JFK Airport.

CREATE MATERIALIZED VIEW airport_pu as
SELECT
    tpep_pickup_datetime,
    pulocationid
FROM
    trip_data
        JOIN taxi_zone
            ON trip_data.PULocationID = taxi_zone.location_id
WHERE
        taxi_zone.Borough = 'Queens'
  AND taxi_zone.Zone = 'JFK Airport';
image

Next, we also want to keep track of the latest pickup

CREATE MATERIALIZED VIEW latest_jfk_pickup AS
    SELECT
        max(tpep_pickup_datetime) AS latest_pickup_time
    FROM
        trip_data
            JOIN taxi_zone
                ON trip_data.PULocationID = taxi_zone.location_id
    WHERE
        taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
image

Finally, let's get the counts of the pickups from JFK Airport, 1 hour before the latest pickup

CREATE MATERIALIZED VIEW jfk_pickups_1hr_before AS
    SELECT
        count(*) AS cnt
    FROM
        airport_pu
            JOIN latest_jfk_pickup
                ON airport_pu.tpep_pickup_datetime > latest_jfk_pickup.latest_pickup_time - interval '1 hour'
            JOIN taxi_zone
                ON airport_pu.PULocationID = taxi_zone.location_id
    WHERE
        taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
image

Simplified query plan:

image

Materialized View 3: Top 10 busiest zones in the last 1 minute

First we can write a query to get the dropoff counts for each zone.

SELECT
    taxi_zone.Zone AS dropoff_zone,
    count(*) AS last_1_min_dropoff_cnt
FROM
    trip_data
        JOIN taxi_zone
            ON trip_data.DOLocationID = taxi_zone.location_id
GROUP BY
    taxi_zone.Zone
ORDER BY last_1_min_dropoff_cnt DESC
    LIMIT 10;
image

Next, we can add a temporal filter to get the dropoff counts for each zone in the last 1 minute.

That has the form:

CREATE MATERIALIZED VIEW busiest_zones_1_min AS SELECT
    taxi_zone.Zone AS dropoff_zone,
    count(*) AS last_1_min_dropoff_cnt
FROM
    trip_data
        JOIN taxi_zone
            ON trip_data.DOLocationID = taxi_zone.location_id
WHERE
    trip_data.tpep_dropoff_datetime > (NOW() - INTERVAL '1' MINUTE)
GROUP BY
    taxi_zone.Zone
ORDER BY last_1_min_dropoff_cnt DESC
    LIMIT 10;
image

Materialized View 4: Longest trips

Here, the concept is similar to the previous MV, but we are interested in the top 10 longest trips in the last 5 minutes.

First we create the query to get the longest trips:

SELECT
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    taxi_zone_pu.Zone as pickup_zone,
    taxi_zone_do.Zone as dropoff_zone,
    trip_distance
FROM
    trip_data
        JOIN taxi_zone as taxi_zone_pu
             ON trip_data.PULocationID = taxi_zone_pu.location_id
        JOIN taxi_zone as taxi_zone_do
             ON trip_data.DOLocationID = taxi_zone_do.location_id
ORDER BY
    trip_distance DESC
    LIMIT 10;
image

Then we can create a temporal filter to get the longest trips for the last 5 minutes:

CREATE MATERIALIZED VIEW longest_trip_1_min AS SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        taxi_zone_pu.Zone as pickup_zone,
        taxi_zone_do.Zone as dropoff_zone,
        trip_distance
    FROM
        trip_data
    JOIN taxi_zone as taxi_zone_pu
        ON trip_data.PULocationID = taxi_zone_pu.location_id
    JOIN taxi_zone as taxi_zone_do
        ON trip_data.DOLocationID = taxi_zone_do.location_id
    WHERE
        trip_data.tpep_pickup_datetime > (NOW() - INTERVAL '5' MINUTE)
    ORDER BY
        trip_distance DESC
    LIMIT 10;
image

After this, you may run the visualization dashboard to see the data in real-time.

Start the backend which queries RisingWave:

./server.py
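If you are curious what such a backend involves: RisingWave speaks the Postgres wire protocol, so a minimal sketch (not the actual server.py) can simply poll one of the MVs with psycopg2. The connection details below (localhost:4566, user root, database dev) are assumptions based on a typical local RisingWave setup:

# Sketch only: poll an MV over RisingWave's Postgres-compatible endpoint.
import time

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

while True:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM busiest_zones_1_min;")
        for dropoff_zone, cnt in cur.fetchall():
            print(f"{dropoff_zone}: {cnt} dropoffs in the last minute")
    time.sleep(5)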

Visualize Data from Materialized View 3 and 4

Start the frontend, in a separate terminal, if you're on OSX:

open index.html

Start the frontend, in a separate terminal, if you're on linux:

xdg-open index.html

Materialized View 5: Average Fare Amount vs Number of rides

How does avg_fare_amt change relative to number of pickups per minute?

We use a tumble window function to compute this.

CREATE MATERIALIZED VIEW avg_fare_amt AS
SELECT
    avg(fare_amount) AS avg_fare_amount_per_min,
    count(*) AS num_rides_per_min,
    window_start,
    window_end
FROM
    TUMBLE(trip_data, tpep_pickup_datetime, INTERVAL '1' MINUTE)
GROUP BY
    window_start, window_end
ORDER BY
    num_rides_per_min ASC;

For each window we compute the average fare amount and the number of rides.

That's all for the materialized views!

Now we will see how to sink the data out from RisingWave.

How to sink data from RisingWave to Clickhouse

Reference:

  • https://docs.risingwave.com/docs/current/data-delivery/
  • https://docs.risingwave.com/docs/current/sink-to-clickhouse/

We have done some simple analytics and processing of the data in RisingWave.

Now we want to sink the data out to Clickhouse, for further analysis.

We will create a Clickhouse table to store the data from the materialized views.

Open a new terminal.

image
source commands.sh
clickhouse-client-term
image
CREATE TABLE avg_fare_amt(
    avg_fare_amount_per_min numeric,
    num_rides_per_min Int64,
) ENGINE = ReplacingMergeTree
PRIMARY KEY (avg_fare_amount_per_min, num_rides_per_min);
image

We will create a Clickhouse sink to sink the data from the materialized views to the Clickhouse table.

CREATE SINK IF NOT EXISTS avg_fare_amt_sink AS SELECT avg_fare_amount_per_min, num_rides_per_min FROM avg_fare_amt
WITH (
    connector = 'clickhouse',
    type = 'append-only',
    clickhouse.url = 'http://clickhouse:8123',
    clickhouse.user = '',
    clickhouse.password = '',
    clickhouse.database = 'default',
    clickhouse.table='avg_fare_amt',
    force_append_only = 'true'
);
image

Now we can run queries on the data ingested into clickhouse:

clickhouse-client-term

Run some queries in Clickhouse

select max(avg_fare_amount_per_min) from avg_fare_amt;
image
select min(avg_fare_amount_per_min) from avg_fare_amt;
image

RisingWave Dashboard









Summary

In this workshop you have learnt:

  • How to ingest data into RisingWave using Kafka
  • How to process the data using Materialized Views
  • Using Aggregations
  • Using Temporal Filters
  • Using Window Functions (Tumble)
  • Using Joins
  • Layering MVs to build a stream pipeline
  • How to sink the data out from RisingWave to Clickhouse

List of Materialized Views Created

  1. latest_1min_trip_data
CREATE MATERIALIZED VIEW latest_1min_trip_data AS
 SELECT taxi_zone.Zone as pickup_zone, taxi_zone_1.Zone as dropoff_zone, tpep_pickup_datetime, tpep_dropoff_datetime
 FROM trip_data
 JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
 JOIN taxi_zone as taxi_zone_1 ON trip_data.DOLocationID = taxi_zone_1.location_id
 WHERE tpep_dropoff_datetime > now() - interval '1 minute';
  2. total_airport_pickups
CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM
        trip_data
            JOIN taxi_zone
                ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
  3. airport_pu
CREATE MATERIALIZED VIEW airport_pu as
SELECT
    tpep_pickup_datetime,
    pulocationid
FROM
    trip_data
        JOIN taxi_zone
            ON trip_data.PULocationID = taxi_zone.location_id
WHERE
        taxi_zone.Borough = 'Queens'
  AND taxi_zone.Zone = 'JFK Airport';
  4. latest_jfk_pickup
CREATE MATERIALIZED VIEW latest_jfk_pickup AS
    SELECT
        max(tpep_pickup_datetime) AS latest_pickup_time
    FROM
        trip_data
            JOIN taxi_zone
                ON trip_data.PULocationID = taxi_zone.location_id
    WHERE
        taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
  5. jfk_pickups_1hr_before
CREATE MATERIALIZED VIEW jfk_pickups_1hr_before AS
    SELECT
        count(*) AS cnt
    FROM
        airport_pu
            JOIN latest_jfk_pickup
                ON airport_pu.tpep_pickup_datetime > latest_jfk_pickup.latest_pickup_time - interval '1 hour'
            JOIN taxi_zone
                ON airport_pu.PULocationID = taxi_zone.location_id
    WHERE
        taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
  6. busiest_zones_1_min
CREATE MATERIALIZED VIEW busiest_zones_1_min AS SELECT
    taxi_zone.Zone AS dropoff_zone,
    count(*) AS last_1_min_dropoff_cnt
FROM
    trip_data
        JOIN taxi_zone
            ON trip_data.DOLocationID = taxi_zone.location_id
WHERE
    trip_data.tpep_dropoff_datetime > (NOW() - INTERVAL '1' MINUTE)
GROUP BY
    taxi_zone.Zone
ORDER BY last_1_min_dropoff_cnt DESC
    LIMIT 10;
  7. longest_trip_1_min
CREATE MATERIALIZED VIEW longest_trip_1_min AS SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        taxi_zone_pu.Zone as pickup_zone,
        taxi_zone_do.Zone as dropoff_zone,
        trip_distance
    FROM
        trip_data
    JOIN taxi_zone as taxi_zone_pu
        ON trip_data.PULocationID = taxi_zone_pu.location_id
    JOIN taxi_zone as taxi_zone_do
        ON trip_data.DOLocationID = taxi_zone_do.location_id
    WHERE
        trip_data.tpep_pickup_datetime > (NOW() - INTERVAL '5' MINUTE)
    ORDER BY
        trip_distance DESC
    LIMIT 10;
  8. avg_fare_amt
CREATE MATERIALIZED VIEW avg_fare_amt AS
SELECT
    avg(fare_amount) AS avg_fare_amount_per_min,
    count(*) AS num_rides_per_min,
    window_start,
    window_end
FROM
    TUMBLE(trip_data, tpep_pickup_datetime, INTERVAL '1' MINUTE)
GROUP BY
    window_start, window_end
ORDER BY
    num_rides_per_min ASC;
image

What's next?

https://tutorials.risingwave.com/docs/category/basics

Homework

To further understand the concepts, please try the Homework.


Saturday, March 2, 2024

Module 5 - Batch Processing (DTC DE Zoomcamp Week 5)


5.1 Introduction

Introduction to Batch Processing

Batch jobs are routines that are run in regular intervals.

The most common intervals for batch jobs are:

  • daily
  • twice a day
  • hourly
  • every 5 minutes
  • etc.

Introduction to Apache Spark

             What is Spark?

               Apache Spark is an open-source, distributed processing system used for big data workloads. It utilizes in-memory caching, and optimized query execution for fast analytic queries against data of any size. It provides development APIs in Java, Scala, Python and R, and supports code reuse across multiple workloads—batch processing, interactive queries, real-time analytics, machine learning, and graph processing.

             Spark Architecture

               Spark applications run as independent sets of processes on a cluster as described in the below diagram:


           

             These processes are coordinated by the SparkContext object in your main program (called the driver program). SparkContext connects to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.

               Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

               Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.

             Core Components

               The following diagram gives a clear picture of the different components of Spark:

               


          

             How does Apache Spark work?

               Hadoop MapReduce is a programming model for processing big data sets with a parallel, distributed algorithm. Developers can write massively parallelized operators, without having to worry about work distribution, and fault tolerance. However, a challenge to MapReduce is the sequential multi-step process it takes to run a job. With each step, MapReduce reads data from the cluster, performs operations, and writes the results back to HDFS. Because each step requires a disk read, and write, MapReduce jobs are slower due to the latency of disk I/O.

               Spark was created to address the limitations to MapReduce, by doing processing in-memory, reducing the number of steps in a job, and by reusing data across multiple parallel operations. With Spark, only one-step is needed where data is read into memory, operations performed, and the results written back—resulting in a much faster execution. Spark also reuses data by using an in-memory cache to greatly speed up machine learning algorithms that repeatedly call a function on the same dataset. Data re-use is accomplished through the creation of DataFrames, an abstraction over Resilient Distributed Dataset (RDD), which is a collection of objects that is cached in memory, and reused in multiple Spark operations. This dramatically lowers the latency making Spark multiple times faster than MapReduce, especially when doing machine learning, and interactive analytics.

             Key differences: Apache Spark vs. Apache Hadoop

               Outside of the differences in the design of Spark and Hadoop MapReduce, many organizations have found these big data frameworks to be complementary, using them together to solve a broader business challenge.

               Hadoop is an open source framework that has the Hadoop Distributed File System (HDFS) as storage, YARN as a way of managing computing resources used by different applications, and an implementation of the MapReduce programming model as an execution engine. In a typical Hadoop implementation, different execution engines are also deployed such as Spark, Tez, and Presto.

               Spark is an open source framework focused on interactive query, machine learning, and real-time workloads. It does not have its own storage system, but runs analytics on other storage systems like HDFS, or other popular stores like Google Cloud Storage, Google BigQuery, Amazon Redshift, Amazon S3, and others. Spark on Hadoop leverages YARN to share a common cluster and dataset as other Hadoop engines, ensuring consistent levels of service, and response.

             What are the benefits of Apache Spark?

               There are many benefits of Apache Spark to make it one of the most active projects in the Hadoop ecosystem. These include:

            Fast

               Through in-memory caching, and optimized query execution, Spark can run fast analytic queries against data of any size.

            Developer friendly

               Apache Spark natively supports Java, Scala, R, and Python, giving you a variety of languages for building your applications. These APIs make it easy for your developers, because they hide the complexity of distributed processing behind simple, high-level operators that dramatically lowers the amount of code required.

            Multiple workloads

               Apache Spark comes with the ability to run multiple workloads, including interactive queries, real-time analytics, machine learning, and graph processing. One application can combine multiple workloads seamlessly.

             How does deploying Apache Spark in the cloud work?

               Spark is an ideal workload in the cloud, because the cloud provides performance, scalability, reliability, availability, and massive economies of scale. ESG research found 43% of respondents considering cloud as their primary deployment for Spark. The top reasons customers perceived the cloud as an advantage for Spark are faster time to deployment, better availability, more frequent feature/functionality updates, more elasticity, more geographic coverage, and costs linked to actual utilization.

5.2 Installations

Check Linux version:

lsb_release -a


Java

             Download Java 11.0.22 from https://www.oracle.com/id/java/technologies/downloads/#java11/

               jdk-11.0.22_linux-x64_bin.tar.gz

             Unzip (untar) file

      tar -xvzf jdk-11.0.22_linux-x64_bin.tar.gz

             Check Java version

      java -version   


             Add JAVA_HOME & edit PATH to ~/.bashrc file        


             source ~/.bashrc

      source ~/.bashrc

Apache Spark

             Download Apache Spark

               We suggest the following location for your download:

               https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz

             Unzip (untar) file

               tar -xvzf spark-3.4.2-bin-hadoop3.tgz

             Add SPARK_HOME & edit PATH to ~/.bashrc

               

             Check spark, pyspark versions

               Scala

      spark-shell

               


               Python

      pyspark

               


               SQL

      spark-sql

           


               R

                For SparkR, we have to install the R language before using sparkR.

      sudo apt install r-base

               Run sparkR

      sparkR

               



GCP Cloud

Create a project

IAM & Admin > New Project



Create Service Account



Region : asia-southeast2

Zone : asia-southeast2-a

Assign Roles like this:



Create Bucket




             Try copying a file into the bucket

      gsutil cp green_tripdata_2020-01.csv.gz gs://<bucket name>

Create Dataproc Cluster

Read also chapter 5.6.3

Using console:


Or using gcloud command:

gcloud config set compute/region asia-southeast2
gcloud config set compute/zone asia-southeast2-c
        
CLUSTER=<cluster name>
PROJECT=<project name>
REGION=<region name>
ZONE=<zone name>
      
gcloud dataproc clusters create ${CLUSTER} --project=${PROJECT} --region=${REGION} --zone=${ZONE} --single-node

Create VM Instance





Start the VM instance, then copy and save its External IP. This external IP is used to open the Spark master UI from the local machine, i.e. http://<External IP>:8080

Open port 8080 (for spark master), 7077 (for spark worker)

Using console

Select project > Firewall policies > Create Firewall Policies






Using gcloud command

gcloud config set project <project name>
     
gcloud compute firewall-rules create allow-8080 --allow=tcp:8080 --description="Allow incoming traffic on TCP port 8080" --direction=INGRESS
     
gcloud compute firewall-rules create allow-7077 --allow=tcp:7077 --description="Allow incoming traffic on TCP port 7077" --direction=INGRESS

5.3 Spark SQL and DataFrames

5.3.1 First Look at Spark/PySpark

Note: if you’re running Spark and Jupyter Notebook on a remote machine, you will need to redirect ports 8888 for Jupyter Notebook and 4040 for the Spark UI.

Step 1 : Creating a Spark session

Import pyspark module

import pyspark
from pyspark.sql import SparkSession

Instantiate a Spark session, an object that we use to interact with Spark.

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

SparkSession is the class of the object that we instantiate. builder is the builder method.

master() sets the Spark master URL to connect to. The local string means that Spark will run on a local cluster. [*] means that Spark will run with as many CPU cores as possible.

appName() defines the name of our application/session. This will show in the Spark UI.

getOrCreate() will create the session or recover the object if it was previously created.

Once we’ve instantiated a session, we can access the Spark UI by browsing to localhost:4040. The UI will display all current jobs. Since we’ve just created the instance, there should be no jobs currently running.



image

Step 2 : Reading CSV File

Similarly to Pandas, Spark can read CSV files into dataframes, a tabular data structure. Unlike Pandas, Spark can handle much bigger datasets, but it does not infer the datatypes of each column by default (every column is read as a string).

# Download the compressed CSV file
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv.gz')

read() reads the file.

option() contains options for the read method. In this case, we’re specifying that the first line of the CSV file contains the column names.

csv() is for reading CSV files.

Check :

df.show() or df.head() -> contents of the dataframe

df.schema or df.printSchema() -> dataframe schema

Step 3 : Check the inferred schema through df.schema

from pyspark.sql import types
schema = types.StructType(
    [
        types.StructField('hvfhs_license_num', types.StringType(), True),
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.IntegerType(), True),
        types.StructField('DOLocationID', types.IntegerType(), True),
        types.StructField('SR_Flag', types.IntegerType(), True)
    ]
)

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv.gz')

df.schema

Step 4 : Save DataFrame as parquet

As explained by the instructor, it is not good to have fewer files (partitions) than CPU cores, because only a subset of the cores will be used and the rest will sit idle. To avoid this, we first use the repartition method and then save the data as parquet. Suppose we have 8 cores; then we can repartition our dataset into 24 parts.

df = df.repartition(24)
df.write.parquet('fhvhv/2021/01/')

!ls -lh fhvhv/2021/01/



5.3.2 Spark DataFrames

Create a dataframe from the parquet files.

df = spark.read.parquet('fhvhv/2021/01/')

Check the schema

df.printSchema()


select() with filter()

df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003')

Actions vs. Transformations

Actions : code that is executed immediately (such as show(), take(), head(), write(), etc.)

Transformations : code that is lazy, i.e. not executed immediately (such as selecting columns, filtering data, joins and group-by operations)
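A quick illustration of the difference, assuming the fhvhv dataframe from above is loaded as df:

# Transformation: builds an execution plan, nothing is read or computed yet.
selected = df \
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003')

# Actions: only now does Spark actually scan the data and run the plan.
selected.show(5)
print(selected.count())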

Spark SQL Functions

Spark has many predefined SQL-like functions.

from pyspark.sql import functions as F
from pyspark.sql import types

# A user-defined function (UDF) built from plain Python logic
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    return f'e/{num:03x}'

# Register the UDF so it can be applied to dataframe columns
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

5.3.3 (Optional) Preparing Yellow and Green Taxi Data

See :

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/04_pyspark_yellow.ipynb

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/04_pyspark_green.ipynb

5.3.4 SQL with Spark

Batch jobs can be expressed as SQL queries, and Spark can run them.
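For example, a minimal sketch (assuming the fhvhv dataframe from section 5.3.1 is loaded as df): register the dataframe as a temporary view, then query it with spark.sql().

# Register the dataframe as a temp view so it can be referenced in SQL.
df.createOrReplaceTempView('fhvhv_trips')

# Run a SQL query; the result is just another dataframe.
result = spark.sql("""
    SELECT
        PULocationID,
        count(1) AS number_records
    FROM fhvhv_trips
    WHERE pickup_datetime >= '2021-01-01'
    GROUP BY PULocationID
    ORDER BY number_records DESC
""")

result.show(10)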

See :

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/06_spark_sql.ipynb

5.4 Spark Internals

5.4.1 Anatomy of a Spark Cluster



Apache Spark is considered a powerful complement to Hadoop, big data’s original technology. Spark is a more accessible, powerful, and capable big data tool for tackling various big data challenges. It has become mainstream and the most in-demand big data framework across all major industries. Spark has been part of the Hadoop ecosystem since Hadoop 2.0, and it is one of the most useful technologies for Python big data engineers.

Architecture

The components of a Spark application are:

  • Driver
  • Application Master
  • Spark Context
  • Cluster Resource Manager (aka Cluster Manager)
  • Executors

Spark uses a master/slave architecture with a central coordinator called the Driver and a set of worker processes called Executors that are located at various nodes in the cluster.

Driver

The Driver(aka driver program) is responsible for converting a user application to smaller execution units called tasks and then schedules them to run with a cluster manager on executors. The driver is also responsible for executing the Spark application and returning the status/results to the user.

Spark Driver contains various components – DAGScheduler, TaskScheduler, BackendScheduler and BlockManager. They are responsible for the translation of user code into actual Spark jobs executed on the cluster.

Other Driver properties:

  • can run in an independent process or on one of the worker nodes for High Availability (HA)
  • stores metadata about all Resilient Distributed Datasets and their partitions
  • is created after the user sends the Spark application to the cluster manager (YARN in our case)
  • runs in its own JVM
  • optimizes logical DAG transformations and, if possible, combines them into stages and determines the best location for execution of this DAG
  • creates the Spark WebUI with detailed information about the application

Application Master

Application Master is a framework-specific entity charged with negotiating resources with ResourceManager(s) and working with NodeManager(s) to perform and monitor application tasks. Each application running on the cluster has its own, dedicated Application Master instance.

Spark Master is created simultaneously with Driver on the same node (in case of cluster mode) when a user submits the Spark application using spark-submit.

The Driver informs the Application Master of the executor’s needs for the application, and the Application Master negotiates the resources with the Resource Manager to host these executors.

In standalone mode, the Spark Master acts as the Cluster Manager.

Spark Context

Spark Context is the main entry point into Spark functionality, and therefore the heart of any Spark application. It allows Spark Driver to access the cluster through its Cluster Resource Manager and can be used to create RDDs, accumulators and broadcast variables on the cluster. Spark Context also tracks executors in real-time by sending regular heartbeat messages.

Spark Context is created by Spark Driver for each Spark application when it is first submitted by the user. It exists throughout the lifetime of the Spark application.

Spark Context stops working after the Spark application is finished. For each JVM only one Spark Context can be active. You must stop() the active Spark Context before creating a new one.
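In PySpark this simply means stopping the current session (which also stops its Spark Context) before building a new one, for example:

from pyspark.sql import SparkSession

# Assuming `spark` is the session created earlier: stop it (and its SparkContext) ...
spark.stop()

# ... before creating a new one in the same JVM.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('another-app') \
    .getOrCreate()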

Cluster Resource Manager

Cluster Manager in a distributed Spark application is a process that controls, governs, and reserves computing resources in the form of containers on the cluster. These containers are reserved by request of Application Master and are allocated to Application Master when they are released or available.

Once the containers are allocated by Cluster Manager, the Application Master transfers the container resources back to the Spark Driver, and the Spark Driver is responsible for performing the various steps and tasks of the Spark application.

SparkContext can connect to different types of Cluster Managers. Now the most popular types are YARN, Mesos, Kubernetes or even Nomad. There is also Spark’s own standalone cluster manager.

A fun fact: Mesos was also developed by the creators of Spark.

Executors

Executors are the processes at the worker nodes, whose job is to complete the assigned tasks. These tasks are executed on the worker nodes and then return the result to the Spark Driver.

Executors are started once at the beginning of the Spark application and then work for the entire life of the application; this is known as “Static Allocation of Executors”. However, users can also choose to dynamically allocate executors, adding or removing executors to match the overall workload (though this can affect other applications running on the cluster). Even if one Spark executor crashes, the Spark application can continue to work.
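As an illustration only (the keys are standard Spark configuration settings, the values are arbitrary), dynamic allocation is enabled when the session is built:

from pyspark.sql import SparkSession

# Example: enable dynamic executor allocation (values are arbitrary).
# shuffleTracking lets it work without an external shuffle service (Spark 3+).
spark = SparkSession.builder \
    .appName('dynamic-allocation-example') \
    .config('spark.dynamicAllocation.enabled', 'true') \
    .config('spark.dynamicAllocation.shuffleTracking.enabled', 'true') \
    .config('spark.dynamicAllocation.minExecutors', '1') \
    .config('spark.dynamicAllocation.maxExecutors', '8') \
    .getOrCreate()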

Executors provide storage, either in-memory for RDD partitions that are cached (locally) in Spark applications (via BlockManager), or on disk when using localCheckpoint.

Other executor properties:

  • store data in a cache in a JVM heap or on disk
  • read data from external sources
  • write data to external sources
  • perform all data processing

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

appName = "PythonWordCount"
master = "local"

# initialization of spark context
conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# read data from text file, as a result we get RDD of lines
linesRDD = spark.sparkContext.textFile("/mnt/d/apache/spark-3.4.2-bin-hadoop3/README.md")

# from RDD of lines create RDD of lists of words
wordsRDD = linesRDD.flatMap(lambda line: line.split(" "))

# from RDD of lists of words make RDD of words tuples where
# the first element is a word and the second is counter, at the
# beginning it should be 1
wordCountRDD= wordsRDD.map(lambda word: (word, 1))

# combine elements with the same word value
resultRDD = wordCountRDD.reduceByKey(lambda a, b: a + b)

# write it back to folder
resultRDD.saveAsTextFile("PythonWordCount")
spark.stop()                           


1.          When we send the Spark application in cluster mode, the spark-submit utility communicates with the Cluster Resource Manager to start the Application Master.

2.          The Resource Manager is then held responsible for selecting the necessary container in which to run the Application Master. The Resource Manager then tells a specific Node Manager to launch the Application Master.

3.          The Application Master registers with the Resource Manager. Registration allows the client program to request information from the Resource Manager, that information allows the client program to communicate directly with its own Application Master.

4.          The Spark Driver then runs on the Application Master container (in case of cluster mode).

5.          The driver implicitly converts user code containing transformations and actions into a logical plan called a DAG. All RDDs are created in the driver and do nothing until the action is called. At this stage, the driver also performs optimizations such as pipelining narrow transformations.

6.          It then converts the DAG into a physical execution plan. After conversion to a physical execution plan, the driver creates physical execution units called tasks at each stage.

7.          The Application Master now communicates with the Cluster Manager and negotiates resources. Cluster Manager allocates containers and asks the appropriate NodeManagers to run the executors on all selected containers. When executors run, they register with the Driver. This way, the Driver has a complete view of the executors.

8.          At this point, the Driver will send tasks to executors via Cluster Manager based on the data placement.

9.          The code of the user application is launched inside the container. It provides information (stage of execution, status) to the Application Master.

10.      At this stage, we will start to execute our code. Our first RDD will be created by reading data in parallel from HDFS to different partitions on different nodes based on HDFS InputFormat. Thus, each node will have a subset of data.

11.      After reading the data we have two map transformations which will be executed in parallel on each partition.

12.      Next, we have a reduceByKey transformation, it is not a narrow transformation like map, so it will create an additional stage. It combines records with the same keys, then moves data between nodes (shuffle) and partitions to combine the keys of the same record.

13.      We then perform an action — write back to HDFS which will trigger the entire DAG execution.

14.      During the execution of the user application, the client communicates with the Application Master to obtain the application status.

15.      When the application finishes executing and all of the necessary work is done, the Application Master disconnects itself from the Resource Manager and stops, freeing up its container for other purposes.

5.4.2 GroupBy in Spark

See:

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/07_groupby_join.ipynb

5.4.3 Joins in Spark

See:

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/07_groupby_join.ipynb

5.5 (Optional) Resilient Distributed Datasets

See :

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/08_rdds.ipynb

5.6 Running Spark in the Cloud

5.6.1 Connecting to Google Cloud Storage

See :

https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/09_spark_gcs.ipynb

5.6.2 Creating a Local Spark Cluster

             Stop all kernels including Jupyter Notebook connections

             Start Spark Master

      start-master.sh



image

             Open WebUI : http://localhost:8080/

               


               image

             Start Worker

      start-worker.sh spark://Desktop-Gar.:7077

               



               


           

               Change :

      # Instantiate a session

spark = SparkSession.builder \
  .master("local[*]") \
  .appName('test') \
  .getOrCreate()

               into :

      # Instantiate a session

spark = SparkSession.builder \
  .master("spark://Desktop-Gar.:7077") \
  .appName('test') \
  .getOrCreate()

      #  stop spark worker and spark master

./spark-3.2.1-bin-hadoop3.2/sbin/stop-worker.sh
./spark-3.2.1-bin-hadoop3.2/sbin/stop-master.sh

5.6.3 Setting up a Dataproc Cluster

Step 1: create cluster

               


     

               


        

               


                 


                    


