Course:CICS525/Notes/ProgrammingModels

Outline

  • Why do we need new programming models? What are the design options and challenges?
  • MapReduce (Google)
  • Dryad (Microsoft) + DryadLINQ


Parallel programming can be hard

  • complexity: threads, shared data, mutexes, RPC, failure
  • dependencies: when can F() and G() run in parallel?
    • vs when must G() follow F()?
  • does it require experts?
    • can the details be hidden?
  • One application area: big data-parallel computation
    • huge data set
    • natural parallelism: can work on different parts of data independently
    • image processing
    • grep
    • indexing
    • and many more

Cluster computing

  • many machines in one place
  • partition data among machines
  • each machine computes on its part of the data
  • machines communicate when required
  • cheaper than big shared-memory multiprocessors
    • but less communication capacity

Challenges of cluster computing

  • Parallelize application
    • Where to place input and output data?
    • What parts of the computation to place on what machines?
    • How to avoid network bottleneck?
  • How to write the application?
    • Does the programmer have to indicate what parts can be parallel?
      • Or can the system figure it out?
    • Can the system apply optimizations?
    • Balance computations of an application across computers
      • Statically (e.g., doable when designer knows how much work there is)
      • Dynamically
    • Handle failures of nodes during computation
      • With 1000 machines, is a failure likely in a 1-hour period?
      • Often easier to handle than in, say, banking applications (or the YFS lock server)
      • Many computations have no "harmful" side-effects and clear commit points
  • Scheduling several applications that want to share the infrastructure
    • Time-sharing
    • Strict partitioning

MapReduce

Design

  • Partition the large input data set into M splits
    • a split is typically a 64 MB piece of the input
  • run map on each split, which produces R local intermediate partitions
    • using a partition function, typically hash(key) mod R (see the sketch below)
  • run reduce on each of the R intermediate partitions; each reduce produces one output file (R files total)
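  • a minimal Python sketch of the split/partition bookkeeping above (helper names are made up; hash(key) mod R is the paper's default partition function)
def split_input(data, M):
    # cut the input into about M roughly equal splits (the real system uses
    # ~64 MB pieces stored in GFS, not in-memory strings)
    n = max(1, len(data) // M)
    return [data[i:i + n] for i in range(0, len(data), n)]

def partition(key, R):
    # choose which of the R intermediate partitions an emitted key goes to
    return hash(key) % R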

Programmer interface

map(key, value) -> set of key-value pairs
reduce(key, set of values)
reduce called once per key, with all values for that key
  • Example: word count
    • split input files into big pieces
map(split)
    • takes one split as input
    • parses into words and returns list of word/1 pairs
reduce(word, set)
    • set contains the "1"s emitted by the maps for this word
    • adds them up
    • outputs the number of occurrences of this word (see the Python sketch below)
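  • the same word count written as plain Python functions (a local sketch of the interface above, not the real MapReduce API)
def wc_map(key, value):
    # key: document name (unused here); value: the contents of one split
    # emit one (word, 1) pair per word in the split
    return [(word, 1) for word in value.split()]

def wc_reduce(key, values):
    # key: a word; values: all the 1s emitted for that word by the map tasks
    # output the word together with its total number of occurrences
    return (key, sum(values))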

Implementation overview for programmers

  • (Figure 1 from MapReduce article - see readings)
  • input partitioned over GFS
  • M map worker machines
    • read from one split of input
    • write to local intermediate storage
    • each intermediate store partitioned by reduce key
  • R reduce worker machines
    • pull intermediate data from *all* map workers via RPC
    • local sort by key
    • call reduce function on each key + value set
    • output to GFS
    • output partitioned over GFS
  • where is parallel speedup coming from?
  • will it scale?
    • be N times as fast w/ N machines, indefinitely?
    • what might limit scaling?
  • look at WC implementation from MapReduce article appendix
  • Map
    • input is a string with many words
    • split into words at space boundaries
    • one Emit(w, "1") per word
  • Reduce
    • sums up the "1"s in the key's value set
    • just one Emit at end
main()
  set up input files (must be in GFS)
  specify names of output files
  R is 100
  combiner:
    Reduce is associative
    can run it on temporary output of each Map worker
    will reduce communication
  set # of machines
  then run!
  • Map/Reduce are sequential code
    • no locks, threads, RPC, &c
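  • putting the pieces together: a single-machine Python sketch of the whole dataflow (splits -> map -> hash partition -> sort -> reduce); purely illustrative, reusing wc_map/wc_reduce from the sketch above
from collections import defaultdict

def run_mapreduce(splits, map_fn, reduce_fn, R):
    # map phase: one map task per split; each emitted pair goes into one of
    # R intermediate partitions chosen by hash(key) % R
    intermediate = [defaultdict(list) for _ in range(R)]
    for i, split in enumerate(splits):
        for k, v in map_fn(str(i), split):
            intermediate[hash(k) % R][k].append(v)
    # reduce phase: each reduce task sorts its keys, then calls reduce once
    # per key with all of that key's values; one output "file" per task
    return [[reduce_fn(k, part[k]) for k in sorted(part)]
            for part in intermediate]

# example run: three splits, R = 2
print(run_mapreduce(["a b a", "b c", "a c c"], wc_map, wc_reduce, R=2))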

Implementation details

  • MapReduce system handles all management
  • create workers &c
  • master process
    • keeps track of which parts of work are done
    • keeps track of workers
    • assigns workers map and reduce jobs
    • handles failures of workers
  • map workers communicate the locations of their R intermediate partitions to the master
  • each reduce worker asks the master for those locations
    • sorts the input keys
    • runs the reduce operation once per key
  • when all workers are finished, master returns result to caller
  • Fault tolerance -- what if a worker fails?
    • assign map and reduce jobs to another worker
    • may have to re-run completed map jobs since worker crash also lost intermediate map output
  • Load balancing
    • what's the right number of map input splits (M)?
    • equal to the number of worker machines? no -- want M to be much larger, since:
      • some splits will take longer than others
      • want to limit the damage from any one failure
      • want to spread the failure-recovery load among multiple workers
    • challenge: stragglers (slow machines near the end; the paper's answer is backup executions of the remaining tasks)
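  • a toy Python sketch of the master's failure handling from the fault-tolerance bullets above (hypothetical data structures; the real master also pings workers, tracks intermediate-file locations, and schedules backup tasks)
# hypothetical task tables: task id -> (state, worker that ran / is running it)
map_tasks = {}      # state is "idle", "in-progress", or "done"
reduce_tasks = {}

def handle_worker_failure(worker):
    # completed map output lives on the dead worker's local disk, so both
    # in-progress and completed map tasks on that worker must be re-run
    for t, (state, w) in map_tasks.items():
        if w == worker and state in ("in-progress", "done"):
            map_tasks[t] = ("idle", None)
    # completed reduce output is already in GFS, so only in-progress reduce
    # tasks on the dead worker need to be rescheduled
    for t, (state, w) in reduce_tasks.items():
        if w == worker and state == "in-progress":
            reduce_tasks[t] = ("idle", None)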

Why MapReduce's particular setup?

  • Can we argue that Map followed by Reduce covers many situations?
  • High level: only two things going on in cluster computing
    • partitioned computing (Map)
    • reshuffling data (Reduce)

Retrospective

  • Google claims MapReduce made large-scale parallel computing accessible to novices
  • Maybe you have to shoe-horn your problem into the MR model
    • but you win in scalability what you might lose in performance

Dryad

  • Claim: MapReduce is not flexible enough
    • there are computations that should work well on cluster
    • but are awkward to express in MapReduce
  • Example
  1. score web pages by the words they contain
  2. score web pages by # of incoming links
  3. combine the two scores
  4. sort by combined score
  • Example is awkward in MapReduce
    • multiple MapReduce runs -- maybe one for each step
    • programmer must glue together
    • step 3 has two inputs -- MapReduce has only one

Dryad's principal idea: programmer specifies an arbitrary graph

  • Vertices are computations
  • Edges are communication channels
  • Each vertex can have several input and output channels
  • Most interesting when programmed with DryadLINQ
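  • one way to write down the four-step example above as a Dryad-style graph (a hypothetical Python description of the DAG, not Dryad's actual API)
# hypothetical description of the 4-step pipeline as vertices + channels
vertices = {
    "score_by_words": "score web pages by the words they contain",
    "score_by_links": "score web pages by # of incoming links",
    "combine":        "combine the two scores",
    "sort":           "sort by combined score",
}
edges = [
    ("score_by_words", "combine"),  # 'combine' has two input channels --
    ("score_by_links", "combine"),  # exactly what plain MapReduce can't express
    ("combine", "sort"),
]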

DryadLINQ

  • Goals
    • allow high-level programming of Dryad graphs
    • good integration with programming language
  • Look at word frequency handout (from the DryadLINQ tech report on the Microsoft Research website)
    • count occurrences of each word
    • return top 3 (so requires final sort by frequency)
public static IQueryable<Pair> Histogram(IQueryable<string> input, int k) {
  var words = input.SelectMany(x => x.Split(' ')); 
  var groups = words.GroupBy(x => x); 
  var counts = groups.Select(x => new Pair(x.Key, x.Count())); 
  var ordered = counts.OrderByDescending(x => x.Count); 
  var top = ordered.Take(k); 
  return top;
}
  • What does each statement do?
    • input: "A line of words of wisdom"
    • SelectMany: ["A", "line", "of", "words", "of", "wisdom"]
    • GroupBy: [["A"], ["line"], ["of", "of"], ["words"], ["wisdom"]]
    • Select: [ {"A", 1}, {"line", 1}, {"of", 2}, {"words", 1}, {"wisdom", 1}]
    • OrderByDescending: [{"of", 2}, {"A", 1}, {"line", 1}, {"words", 1}, {"wisdom", 1}]
    • Take(3): [{"of", 2}, {"A", 1}, {"line", 1}]
  • How is this executed? (a Dryad graph; see Figure 7 of the technical report)
    • original input was stored in four partitions
    • computation proceeds in three stages
    • stage 1: four machines, each reading one partition
      • split into words
      • hash(w) and send each word over the network to one of the GroupBy machines
    • stage 2: each GroupBy works on a (new) partition by words
      • e.g., a-g, h-m, n-s, t-z
      • counts # of occurrences of each word it's responsible for
      • sorts by # of occurrences
      • this is only a partial sort, to reduce cost of final sort
    • stage 3
      • look at top few words from each partitioned computation
      • pick top 3
  • why is this cool?
    • program was pretty high level -- much shorter than in MapReduce
    • system figured out a good graph and managed the execution
    • automatic optimization: most of the sorting work is moved early, so it runs in parallel
  • What optimizations can DryadLINQ do?
    • runs initial stages on/near input file source
    • knows if data is already partitioned in the required way
      • e.g. if input to WC was already sorted by word
    • this works when input was itself computed by DryadLINQ
    • samples to produce balanced range-partitions for e.g. sort (fig 5)
    • moves associative aggregation up above re-distributions (fig 6)
      • e.g. summing for word count, move up to Map side
    • dynamic aggregation tree to keep net traffic local (fig 6)
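  • to make the stages and the aggregation optimization concrete, a plain-Python sketch of the word-frequency computation (illustrative only; DryadLINQ actually compiles the LINQ operators into a distributed Dryad graph -- this just mimics the dataflow on one machine)
from collections import Counter

# hypothetical inputs: 4 input partitions, and R = 4 hash partitions of words
input_partitions = [["A line of words"], ["of wisdom"], ["more words"], ["of words"]]
R = 4

# stage 1 (per input partition): split into words, pre-aggregate locally
# (the associative-aggregation optimization), then route each (word, count)
# pair to a partition chosen by hash(word) % R
shuffled = [Counter() for _ in range(R)]
for part in input_partitions:
    local = Counter(w for line in part for w in line.split())
    for w, c in local.items():
        shuffled[hash(w) % R][w] += c        # "send" to one GroupBy machine

# stage 2 (per word partition): counts for its words are now complete;
# sort locally by count and keep only the top 3 candidates (partial sort)
candidates = []
for part in shuffled:
    candidates.extend(sorted(part.items(), key=lambda p: -p[1])[:3])

# stage 3: merge the per-partition candidates and pick the global top 3
print(sorted(candidates, key=lambda p: -p[1])[:3])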

DryadLINQ performance

  • Table 1 / Figure 7 (tech report)
  • N machines, sorting 4N gigabytes of data
  • does it scale well?
  • what do we expect?
  • could we hope for a flat line?
  • 2x data, 2x machines, 1x time?
  • close to limits of hardware?
  • how long does it take to send data over net?
    • 40 seconds to send 4 GB
    • if each machine both sends and receives 4 GB, maybe 80 seconds of network time
    • sorting 4 GB locally seems to take 119 seconds (N=1, no network I/O)
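  • rough back-of-the-envelope, assuming each machine holds ~4 GB and during the shuffle sends and receives roughly its whole share:
    • per-machine network time ~80 seconds and per-machine local sort ~119 seconds, both independent of N
    • so total time should stay roughly flat as N and the data grow together -- which is what the scaling experiment tests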