Copyright © Cay S. Horstmann 2015
This work is licensed under a Creative Commons Attribution 4.0 International License
Note: The content of this unit is adapted from the slides of the course "Parallel Programming and Data Analysis"
by Heather Miller at EPFL.
Spark’s Programming Model
Spark provides a subset of the reduction operations of Scala. In fact, not all reduction operations are parallelizable.
Take the two operations, fold and foldLeft. Which of these two is parallelizable?
def foldLeft[B](z: B)(f: (B, A) => B): B
def fold(z: A)(f: (A, A) => A): A
Let’s examine the foldLeft signature.
def foldLeft[B](z: B)(f: (B, A) => B): B
Being able to change the result type from A to B forces us to execute foldLeft sequentially, from left to right.
Concretely, given:
val xs = List(1, 2, 3)
val res = xs.foldLeft("")((str: String, i: Int) => str + i)
What happens if we try to break this collection in two and parallelize? Each half would produce a partial result of type B (here, a String), but f: (B, A) => B gives us no way to combine two partial results of type B, so the types don’t work out.
foldLeft is not parallelizable.
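A small sketch in plain Scala (not from the original slides) of what goes wrong if we split xs and fold each half:
val (left, right) = xs.splitAt(xs.length / 2)
val leftRes  = left.foldLeft("")((str, i) => str + i)   // "1"
val rightRes = right.foldLeft("")((str, i) => str + i)  // "23"
// Combining the two halves needs a function of type (String, String) => String,
// e.g. leftRes + rightRes, which foldLeft's f: (B, A) => B cannot express.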
Operations foldRight, reduceLeft, reduceRight, scanLeft
and scanRight
similarly must process elements sequentially.
fold
enables us to parallelize things, but it restricts us to always
returning the same type.
def fold(z: A)(f: (A, A) => A): A
**It enables us to parallelize using a single function f, by allowing us to build parallelizable reduce trees.**
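For example (a minimal sketch in plain Scala, not from the original slides):
val xs = List(1, 2, 3, 4)
val sum = xs.fold(0)(_ + _)   // 10
// Because f: (A, A) => A, intermediate results can themselves be combined,
// so this could be evaluated as a reduce tree: (1 + 2) + (3 + 4).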
Reduction Operations: Aggregate
Has someone already used the aggregate operation?
aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
aggregate is said to be general because it gets you the best of both worlds.
Reduction Operations: Aggregate
aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
aggregate lets you still do sequential-style folds in chunks which change the result type. Additionally, requiring the combop function enables building one of those nice reduce trees that we saw are possible with fold, to combine these chunks in parallel.
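A minimal sketch of aggregate on a plain Scala collection (not from the original slides; this uses the Scala-collections aggregate shown above, which is available in the Scala versions used with Spark here, though deprecated in newer ones). seqop folds Ints into a String within a chunk; combop merges per-chunk Strings, which is what makes a parallel reduce tree possible:
val xs = List(1, 2, 3)
val res = xs.aggregate("")((str, i) => str + i, (s1, s2) => s1 + s2)
// res: String = "123"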
Reduction Operations on RDDs
Scala collections:    | Spark:
fold                  | fold
foldLeft/foldRight    | -
reduce                | reduce
aggregate             | aggregate
Spark doesn’t even give you the option to use foldLeft/foldRight, which means that if you have to change the return type of your reduction operation, your only choice is to use aggregate.
RDD Reduction Operations: Aggregate
In Spark, aggregate is a more desirable reduction operator a majority of the time. As you will realize from experimenting with Spark, much of the time when working with large-scale data, our goal is to project down from larger/more complex data types.
Example:
case class WikipediaPage(
title: String,
redirectTitle: String,
timestamp: String,
lastContributorUsername: String,
text: String)
We might only care about title and timestamp, for example. In this case, it’d save a lot of time/memory to not have to carry around the full text of each article in our accumulator!
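A minimal sketch (not from the original slides) of such a projection using aggregate on an RDD. It assumes a hypothetical RDD[WikipediaPage] named pages, and accumulates only (title, timestamp) pairs, never the full text:
val pages: RDD[WikipediaPage] = ...
val titlesAndTimestamps: List[(String, String)] =
  pages.aggregate(List.empty[(String, String)])(
    (acc, page) => (page.title, page.timestamp) :: acc, // seqop: project down per partition
    (acc1, acc2) => acc1 ::: acc2)                       // combop: merge per-partition lists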
Pair RDDs (Key-Value Pairs)
Key-value pairs are known as Pair RDDs in Spark. When an RDD is created with a pair as its element type, Spark automatically adds a number of extra useful methods (extension methods) for such pairs.
Pair RDDs (Key-Value Pairs)
Pair RDDs are most often created from already-existing non-pair RDDs, for example by using the map operation on RDDs:
val rdd: RDD[WikipediaPage] = ...
val pairRdd = ???
Pair RDDs (Key-Value Pairs)
Pair RDDs are most often created from already-existing non-pair RDDs, for example by using the map operation on RDDs:
val rdd: RDD[WikipediaPage] = ...
// Has type: org.apache.spark.rdd.RDD[(String, String)]
val pairRdd = rdd.map(page => (page.title, page.text))
Once created, you can now use transformations specific to key-value pairs, such as reduceByKey, groupByKey, and join.
Some interesting Pair RDD operations
Transformations: groupByKey, reduceByKey, join, leftOuterJoin/rightOuterJoin
Action: countByKey
Pair RDD Transformation: groupByKey
Recall groupBy from Scala collections. groupByKey can be thought of as a groupBy on Pair RDDs that is specialized on grouping all values that have the same key. As a result, it takes no argument.
def groupByKey(): RDD[(K, Iterable[V])]
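As a reminder, here is how groupBy behaves on an ordinary Scala collection (a small sketch with made-up names, not from the original example):
val ages = List(2, 52, 44, 23, 17, 14, 12, 82, 51, 64)
val grouped = ages.groupBy { age =>
  if (age < 18) "child" else if (age < 65) "adult" else "senior"
}
// grouped: Map[String, List[Int]], e.g.
// Map(child -> List(2, 17, 14, 12), adult -> List(52, 44, 23, 51, 64), senior -> List(82))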
Example:
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
Here the key is the organizer. What does this call do?
Pair RDD Transformation: groupByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
// (Prime Sound,CompactBuffer(42000))
// (Sportorg,CompactBuffer(23000, 12000, 1400))
// ...
Pair RDD Transformation: reduceByKey
Conceptually, reduceByKey can be thought of as a combination of groupByKey and reduce-ing on all the values per key. It’s more efficient, though, than using each separately. (We’ll see why later.)
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Example: Let’s use eventsRdd from the previous example to calculate the total budget per organizer of all of their organized events.
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val budgetsRdd = ...
Pair RDD Transformation: reduceByKey
Example: Let’s use eventsRdd from the previous example to calculate the total budget per organizer of all of their organized events.
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_+_)
budgetsRdd.collect().foreach(println)
// (Prime Sound,42000)
// (Sportorg,36400)
// (Innotech,320000)
// (Association Balélec,50000)
Pair RDD Transformation: mapValues and Action: countByKey
def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
mapValues can be thought of as a short-hand for:
rdd.map { case (x, y) => (x, func(y)) }
That is, it simply applies a function to only the values in a Pair RDD.
countByKey (def countByKey(): Map[K, Long]) simply counts the number of elements per key in a Pair RDD, returning a normal Scala Map (remember, it’s an action!) mapping from keys to counts.
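For instance, a small sketch (not from the original slides) of countByKey on the eventsRdd from the groupByKey example, counting how many events each organizer has:
val numEventsPerOrganizer = eventsRdd.countByKey()
// numEventsPerOrganizer: scala.collection.Map[String, Long]
// e.g. Map(Prime Sound -> 1, Sportorg -> 3, ...)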
Pair RDD Transformation: mapValues and Action: countByKey
Example: we can use each of these operations to compute the average budget per event organizer.
// Calculate a pair (as a key's value) containing (budget, #events)
val intermediate =
eventsRdd.mapValues(b => (b, 1))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
// intermediate: RDD[(String, (Int, Int))]
val avgBudgets = ???
Pair RDD Transformation: mapValues and Action: countByKey
Example: we can use each of these operations to compute the average budget per event organizer.
// Calculate a pair (as a key's value) containing (budget, #events)
val intermediate =
eventsRdd.mapValues(b => (b, 1))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
// intermediate: RDD[(String, (Int, Int))]
val avgBudgets = intermediate.mapValues {
case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
// (Prime Sound,42000)
// (Sportorg,12133)
// (Innotech,106666)
// (Association Balélec,50000)
Joins
Joins are another sort of transformation on Pair RDDs. They’re used to combine multiple datasets. They are one of the most commonly-used operations on Pair RDDs!
There are two kinds of joins:
Inner joins (join)
Outer joins (leftOuterJoin/rightOuterJoin)
The difference between the two types of joins is exactly the same as in databases.
Inner Joins (join)
Inner joins return a new RDD containing combined pairs whose keys are present in both input RDDs.
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
Example: Let’s pretend the CFF has two datasets. One RDD representing customers and their subscriptions (abos), and another representing customers and cities they frequently travel to (locations). (E.g., gathered from the CFF smartphone app.)
How do we combine only customers that have a subscription and where there is location info?
val abos = ... // RDD[(Int, (String, Abonnement))]
val locations = ... // RDD[(Int, String)]
val trackedCustomers = ???
Inner Joins (join)
Example: Let’s pretend the CFF has two datasets. One RDD representing customers and their subscriptions (abos), and another representing customers and cities they frequently travel to (locations). (E.g., gathered from the CFF smartphone app.)
How do we combine only customers that have a subscription and where there is location info?
val abos = ... // RDD[(Int, (String, Abonnement))]
val locations = ... // RDD[(Int, String)]
val trackedCustomers = abos.join(locations)
// trackedCustomers: RDD[(Int, ((String, Abonnement), String))]
Inner Joins (join)
Example continued with concrete data:
val as = List((101, ("Ruetli", AG)), (102, ("Brelaz", DemiTarif)),
(103, ("Gress", DemiTarifVisa)), (104, ("Schatten", DemiTarif)))
val abos = sc.parallelize(as)
val ls = List((101, "Bern"), (101, "Thun"), (102, "Lausanne"), (102, "Geneve"),
(102, "Nyon"), (103, "Zurich"), (103, "St-Gallen"), (103, "Chur"))
val locations = sc.parallelize(ls)
val trackedCustomers = abos.join(locations)
// trackedCustomers: RDD[(Int, ((String, Abonnement), String))]
Inner Joins (join)
Example continued with concrete data:
trackedCustomers.collect().foreach(println)
// (101,((Ruetli,AG),Bern))
// (101,((Ruetli,AG),Thun))
// (102,((Brelaz,DemiTarif),Nyon))
// (102,((Brelaz,DemiTarif),Lausanne))
// (102,((Brelaz,DemiTarif),Geneve))
// (103,((Gress,DemiTarifVisa),St-Gallen))
// (103,((Gress,DemiTarifVisa),Chur))
// (103,((Gress,DemiTarifVisa),Zurich))
What happened to customer 104? Customer 104 has no entry in locations, so the inner join drops that customer from the result.
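If we wanted to keep such customers, an outer join would do it. A minimal sketch (not from the original slides) using leftOuterJoin, which wraps the possibly-missing location in an Option; allCustomers is a hypothetical name:
val allCustomers = abos.leftOuterJoin(locations)
// allCustomers: RDD[(Int, ((String, Abonnement), Option[String]))]
// Customer 104 now appears, e.g. (104,((Schatten,DemiTarif),None))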
Resources to learn more operations on Pair RDDs
Book: The Learning Spark book is the most complete reference.
Optimizing...
Now that we understand Spark’s programming model and a majority of Spark’s key operations, let’s see how we can optimize what we do with Spark to keep it practical.
It’s very easy to write clear code that takes tens of minutes to compute when it could be computed in only tens of seconds!
Grouping and Reducing, Example
Let’s start with an example. Given:
case class CFFPurchase(customerId: Int, destination: String, price: Double)
Assume we have an RDD of the purchases that users of the CFF mobile app have made in the past month.
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.
Grouping and Reducing, Example
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth = ...
Grouping and Reducing, Example
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
Grouping and Reducing, Example
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[K, Iterable[V]]
Grouping and Reducing, Example
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
// Returns: Array[(Int, (Int, Double))]
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[K, Iterable[V]]
.map(p => (p._1, (p._2.size, p._2.sum)))
.collect()
Grouping and Reducing, Example – What’s Happening?
Let’s start with an example dataset:
val purchases = List(CFFPurchase(100, "Geneva", 22.25),
CFFPurchase(300, "Zurich", 42.10),
CFFPurchase(100, "Fribourg", 12.40),
CFFPurchase(200, "St. Gallen", 8.20),
CFFPurchase(100, "Lucerne", 31.60),
CFFPurchase(300, "Basel", 16.20))
Grouping and Reducing, Example – What’s Happening?
What might the cluster look like with this data distributed over it? Starting with purchasesRdd:
Grouping and Reducing, Example
**Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.**
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey() // groupByKey returns RDD[K, Iterable[V]]
Grouping and Reducing, Example – What’s Happening?
What might this look like on the cluster?
Note: groupByKey results in one key-value pair per key, and this single key-value pair cannot span across multiple worker nodes. But we don’t want to be sending all of our data over the network if it’s not absolutely required. Too much network communication kills performance.
Can we do a better job? Perhaps we don’t need to send all pairs over the network. Perhaps we can reduce before we shuffle. This could greatly reduce the amount of data we have to send over the network.
Grouping and Reducing, Example – Optimized
We can use reduceByKey. Conceptually, reduceByKey can be thought of as a combination of first doing groupByKey and then reduce-ing on all the values grouped per key. It’s more efficient, though, than using each separately. We’ll see how in the following example.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Grouping and Reducing, Example – Optimized
Goal: calculate how many trips, and how much money was spent by each individual customer over the course of the month.
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey(...) // ?
Notice that the function passed to map has changed. It’s now p => (p.customerId, (1, p.price)).
**What function do we pass to reduceByKey in order to get a result that looks like (customerId, (numTrips, totalSpent))?**
Grouping and Reducing, Example – Optimized
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey(...) // ?
Recall that we’re reducing over the values per key. Since our values are an Iterable[(Int, Double)], the function that we pass to reduceByKey must reduce over two such pairs.
Grouping and Reducing, Example – Optimized
val purchasesPerMonth =
purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
Grouping and Reducing, Example – Optimized
What might this look like on the cluster?
What are the benefits of this approach? By reducing the dataset first, the amount of data sent over the network during the shuffle is greatly reduced. This can result in non-trivial gains in performance!
Benchmark results on a real cluster: groupByKey and reduceByKey running times.