Spark Architecture Overview

Here is a diagram representing components of Apache Spark:

[Figure: Apache Spark architecture components]

In this chapter, we only look at the components shown in blue boxes.

Cluster Managers

Spark is designed to scale smoothly from a single compute node up to potentially thousands. It achieves this scalability while remaining flexible by supporting various cluster managers, including Hadoop YARN, Apache Mesos, and its own Standalone Scheduler. For our setup, we've opted for the simplicity of the Standalone Scheduler to run a mini-cluster in the last section.

Spark Core - Resilient Distributed Datasets (RDDs)

What is an RDD?

An RDD is simply an immutable, distributed collection of objects. Within Spark, all work involves either creating new RDDs, transforming existing ones, or executing operations on RDDs to produce a result. Behind the scenes, Spark automatically splits the data stored within RDDs into partitions across your cluster and parallelizes the operations you execute on them. An RDD can hold Python, Java, or Scala objects of any type, including user-defined classes. RDDs let users explicitly persist intermediate results in memory, control their partitioning to optimize data distribution, and manipulate them with a rich set of operators.

Initialize RDD

Parallelize Collections

We can create an RDD from an existing iterable or collection:

from pyspark.rdd import RDD

# spark_context: an existing SparkContext (e.g. spark.sparkContext from a SparkSession)
data: list = ["H", "e", "l", "l", "o", "!"]

# by default, Spark sets the number of partitions automatically based on your cluster
chars: RDD = spark_context.parallelize(data)

# you can also set it manually by passing it as a second parameter to parallelize
chars: RDD = spark_context.parallelize(data, 10)
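As mentioned above, you can also inspect how the RDD was partitioned and ask Spark to keep it in memory. A minimal sketch, assuming the chars RDD and spark_context from the previous snippet:

# check how many partitions Spark created for this RDD
print(chars.getNumPartitions())

# explicitly persist the RDD in memory so it is not recomputed by later actions
# (cache() is shorthand for persist() with the default MEMORY_ONLY storage level)
chars.persist()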

Read from External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, and more.

# takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI)
# reads it as a collection of lines
lines: RDD = spark_context.textFile("/path/to/hello.txt")
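textFile also accepts an optional minimum number of partitions, and because reading is lazy, the file is not actually touched until an action runs. A small sketch, using the same placeholder path as above:

# request at least 8 partitions when reading the file
lines: RDD = spark_context.textFile("/path/to/hello.txt", minPartitions=8)

# count() is an action, so this is when the file is actually read
print(lines.count())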

Once created, an RDD can be operated on with a rich set of functional-style operations.

RDD Operations

Transformations and Actions

RDDs support two types of operations: transformations and actions.

  • Transformations are operations on RDDs that return a new RDD by applying the same operation to many data items. Transformations are lazily evaluated (this includes loading data): they are only computed when an action is called. Internally, Spark just records metadata indicating that a transformation has been requested.
  • Actions are operations that run a computation on an RDD and then either return a value (a non-RDD result) to the driver or export data to a storage system. A small example follows this list.
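For instance, map returns a new RDD immediately without touching the data, while an action such as collect() triggers the actual computation. A minimal sketch, assuming an active spark_context as in the earlier snippets:

numbers = spark_context.parallelize([1, 2, 3, 4])

# Transformation: nothing is computed yet, Spark only records the lineage
squares = numbers.map(lambda x: x * x)

# Action: the map above is actually executed and the results are returned to the driver
print(squares.collect())  # [1, 4, 9, 16]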

Why laziness?

Many popular cluster computing frameworks (such as MapReduce and Dryad) offer high-level operators for parallel computation, simplifying work distribution and fault tolerance. However, they lack abstractions for efficiently using distributed memory in applications that frequently reuse intermediate results (iterative algorithms, interactive data mining, etc.).

In most of these frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, such as a distributed file system, which incurs significant overhead from data replication, disk I/O, and serialization. As a result, we often have to spend time working out how to group operations together to minimize the number of MapReduce passes over our data.

Spark employs lazy evaluation to:

  • Minimize the number of passes it makes over the data by consolidating operations.
  • Allow richer and more flexible combinations of transformations.

APIs Reference

A complete list of PySpark APIs is available in the Apache Spark documentation. If you're ever uncertain whether a specific function is a transformation or an action, simply check its return type: transformations return RDDs, whereas actions return some other data type.

Examples

Let's look at a few simple examples of using PySpark.

You should follow these examples in a JupyterLab session connected to a Spark cluster.

Monte Carlo Pi Estimation
import pyspark
from pyspark.sql import SparkSession
from random import random
from operator import add

# Set this to the name of the current master node of the Spark Cluster
# Eg: master_node = "m3i014"
master_node = ...

###### Initialize Spark Session ######
spark = SparkSession \
    .builder \
    .master(f"spark://{master_node}.massive.org.au:7077") \
    .appName("PythonJupyter") \
    .config("spark.workers.show", "True") \
    .getOrCreate()

###### Monte Carlo Pi Estimation ######

partitions = 4
n = 1000 * partitions
data = range(1, n + 1)

def point(_: int) -> int:
    # sample a random point in the square [-1, 1] x [-1, 1] and
    # return 1 if it falls inside the unit circle, 0 otherwise
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = (
    spark.sparkContext
    # distribute data into partitions across the cluster; this gives us the starting RDD
    .parallelize(data, partitions)
    # Transformation: map each element through a function, resulting in a new RDD
    .map(point)
    # Action: reduce all the elements into a single value
    .reduce(add)
)

print("Pi is roughly %f" % (4.0 * count / n))
Sort a Text File
# Randomly generate numbers and store them in a file called "rand_ints.csv"
# Note: these are bash commands
for i in {1..1000}; do echo $((RANDOM)); done > ./rand_ints.csv
from typing import Tuple
from pyspark.rdd import RDD
from pyspark.sql import SparkSession

# Set this to the path of the file rand_ints.csv
# Note: don't use a relative path
file_path = ...
lines = spark \
    .read \
    .text(file_path) \
    .rdd \
    .map(lambda r: r[0])

sortedCount: RDD[Tuple[int, int]] = lines \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (int(x), 1)) \
    .sortByKey()
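Because sortByKey() is a transformation, the snippet above has not actually sorted anything yet; an action is needed to trigger the computation. One possible follow-up, printing the first few sorted values:

# Action: take the first 10 (number, 1) pairs in sorted order,
# which forces the whole read -> split -> map -> sort pipeline to run
for (num, _) in sortedCount.take(10):
    print(num)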

Shared Variables

Typically, when a function is passed to a Spark operation like map or reduce and executed on a remote cluster node, it operates on separate copies of all the variables used within the function. These variables are copied to each machine, and any updates made to them on the remote machines are not propagated back to the driver program, as the sketch below illustrates. Fortunately, Spark provides two types of shared variables to work around this: broadcast variables and accumulators.
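Here is a sketch of a naive attempt to update a driver-side variable from inside a task, assuming an active spark_context; each executor works on its own copy of the variable, so the driver never sees the updates:

counter = 0

def increment(x):
    # this modifies a copy of counter shipped to the executor,
    # not the counter variable in the driver program
    global counter
    counter += x

spark_context.parallelize([1, 2, 3, 4]).foreach(increment)
print(counter)  # still 0: the increments happened in the executor processes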

Broadcast variables

Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes exactly once, for reuse across one or more Spark operations. The broadcast value stays on each node until one of the following happens:

  • All tasks are finished.
  • unpersist() is called. This only releases the resources on the executor nodes (the variable will be re-broadcast the next time it is used).
  • destroy() is called. After this, the variable can't be used again.

Note that these methods do not block by default! To block until resources are freed, pass blocking=True when calling them.

broadcastVar = spark_context.broadcast([1, 2, 3])
print(broadcastVar.value) # [1, 2, 3]
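A broadcast variable is typically used as a read-only lookup value inside transformations, with each worker fetching it once instead of shipping it with every task. A small sketch, continuing from broadcastVar above:

# use the broadcast list inside a transformation via .value
valid = spark_context.parallelize([1, 2, 3, 4, 5]) \
    .filter(lambda x: x in broadcastVar.value) \
    .collect()
print(valid)  # [1, 2, 3]

# release the copies held by the executors once we are done
# (pass blocking=True to wait until the resources are freed)
broadcastVar.unpersist()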

Accumulators

Another type of shared variable, accumulators, provides a simple syntax for aggregating values from worker nodes back to the driver program.

Note that it is possible to aggregate values from an entire RDD back to the driver program using actions (e.g. reduce()). However, when we just want a simple way to compute a metric at some point in the program (e.g. counting occurrences of the word "Hello" in distributed text data), an accumulator is a handy solution.

accum = spark_context.accumulator(0)
spark_context.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
print(accum.value) # 10
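As a sketch of the word-counting use case mentioned above (the data here is illustrative), an accumulator can count how many elements contain "Hello":

hello_count = spark_context.accumulator(0)

greetings = spark_context.parallelize(["Hello world", "goodbye", "Hello again"])

# foreach is an action, so the accumulator updates are applied exactly once
greetings.foreach(lambda line: hello_count.add(1) if "Hello" in line else None)

print(hello_count.value)  # 2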