Module 5 - Batch Processing (DTC DE Zoomcamp Week 5)
5.1 Introduction
### Introduction to Batch Processing
Batch jobs are routines that are run at regular intervals. The most common intervals for batch jobs are:
- daily
- twice a day
- hourly
- every 5 minutes
- etc.
### Introduction to Apache Spark
• What is Spark?
Apache Spark is an open-source, distributed
processing system used for big data workloads. It utilizes in-memory caching,
and optimized query execution for fast analytic queries against data of any
size. It provides development APIs in Java, Scala, Python and R, and supports
code reuse across multiple workloads—batch processing, interactive queries,
real-time analytics, machine learning, and graph processing.
• Spark Architecture
Spark applications run as independent sets of processes on a cluster, as described in the diagram below:
These set of processes are coordinated by the SparkContext object in your main program (called the driver program). SparkContext connects to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.
Once connected, Spark acquires executors on
nodes in the cluster, which are processes that run computations and store data
for your application.
Next, it sends your application code (defined by
JAR or Python files passed to SparkContext) to the executors. Finally,
SparkContext sends tasks to the executors to run.
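In PySpark terms, the driver program is simply the script that builds the session. A minimal local sketch (illustrative, not part of the original notes; on a real cluster the master URL would point at YARN or a standalone master instead of local[*]):

from pyspark.sql import SparkSession

# This script is the driver program. Building the session creates the SparkContext,
# which connects to the cluster manager and acquires executors.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("architecture-demo") \
    .getOrCreate()

sc = spark.sparkContext       # the SparkContext coordinating the executors
print(sc.master)              # which master / cluster manager we connected to
print(sc.defaultParallelism)  # how many cores the acquired executors expose

# Any action below is split into tasks that are sent to the executors
print(spark.range(10).count())

spark.stop()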
• Core Components
The following diagram gives a clear picture of the different components of Spark:
• How does Apache Spark work?
Hadoop MapReduce is a programming model for
processing big data sets with a parallel, distributed algorithm. Developers can
write massively parallelized operators, without having to worry about work
distribution, and fault tolerance. However, a challenge to MapReduce is the
sequential multi-step process it takes to run a job. With each step, MapReduce
reads data from the cluster, performs operations, and writes the results back
to HDFS. Because each step requires a disk read, and write, MapReduce jobs are
slower due to the latency of disk I/O.
Spark was created to address the limitations to
MapReduce, by doing processing in-memory, reducing the number of steps in a
job, and by reusing data across multiple parallel operations. With Spark, only
one-step is needed where data is read into memory, operations performed, and
the results written back—resulting in a much faster execution. Spark also
reuses data by using an in-memory cache to greatly speed up machine learning
algorithms that repeatedly call a function on the same dataset. Data re-use is
accomplished through the creation of DataFrames, an abstraction over Resilient
Distributed Dataset (RDD), which is a collection of objects that is cached in
memory, and reused in multiple Spark operations. This dramatically lowers the
latency making Spark multiple times faster than MapReduce, especially when
doing machine learning, and interactive analytics.
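As a small illustration of this reuse (a hypothetical snippet, not from the original notes): caching a DataFrame keeps it in memory across several actions instead of recomputing or re-reading it each time.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("cache-demo").getOrCreate()

df = spark.range(0, 1_000_000)             # a simple DataFrame of numbers
df.cache()                                  # keep it in memory after the first action

print(df.count())                           # first action: computes the data and caches it
print(df.filter(df.id % 2 == 0).count())    # second action: reuses the cached partitions

spark.stop()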
• Key differences: Apache Spark vs. Apache Hadoop
Outside of the differences in the design of Spark and Hadoop MapReduce, many organizations have found these big data frameworks to be complementary, using them together to solve a broader business challenge.
Hadoop is an open source framework that has the
Hadoop Distributed File System (HDFS) as storage, YARN as a way of managing
computing resources used by different applications, and an implementation of
the MapReduce programming model as an execution engine. In a typical Hadoop
implementation, different execution engines are also deployed such as Spark,
Tez, and Presto.
Spark is an open source framework focused on
interactive query, machine learning, and real-time workloads. It does not have
its own storage system, but runs analytics on other storage systems like HDFS,
or other popular stores like Google Cloud Storage, Google BigQuery, Amazon
Redshift, Amazon S3, and others. Spark on Hadoop leverages YARN to share a
common cluster and dataset as other Hadoop engines, ensuring consistent levels
of service, and response.
• What are the benefits of Apache Spark?
Apache Spark has many benefits that make it one of the most active projects in the Hadoop ecosystem. These include:
– Fast
Through in-memory caching, and optimized query
execution, Spark can run fast analytic queries against data of any size.
– Developer friendly
Apache Spark natively supports Java, Scala, R,
and Python, giving you a variety of languages for building your applications.
These APIs make it easy for your developers, because they hide the complexity of distributed processing behind simple, high-level operators that dramatically lower the amount of code required.
– Multiple workloads
Apache Spark comes with the ability to run
multiple workloads, including interactive queries, real-time analytics, machine
learning, and graph processing. One application can combine multiple workloads
seamlessly.
• How does deploying Apache Spark in the cloud work?
Spark is an ideal workload in the cloud, because
the cloud provides performance, scalability, reliability, availability, and
massive economies of scale. ESG research found 43% of respondents considering
cloud as their primary deployment for Spark. The top reasons customers
perceived the cloud as an advantage for Spark are faster time to deployment,
better availability, more frequent feature/functionality updates, more
elasticity, more geographic coverage, and costs linked to actual utilization.
5.2 Installations
Check Linux version:
lsb_release -a
Java
• Download Java 11.0.22 from https://www.oracle.com/id/java/technologies/downloads/#java11/
jdk-11.0.22_linux-x64_bin.tar.gz
• Unzip (untar) the file
tar -xvzf jdk-11.0.22_linux-x64_bin.tar.gz
• Check the Java version
java -version
• Add JAVA_HOME & edit PATH in the ~/.bashrc file
• Reload the shell configuration
source ~/.bashrc
Apache Spark
• Download Apache Spark
We suggest the following location for your download:
https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
• Unzip (untar) file
tar -xvzf spark-3.4.2-bin-hadoop3.tgz
• Add SPARK_HOME & edit PATH in ~/.bashrc
• Check spark, pyspark versions
Scala
spark-shell
Python
pyspark
SQL
spark-sql
R
For sparkR, we have to install the R language first:
sudo apt install r-base
Run sparkR
sparkR
GCP Cloud
Create a project
IAM & Admin > New Project
Region : asia-southeast2
Zone : asia-southeast2-a
Assign Roles like this:
• Try copying a file into the bucket
gsutil cp green_tripdata_2020-01.csv.gz gs://<bucket name>
Create Dataproc Cluster
Read also chapter 5.6.3
Using console:
Or using gcloud command:
gcloud config set compute/region asia-southeast2
gcloud config set compute/zone asia-southeast2-c
CLUSTER=<cluster name>
PROJECT=<project name>
REGION=<region name>
ZONE=<zone name>
gcloud dataproc clusters create ${CLUSTER} --project=${PROJECT} --region=${REGION} --zone=${ZONE} --single-node
Create VM Instance
Start the VM Instance, and copy & save the External IP. This external IP is used to open the Spark master UI in a browser on the local machine, i.e. http://<External IP>:8080
Open port 8080 (for the Spark master) and 7077 (for the Spark worker)
Using console
Select project > Firewall policies > Create Firewall Policies
Using gcloud command
gcloud config set project <project name>
gcloud compute firewall-rules create allow-8080 --allow=tcp:8080 --description="Allow incoming traffic on TCP port 8080" --direction=INGRESS
gcloud compute firewall-rules create allow-7077 --allow=tcp:7077 --description="Allow incoming traffic on TCP port 7077" --direction=INGRESS
5.3 Spark SQL and DataFrames
5.3.1 First Look at Spark/PySpark
Note: if you’re running Spark
and Jupyter Notebook on a remote machine, you will need to redirect ports 8888
for Jupyter Notebook and 4040 for the Spark UI.
Step 1 : Creating a Spark session
Import pyspark module
import pyspark
from pyspark.sql import SparkSession
Instantiate a Spark session, an object that we use to interact with Spark.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
SparkSession is the class of
the object that we instantiate. builder is the builder method.
master() sets the Spark master
URL to connect to. The local string means that Spark will run on a local
cluster. [*] means that Spark will run with as many CPU cores as possible.
appName() defines the name of
our application/session. This will show in the Spark UI.
getOrCreate() will create the
session or recover the object if it was previously created.
Once we’ve instantiated a
session, we can access the Spark UI by browsing to localhost:4040. The UI will
display all current jobs. Since we’ve just created the instance, there should
be no jobs currently running.
Step 2 : Reading CSV File
Similarly to Pandas, Spark can read CSV files into dataframes, a tabular data structure. Unlike Pandas, Spark can handle much bigger datasets, but it does not infer the datatypes of each column by default (everything is read as strings unless a schema is supplied or schema inference is enabled).
# Download the compressed CSV file
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv.gz')
read returns a DataFrameReader, which we use to load the file.
option() contains options for
the read method. In this case, we’re specifying that the first line of the CSV
file contains the column names.
csv() is for reading CSV files.
Check:
df.show() or df.head() -> contents of the dataframe
df.schema or df.printSchema() -> dataframe schema
Step 3 : Check the inferred schema through df.schema
from pyspark.sql import types

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.IntegerType(), True)
])
df = spark.read \
.option("header", "true") \
.option("inferSchema",True) \
.csv('fhvhv_tripdata_2021-01.csv.gz')
df.schema
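To enforce the explicit schema defined above instead of relying on inference, the read can be repeated with schema() (same pattern as the course notebook):

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv.gz')

df.printSchema()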
Step 4 : Save DataFrame as parquet
As explained by the instructor, it is not good to have fewer partitions (output files) than CPU cores, because only a subset of the cores will be used while the rest stay idle. So we first use the repartition method and then save the data as parquet. Suppose we have 8 cores; then we can repartition our dataset into 24 parts.
df = df.repartition(24)
df.write.parquet('fhvhv/2021/01/')
!ls -lh fhvhv/2021/01/
Create a dataframe from the parquet files.
df = spark.read.parquet('fhvhv/2021/01/')
Check the schema
df.printSchema()
select() with filter()
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003')
Actions vs. Transformations
Action: code that is executed immediately (such as show(), take(), head(), write(), etc.)
Transformation: code that is lazy, i.e., not executed immediately (such as selecting columns, filtering data, joins and groupBy operations)
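For example, reusing the select/filter chain from above: nothing runs until an action is called.

# select() and filter() are transformations: nothing is executed yet
subset = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003')

# show() is an action: it triggers execution of the whole chain above
subset.show(5)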
Spark SQL Functions
Spark has many predefined SQL-like functions, available through the pyspark.sql.functions module. When the built-in functions are not enough, we can also define our own User Defined Functions (UDFs), like the one below:

from pyspark.sql import functions as F

def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    return f'e/{num:03x}'

crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()
5.3.3 (Optional) Preparing Yellow and Green Taxi Data
See :
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/04_pyspark_yellow.ipynb
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/04_pyspark_green.ipynb
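Those notebooks follow the same pattern as above. A minimal, illustrative sketch for one month of green taxi data (paths and partition counts are placeholders; the actual notebooks loop over all months and define explicit schemas):

df_green = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('data/raw/green/2020/01/')

df_green \
    .repartition(4) \
    .write.parquet('data/pq/green/2020/01/', mode='overwrite')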
5.3.4 SQL with Spark
Batch jobs can be expressed as SQL queries,
and Spark can run them.
See :
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/06_spark_sql.ipynb
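The general pattern (a minimal sketch, not the exact query from the notebook): register a dataframe as a temporary view, then query it with spark.sql().

# Register the dataframe as a temporary view so it can be queried with SQL
df.createOrReplaceTempView('fhvhv_trips')

result = spark.sql("""
SELECT
    PULocationID,
    COUNT(1) AS number_of_trips
FROM fhvhv_trips
GROUP BY PULocationID
ORDER BY number_of_trips DESC
""")

result.show(10)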
5.4 Spark Internals
5.4.1 Anatomy of a Spark Cluster
Apache Spark is considered a powerful complement to Hadoop, big data's original technology. Spark is a more accessible, powerful, and capable big data tool for tackling various big data challenges. It has become mainstream and the most in-demand big data framework across all major industries. Spark has been part of the Hadoop ecosystem since Hadoop 2.0, and is one of the most useful technologies for Python Big Data Engineers.
Architecture
The components of a Spark application are:
- Driver
- Application Master
- Spark Context
- Cluster Resource Manager (aka Cluster Manager)
- Executors
Spark uses a master/slave architecture with a central coordinator called the Driver and a set of distributed worker processes called Executors that are located at various nodes in the cluster.
Driver
The Driver (aka driver program) is responsible for converting a user application into smaller execution units called tasks and then schedules them to run on executors via the cluster manager. The driver is also responsible for executing the Spark application and returning the status/results to the user.
Spark Driver contains various components –
DAGScheduler, TaskScheduler, BackendScheduler and BlockManager. They are
responsible for the translation of user code into actual Spark jobs executed on
the cluster.
Other Driver properties:
- can run in an independent process or on one of the worker nodes for High Availability (HA);
- stores metadata about all Resilient Distributed Datasets and their partitions;
- is created after the user sends the Spark application to the cluster manager (YARN in our case);
- runs in its own JVM;
- optimizes logical DAG transformations and, if possible, combines them in stages and determines the best location for execution of this DAG;
- creates the Spark WebUI with detailed information about the application.
Application Master
Application Master is a
framework-specific entity charged with negotiating resources with
ResourceManager(s) and working with NodeManager(s) to perform and monitor
application tasks. Each application running on the cluster has its own,
dedicated Application Master instance.
The Application Master is created simultaneously with the Driver on the same node (in case of cluster mode) when a user submits the Spark application using spark-submit.
The Driver informs the Application Master of the executors' needs for the application, and the Application Master negotiates the resources with the Resource Manager to host these executors.
In standalone mode, the Spark Master acts as the Cluster Manager.
Spark Context
Spark Context is the main entry point into
Spark functionality, and therefore the heart of any Spark application. It
allows Spark Driver to access the cluster through its Cluster Resource Manager
and can be used to create RDDs, accumulators and broadcast variables on the
cluster. Spark Context also tracks executors in real-time by sending regular
heartbeat messages.
Spark Context is created by Spark Driver for
each Spark application when it is first submitted by the user. It exists
throughout the lifetime of the Spark application.
Spark Context stops working after the Spark application is finished. Only one Spark Context can be active per JVM. You must stop() the active Spark Context before creating a new one.
Cluster
Resource Manager
Cluster Manager in a distributed
Spark application is a process that controls, governs, and reserves computing
resources in the form of containers on the cluster. These containers are
reserved by request of Application Master and are allocated to Application
Master when they are released or available.
Once the containers are allocated
by Cluster Manager, the Application Master transfers the container resources
back to the Spark Driver, and the Spark Driver is responsible for performing
the various steps and tasks of the Spark application.
SparkContext can connect to
different types of Cluster Managers. Now the most popular types are YARN,
Mesos, Kubernetes or even Nomad. There is also Spark’s own standalone cluster
manager.
A fun fact: Mesos was also developed by the creators of Spark.
Executors
Executors are the processes on the worker nodes whose job is to complete the assigned tasks. These tasks are executed on the worker nodes and the results are then returned to the Spark Driver.
Executors are started once at the beginning of the Spark application and then work for the whole lifetime of the application; this phenomenon is known as "Static Allocation of Executors". However, users can also choose to dynamically allocate executors, adding or removing executors dynamically to match the overall workload (but this can affect other applications running on the cluster). Even if one Spark executor crashes, the Spark application can continue to work.
Executors provide storage either in-memory for RDD partitions that are cached (locally) in Spark applications (via the BlockManager) or on disk when using localCheckpoint.
Other executor properties:
- stores data in a cache in a JVM heap or on disk
- reads data from external sources
- writes data to external sources
- performs all data processing
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

appName = "PythonWordCount"
master = "local"

# initialization of spark context
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# read data from text file, as a result we get RDD of lines
linesRDD = spark.sparkContext.textFile("/mnt/d/apache/spark-3.4.2-bin-hadoop3/README.md")

# from RDD of lines create RDD of lists of words
wordsRDD = linesRDD.flatMap(lambda line: line.split(" "))

# from RDD of lists of words make RDD of word tuples where
# the first element is a word and the second is a counter,
# at the beginning it should be 1
wordCountRDD = wordsRDD.map(lambda word: (word, 1))

# combine elements with the same word value
resultRDD = wordCountRDD.reduceByKey(lambda a, b: a + b)

# write it back to a folder
resultRDD.saveAsTextFile("PythonWordCount")

spark.stop()
1. When we send the Spark application in cluster mode, the spark-submit utility communicates with the Cluster Resource Manager to start the Application Master.
2. The Resource Manager is then responsible for selecting the necessary container in which to run the Application Master. The Resource Manager then tells a specific Node Manager to launch the Application Master.
3. The Application Master registers with the Resource Manager. Registration allows the client program to request information from the Resource Manager; that information allows the client program to communicate directly with its own Application Master.
4. The Spark Driver then runs on the Application Master container (in case of cluster mode).
5. The driver implicitly converts user code containing transformations and actions into a logical plan called a DAG. All RDDs are created in the driver and do nothing until an action is called. At this stage, the driver also performs optimizations such as pipelining narrow transformations.
6. It then converts the DAG into a physical execution plan. After conversion to a physical execution plan, the driver creates physical execution units called tasks at each stage.
7. The Application Master now communicates with the Cluster Manager and negotiates resources. The Cluster Manager allocates containers and asks the appropriate NodeManagers to run the executors on all selected containers. When executors start, they register with the Driver, so the Driver has a complete view of all executors.
8. At this point, the Driver will send tasks to the executors based on data placement.
9. The code of the user application is launched inside the container. It provides information (stage of execution, status) to the Application Master.
10. At this stage, we will start to execute our code. Our first RDD will be created by reading data in parallel from HDFS into different partitions on different nodes, based on the HDFS InputFormat. Thus, each node will have a subset of the data.
11. After reading the data, we have two map transformations which will be executed in parallel on each partition.
12. Next, we have a reduceByKey transformation. It is not a narrow transformation like map, so it will create an additional stage. It combines records with the same keys, then moves data between nodes (shuffle) and partitions to combine the keys of the same record.
13. We then perform an action (writing back to HDFS) which will trigger the entire DAG execution.
14. During the execution of the user application, the client communicates with the Application Master to obtain the application status.
15. When the application finishes executing and all of the necessary work is done, the Application Master disconnects itself from the Resource Manager and stops, freeing up its container for other purposes.
5.4.2 GroupBy in Spark
See:
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/07_groupby_join.ipynb
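An illustrative sketch of the groupBy pattern covered in the notebook (it assumes a green taxi dataframe df_green with lpep_pickup_datetime and total_amount columns, as in 5.3.3; the exact aggregation in the notebook may differ):

from pyspark.sql import functions as F

# revenue and number of trips per pickup hour and zone
df_green_revenue = df_green \
    .withColumn('hour', F.date_trunc('hour', df_green.lpep_pickup_datetime)) \
    .groupBy('hour', 'PULocationID') \
    .agg(
        F.sum('total_amount').alias('amount'),
        F.count('*').alias('number_records')
    )

df_green_revenue.show(5)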
5.4.3 Joins in Spark
See:
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/07_groupby_join.ipynb
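And the join pattern from the same notebook, sketched under the assumption that df_yellow_revenue has been built the same way as df_green_revenue above:

df_green_tmp = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records')

df_yellow_tmp = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records')

# outer join on the composite key (hour, zone)
df_join = df_green_tmp.join(df_yellow_tmp, on=['hour', 'PULocationID'], how='outer')
df_join.show(5)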
5.5 (Optional) Resilient Distributed Datasets
See:
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/08_rdds.ipynb
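A short sketch of what the notebook covers (again assuming the green taxi dataframe df_green; names are illustrative): the DataFrame API sits on top of RDDs, and .rdd exposes the underlying RDD of Row objects.

rdd = df_green.select('PULocationID', 'total_amount').rdd

# classic RDD operations: filter, map into (key, value) pairs, reduce by key
amount_per_zone = rdd \
    .filter(lambda row: row.total_amount is not None) \
    .map(lambda row: (row.PULocationID, float(row.total_amount))) \
    .reduceByKey(lambda a, b: a + b)

print(amount_per_zone.take(5))

# an RDD of Rows can be turned back into a DataFrame
df_back = spark.createDataFrame(df_green.rdd, schema=df_green.schema)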
5.6 Running Spark in the Cloud
5.6.1 Connecting to Google Cloud Storage
See :
https://github.com/garjita63/de-zoomcamp-2024/blob/main/learning/module5/09_spark_gcs.ipynb
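A sketch of the typical setup (jar path, credentials path, bucket name and connector version are placeholders; the notebook linked above has the exact values used in the course):

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

credentials_location = '/path/to/google_credentials.json'

# point Spark at the GCS connector jar and the service-account key
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('gcs-test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

sc = SparkContext(conf=conf)

# register the gs:// filesystem implementation with Hadoop
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# gs:// paths can now be read directly
df = spark.read.parquet('gs://<bucket name>/pq/green/*/*')
df.show(5)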
5.6.2 Creating a Local Spark Cluster
• Stop all kernels, including Jupyter Notebook connections
• Start Spark Master
start-master.sh
• Open the WebUI : http://localhost:8080/
• Start Worker
start-worker.sh spark://Desktop-Gar.:7077
Change :
# Instance a session
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.getOrCreate()
into :
# Instance a session
spark = SparkSession.builder \
.master("spark://Desktop-Gar.:7077") \
.appName('test') \
.getOrCreate()
# stop spark worker and spark master
./spark-3.4.2-bin-hadoop3/sbin/stop-worker.sh
./spark-3.4.2-bin-hadoop3/sbin/stop-master.sh
5.6.3 Setting up a Dataproc Cluster
Step 1: create cluster