Copyright © Cay S. Horstmann 2015
This work is licensed under a Creative Commons Attribution 4.0 International License
Note: The content of this unit is adapted from the slides of the course "Parallel Programming and Data Analysis"
by Heather Miller at EPFL.
Spark’s Programming Model
Spark provides a subset of the reduction operations of Scala.
In fact, not all reduction operations are parallelizable.
Take the two operations, fold and foldLeft. Which of these two is parallelizable?
def foldLeft[B](z: B)(f: (B, A) => B): B
def fold(z: A)(f: (A, A) => A): A
Spark provides a subset of the reduction operations of Scala.
In fact, not all reduction operations are parallelizable.
Let's examine the foldLeft signature.
def foldLeft[B](z: B)(f: (B, A) => B): B
Being able to change the result type from A to B forces us to execute foldLeft sequentially from left to right.
Concretely, given:
val xs = List(1, 2, 3)
val res = xs.foldLeft("")((str: String, i: Int) => str + i)
What happens if we try to break this collection in two and parallelize?
foldLeft is not parallelizable.
The operations foldRight, reduceLeft, reduceRight, scanLeft and scanRight similarly must process elements sequentially.
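To see the problem concretely, here is a minimal sketch of splitting the list from the example above into two chunks (the names left, right, leftRes and rightRes are just for illustration):

val xs = List(1, 2, 3)
val (left, right) = xs.splitAt(xs.length / 2)
val leftRes = left.foldLeft("")((str, i) => str + i)   // "1"
val rightRes = right.foldLeft("")((str, i) => str + i) // "23"
// To merge the two partial results we would need a function
// (String, String) => String, but foldLeft only has f: (B, A) => B,
// i.e. (String, Int) => String, so the chunks cannot be combined.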
fold enables us to parallelize things, but it restricts us to always returning the same type.
def fold(z: A)(f: (A, A) => A): A
**It enables us to parallelize using a single function f by letting us build parallelizable reduce trees.**
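Here is a minimal sketch of that idea on a plain list (the explicit split into left and right stands in for chunks that could live on different nodes):

val xs = List(1, 2, 3, 4, 5, 6)
val (left, right) = xs.splitAt(3)
val leftSum = left.fold(0)(_ + _)   // 6
val rightSum = right.fold(0)(_ + _) // 15
// Because f: (A, A) => A keeps the same type, the partial results can be
// combined with the very same function, forming a reduce tree:
val total = List(leftSum, rightSum).fold(0)(_ + _) // 21, same as xs.fold(0)(_ + _)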
Has anyone already used the aggregate operation?
aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
aggregate is said to be general because it gets you the best of both worlds.
Properties of aggregate
aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
aggregate still lets you do sequential-style folds in chunks that change the result type. Additionally, requiring the combop function enables building one of the reduce trees we saw is possible with fold, to combine these chunks in parallel.
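As a minimal sketch on a plain Scala collection (the (sum, count) result type is just an illustration of changing the result type):

val xs = List(1, 2, 3, 4, 5)
val (sum, count) = xs.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1), // seqop: sequential fold within a chunk, result type (Int, Int)
  (a, b) => (a._1 + b._1, a._2 + b._2)  // combop: merges two chunk results, enabling a reduce tree
)
// sum == 15, count == 5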
| Scala collections: | Spark: |
| --- | --- |
| fold | fold |
| foldLeft/foldRight | - |
| reduce | reduce |
| aggregate | aggregate |
Spark doesn’t even give you the option to use foldLeft/foldRight, which means that if you have to change the return type of your reduction operation, your only choice is to use aggregate.
In Spark, aggregate is the more desirable reduction operator a majority of the time.
As you will realize from experimenting with Spark, much of the time when working with large-scale data our goal is to project down from larger/more complex data types to smaller, simpler ones.
Example:
case class WikipediaPage(
title: String,
redirectTitle: String,
timestamp: String,
lastContributorUsername: String,
text: String)
We might only care about title and timestamp, for example. In this case, it’d save a lot of time/memory to not have to carry around the full text of each article in our accumulator!
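For instance, here is a minimal sketch of projecting down while reducing with aggregate; it assumes a hypothetical RDD[WikipediaPage] named pages, and the accumulator carries only (title, timestamp) pairs instead of full articles:

val pages: RDD[WikipediaPage] = ...
val titlesAndTimes: List[(String, String)] =
  pages.aggregate(List.empty[(String, String)])(
    (acc, page) => (page.title, page.timestamp) :: acc, // seqop: keep only the two fields we care about
    (a, b) => a ::: b                                    // combop: concatenate the per-partition lists
  )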
Key-value pairs are known as Pair RDDs in Spark.
When an RDD is created with a pair as its element type, Spark automatically adds a number of useful extra methods (extension methods) for such pairs.
Creating a Pair RDD
Pair RDDs are most often created from already-existing non-pair RDDs,
for example by using the map
operation on RDDs:
val rdd: RDD[WikipediaPage] = ...
val pairRdd = ???
Creating a Pair RDD
Pair RDDs are most often created from already-existing non-pair RDDs,
for example by using the map
operation on RDDs:
val rdd: RDD[WikipediaPage] = ...
// Has type: org.apache.spark.rdd.RDD[(String, String)]
val pairRdd = rdd.map(page => (page.title, page.text))
Once created, you can use transformations specific to key-value pairs such as reduceByKey, groupByKey, and join.
Transformations:
- groupByKey
- reduceByKey
- join
- leftOuterJoin/rightOuterJoin

Action:
- countByKey
groupByKey
Recall groupBy from Scala collections. groupByKey can be thought of as a groupBy on Pair RDDs that is specialized on grouping all values that have the same key. As a result, it takes no argument.
def groupByKey(): RDD[(K, Iterable[V])]
Example:
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
Here the key is organizer. What does this call do?
groupByKey
Example:
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
// (Prime Sound,CompactBuffer(42000))
// (Sportorg,CompactBuffer(23000, 12000, 1400))
// ...
reduceByKey
Conceptually, reduceByKey can be thought of as a combination of groupByKey and reduce-ing on all the values per key. It’s more efficient, though, than using each separately. (We’ll see why later.)
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Example: Let’s use eventsRdd from the previous example to calculate the total budget per organizer of all of their organized events.
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val budgetsRdd = ...
reduceByKey
Example: Let’s use eventsRdd from the previous example to calculate the total budget per organizer of all of their organized events.
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
// (Prime Sound,42000)
// (Sportorg,36400)
// (Innotech,320000)
// (Association Balélec,50000)
mapValues and Action: countByKey
def mapValues[U](f: (V) => U): RDD[(K, U)]
mapValues can be thought of as a short-hand for:
rdd.map { case (x, y) => (x, f(y)) }
That is, it simply applies a function to only the values in a Pair RDD.
countByKey (def countByKey(): Map[K, Long]) simply counts the number of elements per key in a Pair RDD, returning a normal Scala Map (remember, it’s an action!) mapping from keys to counts.
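For example, a minimal sketch using the eventsRdd from earlier (the printed counts are illustrative, matching the budgets shown above):

val eventCounts: scala.collection.Map[String, Long] = eventsRdd.countByKey()
eventCounts.foreach(println)
// e.g. (Prime Sound,1)
//      (Sportorg,3)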
mapValues and Action: countByKey
Example: we can use each of these operations to compute the average budget per event organizer.
// Calculate a pair (as a key's value) containing (budget, #events)
val intermediate =
eventsRdd.mapValues(b => (b, 1))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
// intermediate: RDD[(String, (Int, Int))]
val avgBudgets = ???
mapValues and Action: countByKey
Example: we can use each of these operations to compute the average budget per event organizer.
// Calculate a pair (as a key's value) containing (budget, #events)
val intermediate =
eventsRdd.mapValues(b => (b, 1))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
// intermediate: RDD[(String, (Int, Int))]
val avgBudgets = intermediate.mapValues {
case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
// (Prime Sound,42000)
// (Sportorg,12133)
// (Innotech,106666)
// (Association Balélec,50000)
Joins are another sort of transformation on Pair RDDs. They’re used to combine multiple datasets. They are one of the most commonly-used operations on Pair RDDs!
There are two kinds of joins:
- Inner joins (join)
- Outer joins (leftOuterJoin/rightOuterJoin)

The difference between the two kinds of joins is exactly the same as in databases.
join
Inner joins return a new RDD containing combined pairs whose keys are present in both input RDDs.
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
Example: Let’s pretend the CFF has two datasets. One RDD representing customers and their subscriptions (abos), and another representing customers and cities they frequently travel to (locations). (E.g., gathered from the CFF smartphone app.)
How do we combine only customers that have a subscription and where there is location info?
val abos = ... // RDD[(Int, (String, Abonnement))]
val locations = ... // RDD[(Int, String)]
val trackedCustomers = ???
join
Example: Let’s pretend the CFF has two datasets. One RDD representing customers and their subscriptions (abos), and another representing customers and cities they frequently travel to (locations). (E.g., gathered from the CFF smartphone app.)
How do we combine only customers that have a subscription and where there is location info?
val abos = ... // RDD[(Int, (String, Abonnement))]
val locations = ... // RDD[(Int, String)]
val trackedCustomers = abos.join(locations)
// trackedCustomers: RDD[(Int, ((String, Abonnement), String))]
join
Example continued with concrete data:
val as = List((101, ("Ruetli", AG)), (102, ("Brelaz", DemiTarif)),
(103, ("Gress", DemiTarifVisa)), (104, ("Schatten", DemiTarif)))
val abos = sc.parallelize(as)
val ls = List((101, "Bern"), (101, "Thun"), (102, "Lausanne"), (102, "Geneve"),
(102, "Nyon"), (103, "Zurich"), (103, "St-Gallen"), (103, "Chur"))
val locations = sc.parallelize(ls)
val trackedCustomers = abos.join(locations)
// trackedCustomers: RDD[(Int, ((String, Abonnement), String))]
join
Example continued with concrete data:
trackedCustomers.collect().foreach(println)
// (101,((Ruetli,AG),Bern))
// (101,((Ruetli,AG),Thun))
// (102,((Brelaz,DemiTarif),Nyon))
// (102,((Brelaz,DemiTarif),Lausanne))
// (102,((Brelaz,DemiTarif),Geneve))
// (103,((Gress,DemiTarifVisa),St-Gallen))
// (103,((Gress,DemiTarifVisa),Chur))
// (103,((Gress,DemiTarifVisa),Zurich))
What happened to customer 104?
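Customer 104 has a subscription but no entry in locations, so the inner join drops that key. A minimal sketch of keeping such customers with leftOuterJoin (same abos and locations RDDs as above):

val allCustomers = abos.leftOuterJoin(locations)
// allCustomers: RDD[(Int, ((String, Abonnement), Option[String]))]
allCustomers.collect().foreach(println)
// Customer 104 would now appear with None as the location, e.g.:
// (104,((Schatten,DemiTarif),None))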
Book
The Learning Spark book is the most complete reference.
Now that we understand Spark’s programming model and a majority of Spark’s key operations, we’ll see how we can optimize what we do with Spark to keep it practical.
It’s very easy to write clear code that takes tens of minutes to compute when it could be computed in only tens of seconds!
Let’s start with an example. Given:
case class CFFPurchase(customerId: Int, destination: String, price: Double)
Assume we have an RDD of the purchases that users of the CFF mobile app have
made in the past month.
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
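Note that sc.textFile by itself produces an RDD[String], so a parsing step belongs in place of the elided "...". A minimal sketch, assuming a hypothetical CSV file purchases.csv with lines of the form customerId,destination,price:

val purchasesRdd: RDD[CFFPurchase] =
  sc.textFile("purchases.csv")  // RDD[String], one line per purchase (hypothetical file)
    .map { line =>
      val Array(id, dest, price) = line.split(',')
      CFFPurchase(id.toInt, dest, price.toDouble)
    }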
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth = ...
**Goal: calculate how many trips, and how much money was spent by each
individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
**Goal: calculate how many trips, and how much money was spent by each
individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[(K, Iterable[V])]
**Goal: calculate how many trips, and how much money was spent by each
individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
// Returns: Array[(Int, (Int, Double))]
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[(K, Iterable[V])]
.map(p => (p._1, (p._2.size, p._2.sum)))
.collect()
Let’s start with an example dataset:
val purchases = List(CFFPurchase(100, "Geneva", 22.25),
CFFPurchase(300, "Zurich", 42.10),
CFFPurchase(100, "Fribourg", 12.40),
CFFPurchase(200, "St. Gallen", 8.20),
CFFPurchase(100, "Lucerne", 31.60),
CFFPurchase(300, "Basel", 16.20))
What might the cluster look like with this data distributed over it?
Starting with purchasesRdd:
What might this look like on the cluster?
**Goal: calculate how many trips, and how much money was spent by each
individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[(K, Iterable[V])]
Note: groupByKey results in one key-value pair per key.
And this single key-value pair cannot span across multiple worker nodes.
What might this look like on the cluster?
But, we don’t want to be sending all of our data over the network if it’s not absolutely required.
Too much network communication kills performance.
Perhaps we don’t need to send all pairs over the network.
Perhaps we can reduce before we shuffle. This could greatly reduce the amount of data we have to send over the network.
We can use reduceByKey.
Conceptually, reduceByKey can be thought of as a combination of first doing groupByKey and then reduce-ing on all the values grouped per key. It’s more efficient, though, than using each separately. We’ll see how in the following example.
Signature:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Goal: calculate how many trips, and how much money was spent by each
individual customer over the course of the month.
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey(...) // ?
Notice that the function passed to map has changed.
It’s now p => (p.customerId, (1, p.price))
**What function do we pass to reduceByKey in order to get a result that looks like (customerId, (numTrips, totalSpent))?**
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey(...) // ?
Recall that we’re reducing over the values per key.
Conceptually, the values grouped per key form an Iterable[(Int, Double)], so the function that we pass to reduceByKey must combine two such (Int, Double) pairs at a time.
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
What might this look like on the cluster?
What are the benefits of this approach?
By reducing the dataset first, the amount of data sent over the network during
the shuffle is greatly reduced.
This can result in non-trivial gains in performance!
groupByKey and reduceByKey running times