MapReduce
Map every record in parallel, shuffle by key, reduce each group — a petabyte scan as two pure functions
MapReduce is a programming model for processing huge datasets across a cluster: a map function emits key-value pairs in parallel, the framework shuffles them by key, and a reduce function aggregates each group — turning a petabyte scan into an embarrassingly parallel job.
- Introduced: Google, 2004 (Dean & Ghemawat)
- You write: map() + reduce()
- Framework owns: shuffle, sort, retries
- Map phase: O(n) over input, fully parallel
- Shuffle transfers: up to M × R
- Fault model: re-run any task on failure
How MapReduce works
You have a petabyte of web-server logs and one question: how many times does each URL appear? No single machine can hold the data, let alone sort it. MapReduce, introduced by Jeffrey Dean and Sanjay Ghemawat at Google in 2004, answers this with a deceptively small idea: express the whole computation as two pure functions, and let the framework handle every hard distributed-systems problem behind them.
The model has exactly three phases:
- Map. The framework splits the input into chunks (typically 64–128 MB each) and runs your map(key, value) on every record, in parallel, on whichever machines hold the data. Each call emits zero or more intermediate (k, v) pairs. For URL counting, map reads a log line and emits (url, 1).
- Shuffle. This is the framework's job, not yours. It sorts every intermediate pair by key and routes all values for a given key to the same reducer. After the shuffle, a reducer sees (url, [1, 1, 1, ...]): one group per distinct key, with every value for that key, no matter which mapper produced it.
- Reduce. Your reduce(key, values) runs once per key and folds the list of values into a result. For URL counting it sums the ones: (url, 42).
The conceptual signature is clean:
map : (k1, v1) → list(k2, v2)
shuffle (framework) : list(k2, v2) → list(k2, list(v2))
reduce: (k2, list(v2)) → list(k3, v3)
The leverage comes from what you didn't write. There is no code for partitioning the input, scheduling thousands of tasks, moving data across the network, sorting it, retrying failed machines, or detecting stragglers. The framework does all of that, identically, for every job — because map and reduce are pure functions of their inputs, they can be re-run anywhere, any number of times, with the same result.
The mechanism, phase by phase
Splits and map parallelism. A job with input size D and split size S launches M = ⌈D / S⌉ map tasks. With 1 TB of input and 128 MB splits, that's about 8,000 map tasks. The scheduler tries to place each map task on a machine that already holds its split (data locality), so most of the input never crosses the network. Map is O(D) total work, spread across however many slots the cluster has — the wall-clock map time is roughly D / (slots × per-slot throughput).
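The split arithmetic above can be checked in a few lines. This is a back-of-the-envelope sketch: the function names are made up for illustration, and the slot count and per-slot throughput are hypothetical numbers, not measurements from any real cluster.

```python
def map_task_count(input_bytes: int, split_bytes: int) -> int:
    """M = ceil(D / S): one map task per input split (integer ceiling)."""
    return (input_bytes + split_bytes - 1) // split_bytes


def map_wall_clock_seconds(input_bytes, slots, bytes_per_sec_per_slot):
    """Rough map-phase time: D / (slots * per-slot throughput)."""
    return input_bytes / (slots * bytes_per_sec_per_slot)


TIB = 1024 ** 4
MIB = 1024 ** 2

m = map_task_count(1 * TIB, 128 * MIB)
print(m)  # 8192, i.e. "about 8,000" map tasks

# Hypothetical cluster: 1,000 map slots, each scanning 50 MiB/s.
seconds = map_wall_clock_seconds(1 * TIB, slots=1000,
                                 bytes_per_sec_per_slot=50 * MIB)
print(round(seconds, 1))  # 21.0
```

The estimate ignores scheduling overhead and stragglers, so treat it as a lower bound on the map phase rather than a prediction.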
Partitioning. Each map output pair is assigned to one of R reducers by a partition function, by default hash(key) mod R. This guarantees all pairs with the same key land in the same partition, which is what makes the per-key grouping possible. The number of reducers R is yours to choose and directly controls reduce parallelism.
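A minimal sketch of the default-style partitioner described above. It assumes `zlib.crc32` as a stand-in hash, chosen only because it gives the same value in every process; real frameworks use their own hash functions, and the sample URLs are invented.

```python
import zlib


def partition(key: str, num_reducers: int) -> int:
    """hash(key) mod R, using CRC32 as a process-stable stand-in hash.

    Python's built-in hash() is salted per process for strings, so it
    would not route a key consistently across separate worker processes.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


R = 4
pairs = [("/home", 1), ("/about", 1), ("/home", 1), ("/cart", 1), ("/home", 1)]

# Route every intermediate pair to its reducer's partition.
partitions = {r: [] for r in range(R)}
for key, value in pairs:
    partitions[partition(key, R)].append((key, value))

# Every occurrence of "/home" lands in exactly one partition.
home_partitions = {
    r for r, kvs in partitions.items() if any(k == "/home" for k, _ in kvs)
}
print(len(home_partitions))  # 1
```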
The shuffle. Each mapper writes R sorted, partitioned spill files to local disk. Each reducer then pulls its partition from every mapper — an all-to-all transfer of up to M × R network connections — and merges those sorted runs into one sorted stream. This sort-merge is why MapReduce reducers always see keys in sorted order. The shuffle is the only phase that moves the full intermediate dataset across the network, and it is almost always the bottleneck.
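The reducer-side merge can be sketched with the standard library's `heapq.merge`, which lazily merges already-sorted inputs. The three runs below are invented sample data standing in for what one reducer would pull from three mappers; a real shuffle merges on-disk runs, not in-memory lists.

```python
import heapq

# Hypothetical sorted runs for ONE reducer, one run per mapper (M = 3).
runs_for_reducer = [
    [("cat", 1), ("the", 1), ("the", 1)],  # pulled from mapper 0
    [("dog", 1), ("the", 1)],              # pulled from mapper 1
    [("cat", 1), ("ran", 1)],              # pulled from mapper 2
]

# Merge the sorted runs into a single key-sorted stream without
# re-sorting everything from scratch.
merged = list(heapq.merge(*runs_for_reducer, key=lambda kv: kv[0]))
keys = [k for k, _ in merged]
print(keys)
# ['cat', 'cat', 'dog', 'ran', 'the', 'the', 'the']
```

Because each run is already sorted, the merge only ever compares the heads of the runs, which is what makes it workable when the runs are far larger than memory.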
Reduce. Walking the merged, sorted stream, the framework detects key boundaries and calls your reducer once per key with an iterator over that key's values. Reduce output is written to the distributed file system, one output file per reducer.
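Key-boundary detection over a sorted stream is what `itertools.groupby` does, so the reduce loop can be sketched as below. `run_reducer` is a hypothetical helper name, and the stream is sample data; the point is that the reducer receives a lazy iterator per key, not a materialized list.

```python
from itertools import groupby
from operator import itemgetter


def run_reducer(sorted_pairs, reduce_fn):
    """Walk a key-sorted stream and call reduce_fn once per key."""
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        values = (v for _, v in group)  # lazy iterator over this key's values
        yield key, reduce_fn(key, values)


merged_stream = [
    ("cat", 1), ("cat", 1),
    ("dog", 1),
    ("the", 1), ("the", 1), ("the", 1),
]

result = list(run_reducer(merged_stream, lambda _key, values: sum(values)))
print(result)
# [('cat', 2), ('dog', 1), ('the', 3)]
```

`groupby` only groups adjacent equal keys, so this works precisely because the shuffle delivers the stream already sorted.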
Total work is O(D) for map plus O(D log D) for the distributed sort in the shuffle, but the wall-clock time is what you actually care about, and it collapses with parallelism — that's the entire point.
When to use MapReduce (and when not to)
- One-pass batch aggregation over data that doesn't fit on one machine — counts, sums, histograms, inverted-index construction, log analysis, ETL.
- Embarrassingly parallel record transforms where each input row is processed independently in map.
- Workloads where input is read once and throughput matters more than latency — MapReduce is a batch system, not an interactive one.
- Clusters of unreliable commodity hardware, because the re-run-on-failure model tolerates routine machine death.
It is a poor fit when:
- The computation is iterative (PageRank, k-means, gradient descent). Each iteration is a fresh MapReduce job that re-reads the whole dataset from disk. Spark, which keeps data in memory between iterations, is the right tool — often 10–100× faster here.
- You need low latency. Even a trivial job has tens of seconds of fixed startup overhead. For sub-second queries, use a query engine or index.
- The algorithm needs random access or fine-grained updates. MapReduce streams over data in bulk; it has no notion of mutating a single record.
- The job is small. If your data fits in RAM on one box, a single sort | uniq -c beats spinning up a cluster.
MapReduce vs other large-scale processing models
| | MapReduce | Spark (RDD/DAG) | SQL on MPP | Streaming (Flink) | Single-node sort \| uniq |
|---|---|---|---|---|---|
| Programming model | map + reduce | functional DAG of transforms | declarative queries | continuous operators | Unix pipes |
| Intermediate data | spilled to disk every stage | in-memory (cached RDDs) | in-memory + spill | in-flight state | OS buffers |
| Iterative workloads | re-reads disk each pass (slow) | excellent (data stays hot) | moderate | n/a (unbounded) | n/a |
| Latency | minutes (high startup cost) | seconds–minutes | seconds | milliseconds | milliseconds |
| Fault tolerance | re-run failed task | lineage recomputation | query restart | checkpoint + replay | none |
| Max practical scale | thousands of machines, PB | thousands, PB | hundreds, PB | hundreds, unbounded | one machine, ~RAM |
| Best for | one-pass batch ETL | iterative + interactive batch | analyst queries | real-time pipelines | data that fits in RAM |
The headline difference between MapReduce and Spark is what happens between stages. MapReduce materializes intermediate output to disk after every map and every reduce — durable, simple to recover, and slow. Spark keeps it in memory and recomputes only what's lost from lineage. For a single map-then-reduce pass the gap is modest; for a 30-iteration machine-learning loop, MapReduce pays the full disk round-trip 30 times while Spark pays it roughly once.
What the numbers actually say
- The original 2004 paper reported a 1,800-machine cluster sorting 10^10 100-byte records (≈1 TB) in 891 seconds, and Google was already running upwards of one thousand MapReduce jobs per day internally.
- Map is data-local; shuffle is not. On a job that reads 1 TB and emits 1 TB of intermediate data, the shuffle alone moves ~1 TB across the network. At 1 GB/s of usable bisection bandwidth per machine that's the dominant cost — which is exactly why a combiner matters.
- A combiner on word count turns thousands of ("the", 1) pairs per map task into one ("the", 8324) pair. On natural-language text a good combiner cuts shuffle volume by 90% or more, because a handful of stop-words dominate the term distribution (Zipf's law).
- Stragglers, not failures, dominate tail latency. The paper notes that disabling speculative execution of slow tasks increased one sort job's completion time by 44% (891 s → 1,283 s), because a handful of straggler reduce tasks finished long after the rest. The slowest 1% of tasks can otherwise dominate total runtime.
- Reduce can't start its user code until map is done — there's a barrier. A reducer can pre-fetch map output as mappers finish, but the reduce function only runs once all map output for its keys has arrived and been sorted.
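To make the combiner's effect on shuffle volume concrete, here is a small sketch that counts intermediate pairs before and after a per-map-task combine. The two splits are toy data, so the reduction is far smaller than the 90% figure quoted for real text; `map_task` and `combine` are illustrative names.

```python
from collections import Counter


def map_task(lines):
    """Word-count map over one split: one (word, 1) pair per token."""
    return [(word, 1) for line in lines for word in line.split()]


def combine(pairs):
    """Map-side combiner: sum the counts per word within one map task."""
    totals = Counter()
    for word, n in pairs:
        totals[word] += n
    return sorted(totals.items())


# Two hypothetical input splits, one per map task.
splits = [
    ["the cat and the dog", "the end of the day"],
    ["the dog and the cat", "the start of the day"],
]

raw = [map_task(split) for split in splits]
combined = [combine(pairs) for pairs in raw]

raw_pairs = sum(len(pairs) for pairs in raw)
combined_pairs = sum(len(pairs) for pairs in combined)
print(raw_pairs, combined_pairs)  # 20 14
```

Each map task still sends one pair per distinct word, so the saving grows with how often words repeat inside a single split.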
JavaScript implementation
A faithful single-process model of map → shuffle → reduce. Real frameworks distribute these steps, but the data flow is identical.
// A generic MapReduce engine. mapFn emits [k, v] pairs;
// reduceFn folds all values for one key into a result.
function mapReduce(inputs, mapFn, reduceFn, { combineFn = null } = {}) {
  // ---- MAP (would be parallel across machines) ----
  let emitted = [];
  for (const record of inputs) {
    for (const pair of mapFn(record)) emitted.push(pair);
  }
  // ---- optional COMBINE (map-side mini-reduce) ----
  // Simplification: a real framework combines each map task's output
  // separately; this model combines all map output in one pass.
  if (combineFn) emitted = shuffleAndReduce(emitted, combineFn);
  // ---- SHUFFLE + REDUCE ----
  return shuffleAndReduce(emitted, reduceFn);
}

// Group pairs by key (the shuffle), then apply fn to each group.
function shuffleAndReduce(pairs, fn) {
  const groups = new Map(); // k -> [v, v, ...]
  for (const [k, v] of pairs) {
    if (!groups.has(k)) groups.set(k, []);
    groups.get(k).push(v);
  }
  // Sorted-key iteration mirrors the framework's sort guarantee.
  const out = [];
  for (const k of [...groups.keys()].sort()) {
    out.push([k, fn(k, groups.get(k))]);
  }
  return out;
}

// ---- WORD COUNT ----
const docs = ['the cat sat', 'the dog ran', 'the cat ran'];
const countMap = doc => doc.split(/\s+/).map(w => [w, 1]);
const sumReduce = (_key, values) => values.reduce((a, b) => a + b, 0);

// The combiner is the SAME function as reduce here; that is safe
// because addition is commutative and associative.
console.log(mapReduce(docs, countMap, sumReduce, { combineFn: sumReduce }));
// [['cat', 2], ['dog', 1], ['ran', 2], ['sat', 1], ['the', 3]]
Two details that mirror the real system. First, shuffleAndReduce is reused for both the optional combiner and the final reduce — a combiner really is just a reduce that runs early, on the map side. Second, reducers see keys in sorted order; that's not incidental, it falls out of the framework's sort-merge of map output.
Python implementation
The same engine in Python, plus a second classic job — building an inverted index, the data structure behind every search engine.
from collections import defaultdict


def map_reduce(inputs, map_fn, reduce_fn, combine_fn=None):
    # ---- MAP ----
    emitted = []
    for record in inputs:
        emitted.extend(map_fn(record))
    # ---- optional COMBINE (map-side) ----
    # Simplification: a real framework combines each map task's output
    # separately; this model combines all map output in one pass.
    if combine_fn:
        emitted = _shuffle_reduce(emitted, combine_fn)
    # ---- SHUFFLE + REDUCE ----
    return _shuffle_reduce(emitted, reduce_fn)


def _shuffle_reduce(pairs, fn):
    groups = defaultdict(list)  # the shuffle: group by key
    for k, v in pairs:
        groups[k].append(v)
    return [(k, fn(k, groups[k])) for k in sorted(groups)]


# ---- WORD COUNT ----
docs = ['the cat sat', 'the dog ran', 'the cat ran']
word_map = lambda doc: [(w, 1) for w in doc.split()]
sum_reduce = lambda _k, vs: sum(vs)
print(map_reduce(docs, word_map, sum_reduce, combine_fn=sum_reduce))
# [('cat', 2), ('dog', 1), ('ran', 2), ('sat', 1), ('the', 3)]

# ---- INVERTED INDEX: term -> list of doc ids ----
corpus = [(0, 'the cat'), (1, 'the dog'), (2, 'a cat')]


def index_map(doc):
    doc_id, text = doc
    return [(term, doc_id) for term in text.split()]


def index_reduce(_term, doc_ids):
    return sorted(set(doc_ids))  # postings list, deduped


print(map_reduce(corpus, index_map, index_reduce))
# [('a', [2]), ('cat', [0, 2]), ('dog', [1]), ('the', [0, 1])]
Note that index_reduce takes single doc ids as input values but returns a list of doc ids. Because its output type does not match its input value type, you can't blindly reuse it as a combiner the way word count reuses sum_reduce: the final reduce would receive lists where it expects ids, and set() would fail on them. You could write a combiner that emits deduped local doc-id lists, paired with a reducer that merges lists. Knowing which reducers are combiner-safe is the practical heart of MapReduce.
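One way to make the inverted index combiner-safe is to have map emit single-element lists, so that map output, combiner output, and reducer input all share one value type. The sketch below is an illustrative variant, not the engine above: `group_by_key`, `merge_postings`, and the two-task layout are assumptions made for the example.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Group (key, value) pairs into key -> [values]."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups


def index_map(doc):
    doc_id, text = doc
    # Emit one-element lists so every stage handles "list of doc ids".
    return [(term, [doc_id]) for term in text.split()]


def merge_postings(_term, id_lists):
    """Union of doc-id lists; usable as both combiner and reducer."""
    return sorted({doc_id for ids in id_lists for doc_id in ids})


# Two hypothetical map tasks, each holding part of the corpus.
map_tasks = [
    [(0, "the cat"), (1, "the dog the")],
    [(2, "a cat")],
]

shuffled = []
for task in map_tasks:
    local = [pair for doc in task for pair in index_map(doc)]
    # Combiner: runs per map task, before anything crosses the network.
    for term, id_lists in group_by_key(local).items():
        shuffled.append((term, merge_postings(term, id_lists)))

index = sorted(
    (term, merge_postings(term, lists))
    for term, lists in group_by_key(shuffled).items()
)
print(index)
# [('a', [2]), ('cat', [0, 2]), ('dog', [1]), ('the', [0, 1])]
```

Set union is commutative and associative, so running `merge_postings` zero, one, or several times on the map side leaves the final postings unchanged.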
Variants and the wider lineage
Combiners. A map-side pre-aggregation, covered above. Legal only when reduce is commutative and associative; the framework may call it zero, one, or many times, so it must produce the same final answer regardless.
Custom partitioners. Replace the default hash(key) mod R to control which reducer gets which keys — used to keep related keys together, implement a total sort, or split a hot key across reducers (salting).
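As one example of a custom partitioner, a range partitioner sends each key range to its own reducer, so concatenating the reducers' sorted outputs gives a total order. This is a sketch: the split points are hard-coded assumptions, whereas a real job would usually pick them by sampling the keys.

```python
import bisect


def make_range_partitioner(split_points):
    """Return a partitioner: keys below split_points[0] go to reducer 0,
    keys in [split_points[0], split_points[1]) go to reducer 1, and so on."""
    def partition(key):
        return bisect.bisect_right(split_points, key)
    return partition


# Hypothetical split points for R = 3 reducers.
partition = make_range_partitioner(["h", "p"])

keys = ["zebra", "apple", "mango", "kiwi", "banana", "pear"]
buckets = [[], [], []]
for key in keys:
    buckets[partition(key)].append(key)

# Each reducer sorts only its own bucket; reading the outputs in
# reducer order then yields a globally sorted result.
total_order = [key for bucket in buckets for key in sorted(bucket)]
print(total_order)
# ['apple', 'banana', 'kiwi', 'mango', 'pear', 'zebra']
```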
Secondary sort. Sort values within a key, not just the keys, by encoding the sort field into a composite key and teaching the partitioner and grouping comparator to ignore it. Lets a reducer see a key's values in a chosen order without buffering them all in memory.
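The composite-key trick can be sketched in a single process. The event tuples, the `partition` helper, and the use of `zlib.crc32` are assumptions for illustration; the lists built at the end are only there to display the result, since a real reducer would stream the values instead.

```python
import zlib
from itertools import groupby

# Hypothetical events: (user, timestamp, action), in arrival order.
events = [
    ("bob", 3, "logout"),
    ("amy", 2, "click"),
    ("bob", 1, "login"),
    ("amy", 1, "login"),
]

# Map: move the sort field (timestamp) into a composite key.
pairs = [((user, ts), action) for user, ts, action in events]


def partition(composite_key, num_reducers):
    """Partition on the natural key only, so a user's events stay together."""
    user, _ts = composite_key
    return zlib.crc32(user.encode("utf-8")) % num_reducers


# Framework sort: order by the full composite key (user, then timestamp).
pairs.sort(key=lambda kv: kv[0])

# Grouping comparator: group on the user alone, ignoring the timestamp.
sessions = {}
for user, group in groupby(pairs, key=lambda kv: kv[0][0]):
    sessions[user] = [action for _key, action in group]

print(sessions)
# {'amy': ['login', 'click'], 'bob': ['login', 'logout']}
```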
Map-only jobs. Set R = 0 when there's nothing to aggregate — pure record transforms, format conversions, filtering. Skips the shuffle entirely and writes map output straight to the file system.
Chained jobs. Many real problems are a pipeline of MapReduce jobs (PageRank = one job per iteration). Higher-level tools — Pig, Hive, Cascading — compile a query into a chain of MapReduce stages so you don't hand-write each one.
The successors. Apache Spark generalizes map/reduce into a lazy DAG of in-memory transformations with lineage-based recovery; Apache Flink and Google Dataflow extend the same shape to unbounded streams. All three trace directly back to the 2004 model — they kept the "user writes pure functions, framework owns distribution" insight and dropped the mandatory disk round-trip between stages.
Common bugs and edge cases
- Data skew on a hot key. One reducer gets the word "the" — or one user_id, or one date — and runs long after the rest finish. The whole job waits on it. Fix with a combiner, a custom partitioner, or salting the hot key into N sub-keys.
- Using a combiner with a non-associative reduce. Averaging is wrong if the combiner emits partial averages, because a mean of means ignores how many values each partial covered. Emit (sum, count) pairs and divide only in the final reduce.
- Assuming map output order. Map tasks run in parallel and in arbitrary order; never rely on the order records arrive or on global state across map calls. Each map call must be independent.
- Buffering all of a reducer's values in memory. A reducer receives an iterator, not a list, precisely because a key can have billions of values. Materializing them into a list OOMs the reducer. Stream them; use secondary sort if you need them ordered.
- Too few or too many reducers. One reducer serializes the whole reduce phase; ten thousand reducers create ten thousand tiny output files and crush the file-system master. Size R to the data, not to a default.
- Non-deterministic map or reduce. Reading the wall clock, a random seed, or external mutable state breaks the re-run-on-failure guarantee — a retried task can produce a different answer, corrupting output silently.
- Reaching for MapReduce when the data fits in RAM. Cluster startup overhead dwarfs the work. If awk or a single Python script finishes in a minute, use it.
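The averaging bug from the list above is easy to reproduce. This sketch compares a combiner that emits partial averages with one that emits (sum, count) pairs; the two value lists are made-up inputs for two hypothetical map tasks.

```python
def mean_of(values):
    return sum(values) / len(values)


# Values for one key, split across two hypothetical map tasks.
task_a = [10, 20, 30, 40]
task_b = [100]

true_mean = mean_of(task_a + task_b)
print(true_mean)  # 40.0

# WRONG: the combiner emits a partial average, then reduce averages those.
wrong = mean_of([mean_of(task_a), mean_of(task_b)])
print(wrong)  # 62.5


# RIGHT: the combiner emits (sum, count); only the final reduce divides.
def combine(values):
    return (sum(values), len(values))


def reduce_mean(partials):
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


right = reduce_mean([combine(task_a), combine(task_b)])
print(right)  # 40.0
```

Adding sums and adding counts are both commutative and associative, which is why the (sum, count) form survives any number of combiner passes.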
Frequently asked questions
What are the three phases of MapReduce?
Map, shuffle, and reduce. Map applies a user function to each input record and emits intermediate key-value pairs. Shuffle is framework-owned: it sorts and groups all pairs by key and routes each group to a reducer. Reduce applies a user function to all values for one key. Only map and reduce are written by you; shuffle is the framework's job and where most of the network cost lives.
Why is the shuffle the most expensive part of MapReduce?
Map output is local, but every reducer needs all values for its keys regardless of which mapper produced them, so the shuffle moves data across the network in an all-to-all pattern. For M mappers and R reducers there are up to M×R transfers, plus an external sort on both sides. A combiner that pre-aggregates map output is the single biggest lever for cutting shuffle volume.
What is a combiner and when can you use one?
A combiner is a mini-reduce that runs on the map side before the shuffle, collapsing many local pairs into fewer. It is safe only when the reduce function is commutative and associative — sum, count, max, min work; computing a median or average from raw values does not, because partial aggregation changes the result. Word count's combiner can cut shuffle traffic by orders of magnitude.
How does MapReduce handle a machine that fails mid-job?
Because map and reduce are deterministic functions of their inputs, the framework just re-runs failed tasks elsewhere. A dead mapper's input split is re-read and re-executed; a dead reducer re-pulls its map outputs. The master also launches speculative duplicates of slow "straggler" tasks and keeps whichever finishes first. This is why MapReduce scaled to thousands of commodity machines where individual failures are routine.
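The re-run-on-failure idea can be sketched as a retry loop over workers. Everything here is simulated: `run_with_retries`, the worker functions, and the use of `RuntimeError` to model a lost machine are assumptions for illustration, not how any particular framework's master is implemented.

```python
def run_with_retries(task_fn, split, workers, max_attempts=4):
    """Re-run a deterministic task on the next worker until one succeeds.

    Returns (result, attempts_used).
    """
    for attempt in range(max_attempts):
        worker = workers[attempt % len(workers)]
        try:
            return worker(task_fn, split), attempt + 1
        except RuntimeError:
            continue  # treat as a lost machine and reschedule elsewhere
    raise RuntimeError("task failed on every attempt")


def healthy_worker(task_fn, split):
    return task_fn(split)


def dead_worker(task_fn, split):
    raise RuntimeError("machine lost")


def count_words(split):
    """A pure task: the same split always yields the same answer."""
    return len(split.split())


result, attempts = run_with_retries(
    count_words, "the cat sat", [dead_worker, dead_worker, healthy_worker]
)
print(result, attempts)  # 3 3
```

The retry is only safe because `count_words` depends on nothing but its input split; a task that read the clock or mutable external state could return a different answer on the second run.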
Is MapReduce the same as Hadoop or Spark?
No. MapReduce is the programming model from Google's 2004 paper. Hadoop MapReduce is the open-source implementation of that model. Apache Spark is a successor engine that generalizes the model with in-memory RDDs and a DAG scheduler, often running 10–100× faster on iterative workloads by avoiding the disk write between every map and reduce.
What happens when one key has far more values than the others?
That is data skew, and it is MapReduce's classic failure mode. One reducer gets the hot key — say the word "the" in a corpus — and runs long after every other reducer has finished, so the whole job waits on it. Fixes include a combiner, a custom partitioner, or salting the hot key into N sub-keys that are reduced separately and merged.
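Salting can be sketched as two reduce stages. The hot-key set, the salt count, and the choice to derive the salt from the record's position are all assumptions for this example; the position-based salt keeps the map deterministic, so a re-run task produces the same sub-keys.

```python
from collections import defaultdict

HOT_KEYS = {"the"}  # assumed known in advance, e.g. from a sampling pass
N_SALTS = 4


def salt(key, record_index):
    """Spread a hot key over N_SALTS sub-keys; leave other keys alone."""
    if key in HOT_KEYS:
        return (key, record_index % N_SALTS)
    return (key, 0)


pairs = [("the", 1)] * 1000 + [("cat", 1)] * 10 + [("dog", 1)] * 5

# Stage 1: reduce on the salted key, so no single reducer owns all of "the".
partials = defaultdict(int)
for i, (key, value) in enumerate(pairs):
    partials[salt(key, i)] += value

# Stage 2: strip the salt and merge the partial sums per original key.
totals = defaultdict(int)
for (key, _salt), partial in partials.items():
    totals[key] += partial

print(dict(totals))
# {'the': 1000, 'cat': 10, 'dog': 5}

largest_hot_partial = max(n for (k, _s), n in partials.items() if k == "the")
print(largest_hot_partial)  # 250
```

The second stage handles at most N_SALTS partials per hot key, so it is cheap even when the first stage processed billions of values.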