Course:CICS525/Notes/ProgrammingModels
Outline
- Why do we need new programming models? What are the design options and challenges?
- MapReduce (Google)
- Dryad (Microsoft) + DryadLINQ
Parallel programming can be hard
- complexity: threads, shared data, mutexes, RPC, failure
- dependencies: when can F() and G() run in parallel?
- vs when must G() follow F()?
- does it require experts?
- can the details be hidden?
- One application area: big data-parallel computation
- huge data set
- natural parallelism: can work on different parts of data independently
- image processing
- grep
- indexing
- and many more
Cluster computing
- many machines in one place
- partition data among machines
- each machine computes on its part of the data
- machines communicate when required
- cheaper than big shared-memory multiprocessors
- but less communication capacity
Challenges of cluster computing
- Parallelize application
- Where to place input and output data?
- What parts of the computation to place on what machines?
- How to avoid network bottleneck?
- How to write the application?
- Does the programmer have to indicate what parts can be parallel?
- Or can the system figure it out?
- Can the system apply optimizations?
- Balance computations of an application across computers
- Statically (e.g., doable when designer knows how much work there is)
- Dynamically
- Handle failures of nodes during computation
- With 1000 machines, is a failure likely in a 1-hour period?
- Often easier than with say banking applications (or YFS lock server)
- Many computations have no "harmful" side-effects and clear commit points
- Scheduling several applications that want to share the infrastructure
- Time-sharing
- Strict partitioning
MapReduce
Design
- Partition the large data set into M splits
- a split is typically a 64 MB chunk of the input
- run map on each split, which produces R local intermediate partitions
- using a partition function, e.g. hash(key) mod R (sketched below)
- run reduce on each intermediate partition, which produces R output files
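A minimal sketch of that partition function (Python, illustrative only; not Google's code):

    def partition(key, R):
        # deterministic: every value for a given key lands in the same
        # intermediate partition, so one reduce task sees all of them;
        # a real system needs a hash that is stable across machines
        return hash(key) % R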
Programmer interface
map(key, value) -> set of intermediate key/value pairs
reduce(key, set of values) -> output values
- reduce is called once per key, with all values for that key
- Example: word count
- split input files into big pieces
map(split)
- takes one split as input
- parses into words and returns list of word/1 pairs
reduce(word, set)
- set of "1"s from maps for this word
- adds them
- outputs the number of occurrences of each word
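The word-count example written against that interface; a minimal Python sketch, with generator yield standing in for the framework's Emit (wc_map/wc_reduce are illustrative names, not the paper's C++):

    def wc_map(key, value):          # key: split name; value: contents of the split
        for word in value.split():
            yield (word, 1)          # one (word, 1) pair per occurrence

    def wc_reduce(key, values):      # values: every 1 emitted for this word
        yield (key, sum(values))     # total occurrences of the word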
Implementation overview for programmers
- (Figure 1 from MapReduce article - see readings)
- input partitioned over GFS
- M map worker machines
- read from one split of input
- write to local intermediate storage
- each intermediate store partitioned by reduce key
- R reduce worker machines
- pull intermediate data from *all* map workers via RPC
- local sort by key
- call reduce function on each key + value set
- output to GFS
- output partitioned over GFS
- where is parallel speedup coming from?
- will it scale?
- be N times as fast w/ N machines, indefinitely?
- what might limit scaling?
- look at WC implementation from MapReduce article appendix
- Map
- input is a string with many words
- split into words at space boundaries
- one Emit(w, "1") per word
- Reduce
- sums up "1"'s of the key's values
- just one Emit at end
main()
- set up input files (must be in GFS)
- specify names of output files
- R is 100
- combiner: Reduce is associative
- can run it on temporary output of each Map worker
- will reduce communication
- set # of machines
- then run!
- Map/Reduce are sequential code
- no locks, threads, RPC, &c
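The whole picture above fits in a few lines of ordinary sequential Python; a toy single-process runner (run_mapreduce and all names here are invented, not the real library) that does what master + workers do: map each split, shuffle by hashed key into R partitions, sort, reduce. It shows why the user's map/reduce need no locks, threads, or RPC:

    from collections import defaultdict

    def wc_map(key, value):                   # word-count map, as sketched above
        for w in value.split():
            yield (w, 1)

    def wc_reduce(key, values):               # word-count reduce, as sketched above
        yield (key, sum(values))

    def run_mapreduce(splits, map_fn, reduce_fn, R):
        # "map phase": run map on every split, hash each output key
        # into one of R intermediate partitions
        partitions = [defaultdict(list) for _ in range(R)]
        for i, split in enumerate(splits):
            for k, v in map_fn(str(i), split):
                partitions[hash(k) % R][k].append(v)
        # "reduce phase": sort each partition's keys, call reduce once per key
        output = []
        for p in partitions:
            for k in sorted(p):
                output.extend(reduce_fn(k, p[k]))
        return output

    print(run_mapreduce(["a b a", "b c"], wc_map, wc_reduce, R=2))
    # e.g. [('a', 2), ('b', 2), ('c', 1)]; which keys share a partition depends on hash()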
Implementation details
- MapReduce system handles all management
- create workers &c
- master process
- keeps track of which parts of work are done
- keeps track of workers
- assigns map and reduce jobs to workers
- handles failures of workers
- map workers communicate locations of R partitions to master
- each reduce worker asks the master for those locations
- sorts its input keys
- runs the reduce operation
- when all workers are finished, master returns result to caller
- Fault tolerance -- what if a worker fails?
- assign map and reduce jobs to another worker
- may have to re-run completed map jobs, since a worker crash also loses that worker's local intermediate map output
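A sketch of that rule (the task table here is invented for illustration): on a failure the master re-queues the dead worker's in-progress tasks, and also its completed map tasks, whose output lived only on that machine's local disk:

    tasks = [
        {"kind": "map",    "state": "done",        "worker": "w3"},
        {"kind": "reduce", "state": "in-progress", "worker": "w3"},
        {"kind": "reduce", "state": "done",        "worker": "w1"},
    ]

    def worker_failed(tasks, dead_worker):
        for t in tasks:
            if t["worker"] != dead_worker:
                continue
            if t["state"] == "in-progress" or t["kind"] == "map":
                # a finished reduce task's output is safe in GFS, but a
                # finished map task's output was on the dead worker's disk
                t["state"], t["worker"] = "idle", None

    worker_failed(tasks, "w3")   # both of w3's tasks go back to idle
    # the reduce task on w1 stays done: its output is already in GFS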
- Load balancing
- what's the right number of map input splits (M)?
- == the number of worker machines? better: many more splits than machines, handed out dynamically
- some splits will take longer than others
- want to limit damage from any one failure
- want to split failure-recovery load among multiple workers
- challenge: stragglers (a few slow machines can delay the whole job; the paper's fix: back-up executions of the last remaining tasks)
Why MapReduce's particular setup?
- Can we argue that Map followed by Reduce covers many situations?
- High level: only two things going on in cluster computing
- partitioned computing (Map)
- reshuffling data (Reduce)
Retrospective
- Google claims MapReduce made large-scale parallel computing accessible to novices
- Maybe you have to shoe-horn your problem into the MR model
- but you win in scalability what you might lose in performance
Dryad
- Claim: MapReduce is not flexible enough
- there are computations that should work well on cluster
- but are awkward to express in MapReduce
- Example
- score web pages by the words they contain
- score web pages by # of incoming links
- combine the two scores
- sort by combined score
- Example is awkward in MapReduce
- multiple MapReduce runs -- maybe one for each step
- programmer must glue together
- step 3 has two inputs -- MapReduce has only one
Dryad's principal idea: programmer specifies an arbitrary graph
- Vertices are computations
- Edges are communication channels
- Each vertex can have several input and output channels
- Most interesting when programmed with DryadLINQ
DryadLINQ
- Goals
- allow high-level programming of Dryad graphs
- good integration with programming language
- Look at word frequency handout (from the DryadLINQ tech report on the Microsoft Research website)
- count occurrences of each word
- return top 3 (so requires final sort by frequency)
public static IQueryable<Pair> Histogram(input, k)
{
    var words = input.SelectMany(x => x.Split(' '));
    var groups = words.GroupBy(x => x);
    var counts = groups.Select(x => new Pair(x.Key, x.Count()));
    var ordered = counts.OrderByDescending(x => x.Count);
    var top = ordered.Take(k);
    return top;
}
- What does each statement do?
- input: "A line of words of wisdom"
- SelectMany: ["A", "line", "of", "words", "of", "wisdom"]
- GroupBy: [["A"], ["line"], ["of", "of"], ["words"], ["wisdom"]]
- Select: [ {"A", 1}, {"line", 1}, {"of", 2}, {"words", 1}, {"wisdom", 1}]
- OrderByDescending: [{"of", 2}, {"A", 1}, {"line", 1}, {"words", 1}, {"wisdom", 1}]
- Take(3): [{"of", 2}, {"A", 1}, {"line", 1}]
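The same pipeline in plain Python, just to sanity-check the trace above (Counter stands in for GroupBy + Select; this is not DryadLINQ code):

    from collections import Counter

    def histogram(lines, k):
        words = [w for line in lines for w in line.split(' ')]  # SelectMany
        counts = Counter(words)                 # GroupBy + Select(Count)
        return counts.most_common(k)            # OrderByDescending + Take(k)

    print(histogram(["A line of words of wisdom"], 3))
    # [('of', 2), ('A', 1), ('line', 1)]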
- How is this executed? (a dryad graph, see figure 7 of the technical report)
- original input was stored in four partitions
- computation proceeds in three stages
- stage 1: four machines, each reading one partition
- split into words
- hash(w) and send over the network to one of the GroupBy machines
- stage 2: each GroupBy works on a (new) partition by words
- e.g., a-g, h-m, n-t, u-z
- counts # of occurrences of each word it's responsible for
- sorts by # occurrences
- this is only a partial sort, to reduce cost of final sort
- stage 3
- look at top few words from each partitioned computation
- pick top 3
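Why stage 3 can get away with looking at only the top few per partition (a sketch using the trace's data): every word lives in exactly one range partition, so the global top 3 must appear among the partitions' local leaders:

    import heapq

    # stage-2 output: each partition's (word, count) pairs, already sorted by count
    partition_tops = [
        [("A", 1)],                     # partition a-g
        [("line", 1)],                  # partition h-m
        [("of", 2)],                    # partition n-t
        [("words", 1), ("wisdom", 1)],  # partition u-z
    ]
    top3 = heapq.nlargest(3, (kv for tops in partition_tops for kv in tops),
                          key=lambda kv: kv[1])
    print(top3)   # [('of', 2), ('A', 1), ('line', 1)]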
- why is this cool?
- program was pretty high level -- much shorter than in MapReduce
- system figures out a good graph and manages execution
- automatic optimization: move most of the sort early, so it runs in parallel
- What optimizations can DryadLINQ do?
- runs initial stages on/near input file source
- knows if data is already partitioned in the required way
- e.g. if input to WC was already sorted by word
- this works when input was itself computed by DryadLINQ
- samples to produce balanced range-partitions for e.g. sort (fig 5)
- moves associative aggregation up above re-distributions (fig 6)
- e.g. summing for word count, move up to Map side (sketch after this list)
- dynamic aggregation tree to keep net traffic local (fig 6)
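What that aggregation move buys, as a word-count sketch (illustrative Python, not DryadLINQ's mechanism): pre-sum locally, so each map-side machine ships one (word, n) pair per distinct word instead of one pair per occurrence:

    from collections import Counter

    def map_side_partial_count(split):
        # local pre-aggregation; correct because + is associative,
        # so the reduce side can simply sum the partial sums
        return Counter(split.split())

    print(map_side_partial_count("of words of wisdom of"))
    # Counter({'of': 3, 'words': 1, 'wisdom': 1})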
DryadLINQ performance
- Table 1 / Figure 7 (tech report)
- N machines, sorting 4N gigabytes of data
- does it scale well?
- what do we expect?
- could we hope for a flat line?
- 2x data, 2x machines, 1x time?
- close to limits of hardware?
- how long does it take to send data over net?
- ~40 seconds to send 4 GB (assumes roughly 100 MB/s through the network)
- if from/to the same machine, maybe 80 seconds to send+receive
- sorting 4 GB takes about 119 seconds at N=1 (no network I/O)