mina86.com

  Map-reduce explained

    Posted by Michał ‘mina86’ Nazarewicz on 18th of May 2014

    Outside of the functional programming context, map-reduce refers to a technique for processing data. Because of the properties of the map and reduce operations, computations which can be expressed using them can be highly parallelised, which allows for faster processing of large volumes of data.

    If you’ve ever wondered how tools such as Apache Hadoop work, you’re on the right page. In this article I’ll explain what map and reduce are and later also introduce a shuffle phase.

    Even though Python is a terrible language [pl] which misuses terms from the functional programming world, I’ll use it (Python 2, to be precise) to demonstrate the concepts. Lisp or Haskell would arguably be better, but people who know those languages are likely familiar with map-reduce anyway.

    By the way, all code and data used in this article can be found in the map-reduce-explained git repository.

    Map and reduce functions

    map(function, sequence) → list
    
    reduce(function, sequence, initial_state) → final_state
        # function(state, element) → state

    map applies function to each element of a sequence and returns a list containing the results of those applications. reduce allows a sequence to be reduced (hence the name) into a single value: function is called for each element of the sequence with the current state as its first argument and the element as its second. It returns a new state, which is passed to the next invocation of function. The first invocation receives initial_state, and the state returned by the last one is the result of reduce. Here are some simple examples:

    >>> # Square each element in a list of numbers
    ... map(lambda x: x*x, [1, 2, 3, 4])
    [1, 4, 9, 16]
    
    >>> # Sum elements of a list
    ... reduce(lambda state, el: state + el, [1, 4, 9, 16], 0)
    30
    
    >>> # Sum squares of elements of a list
    ... reduce(lambda state, el: state + el,
    ...        map(lambda x: x*x, [1, 2, 3, 4]),
    ...        0)
    30
    
    >>> # Calculating values of a polynomial using Horner's method
    ... def poly(coefficients, x):
    ...     # coefficients are [a_0, a_1, ..., a_n]; go from a_n to a_0
    ...     coefficients = reversed(coefficients)
    ...     return reduce(lambda state, a: state * x + a,
    ...                   coefficients, 0)
    ...
    >>> poly([1, -2, 3], 2)
    9
    >>> x = 2
    >>> ((3 * x) - 2) * x + 1
    9
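    Under the hood, reduce is nothing more than a loop which threads the state through successive calls of function. The sketch below spells this out with a hypothetical my_reduce helper and checks it against the library version. Unlike the rest of the article, it imports reduce from functools so that it also runs on Python 3, where reduce is no longer a built-in.

```python
from functools import reduce


def my_reduce(function, sequence, initial_state):
    """Hypothetical helper mirroring reduce(function, sequence, initial_state)."""
    state = initial_state
    for element in sequence:
        # Each call receives the state returned by the previous one.
        state = function(state, element)
    # For an empty sequence the initial state is returned unchanged.
    return state


def add(state, el):
    return state + el


squares = list(map(lambda x: x * x, [1, 2, 3, 4]))
print(my_reduce(add, squares, 0))                              # 30
print(my_reduce(add, squares, 0) == reduce(add, squares, 0))   # True
```

    Because the state is passed explicitly from one call to the next, every step has to wait for the previous one; this is what makes a single reduce inherently sequential.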

    Analysing logs

    Let’s use this approach to find out how many words each user of the #slackware.pl FreeNode IRC channel (formerly #forum.slackware.pl at QuakeNet) has said over the years. The map stage will take a line from the logs and return the user name and the number of words in that line:

    import sys, re
    
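    def getWords(line):
        # Chat lines look like "[12:34] <nick> message"; anything else
        # is attributed to a dummy '_' user with zero words.
        m = re.search(r'^\[\d\d:\d\d\] <([^>]*)> (.*)$', line)
        if not m:
            return '_', 0
        # number of whitespace-separated words in the message
        return m.group(1), len(re.split(r'\s+', m.group(2).strip()))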
    
    for nick, count in map(getWords, sys.stdin):
        print nick, count

    This script can be used to analyse all the logs by feeding them through it:

    cat temp/logs/* | python code/calc-words.py
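    To see what the map step emits, here’s getWords applied to a few made-up log lines. The lines are hypothetical and their format is inferred from the regular expression; the loop uses Python 3’s print function rather than the script’s Python 2 print statement.

```python
import re


def getWords(line):
    # Same logic as calc-words.py: lines which do not look like
    # "[HH:MM] <nick> message" are attributed to the dummy nick '_'.
    m = re.search(r'^\[\d\d:\d\d\] <([^>]*)> (.*)$', line)
    if not m:
        return '_', 0
    return m.group(1), len(re.split(r'\s+', m.group(2).strip()))


# Hypothetical log lines.
sample = [
    '[12:01] <alice> hello everyone\n',
    '[12:02] <bob> hi   alice, how are you?\n',
    '[12:03] * carol has joined\n',
]

for nick, count in map(getWords, sample):
    print(nick, count)
```

    This prints alice 2, bob 5 and _ 0: runs of whitespace count as a single separator, and the third line does not match the pattern at all.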

    On a multi-core machine this is a bit wasteful, though, since only one core does the work. Fortunately, the result of the getWords function depends on a single line only, so lines can be processed independently of one another. In the most extreme case we could spawn a process for each line of the logs, but because of process creation and synchronisation overhead such an approach would turn out to be slower.

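    The way to go is to shard the whole log into smaller chunks and map the entries in each of them separately. Conveniently, the data is already split into per-day files, so parallelising the process is as simple as running a process for each file. The commands below use GNU parallel, which replaces {} with the input file name and {#} with the job’s sequence number, so every job writes to its own output file: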

    mkdir -p temp/words
    parallel "$@" --eta '
    	< {} python code/calc-words.py >temp/words/{#}' \
    	::: temp/logs/*
    cat temp/words/*
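    Sharding works because map never looks beyond a single element: mapping each chunk separately and concatenating the results in chunk order yields the same list as mapping the whole input in one go. A minimal sketch of that property, with a hypothetical shard helper and a simple count_words function standing in for getWords:

```python
from itertools import chain


def count_words(line):
    # Hypothetical stand-in for getWords: just the number of words.
    return len(line.split())


def shard(items, n):
    """Hypothetical helper: split items into n contiguous, near-equal chunks."""
    size, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        # The first `extra` chunks get one additional item.
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


lines = ['hello there', 'hi', 'bye now', 'one two three', 'ok']

# Each chunk could be handled by a separate process; here they run in turn.
per_chunk = [list(map(count_words, chunk)) for chunk in shard(lines, 3)]
combined = list(chain.from_iterable(per_chunk))

print(combined)                                   # [2, 1, 2, 3, 1]
print(combined == list(map(count_words, lines)))  # True
```

    For the word counts even the order of the map output does not matter, since the reduce step only sums the values per nick.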

    The next step is to aggregate and calculate the sum for each user:

    import sys
    
    def aggregate(state, line):
        name, count = line.strip().split(' ')
        state[name] = state.get(name, 0) + int(count)
        return state
    
    for nick, count in reduce(aggregate, sys.stdin, {}).iteritems():
        print nick, count

    The script can be run as:

    cat temp/words/* | python code/reduce.py
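    Running the same aggregate function over a few hypothetical lines of map output shows how the dictionary state accumulates per-nick totals. As before, reduce is imported from functools so the sketch runs on Python 3 (where dict.iteritems is also gone, hence items):

```python
from functools import reduce


def aggregate(state, line):
    # Same logic as reduce.py: state maps each nick to its running total.
    name, count = line.strip().split(' ')
    state[name] = state.get(name, 0) + int(count)
    return state


# Hypothetical output of the map phase.
mapped = ['alice 2\n', 'bob 5\n', 'alice 3\n', '_ 0\n']

totals = reduce(aggregate, mapped, {})
for nick, count in sorted(totals.items()):
    print(nick, count)
```

    This prints _ 0, alice 5 and bob 5. Note that aggregate mutates the dictionary it receives and returns the very same object; that is fine here because reduce only ever holds one state at a time.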

    But again, only one CPU does the work while all the others are slacking off. This time each step depends on the result of the previous one, so the steps cannot be run independently of one another; they have to be run in sequence. Fortunately, the result for nick ‘foo’ does not depend on the results for nick ‘bar’, so there could be a separate reduce process for each nick.
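    The claim that nicks can be reduced independently is easy to check: group the map output by nick, reduce each group on its own and merge the resulting dictionaries. The data below is hypothetical; the grouping loop is an in-memory stand-in for what the shuffle phase discussed next does on disk.

```python
from collections import defaultdict
from functools import reduce


def aggregate(state, line):
    # Same logic as reduce.py.
    name, count = line.strip().split(' ')
    state[name] = state.get(name, 0) + int(count)
    return state


# Hypothetical output of the map phase.
mapped = ['alice 2', 'bob 5', 'alice 3', 'carol 1', 'bob 1']

# One sequential reduce over all the lines.
sequential = reduce(aggregate, mapped, {})

# Group the lines by nick...
groups = defaultdict(list)
for line in mapped:
    groups[line.split(' ')[0]].append(line)

# ...and reduce every group independently (each could be a separate process),
# then merge the per-nick results.
partitioned = {}
for nick_lines in groups.values():
    partitioned.update(reduce(aggregate, nick_lines, {}))

print(sorted(partitioned.items()))  # [('alice', 5), ('bob', 6), ('carol', 1)]
print(partitioned == sequential)    # True
```

    Merging with update is safe because each per-group dictionary contains only its own nick, so keys never collide.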

    The output of the map phase is not split by nick, though. This is where shuffle comes into play. It takes the output of the map phase and divides it into shards suitable for processing in the reduce phase. For processing IRC logs, shuffle reads the output of each map process and divides it into per-nick directories: every line for nick ‘foo’ ends up in <outdir>/foo/<shardname>, where shardname is unique to each shuffle process. Like so:

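    import errno, os, sys
    
    _, outdir, shardname = sys.argv
    files = {}
    
    for line in sys.stdin:
        # split() also drops the trailing newline, which would otherwise
        # be written out twice and produce empty lines
        name, count = line.split()
        if name not in files:
            dirname = os.path.join(outdir, name)
            # shuffle processes run in parallel and may race to create
            # the same directory, so tolerate it already existing
            try:
                os.makedirs(dirname)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
            files[name] = open(os.path.join(dirname, shardname), 'w')
        files[name].write('%s %s\n' % (name, count))
    
    for f in files.values():
        f.close()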

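    With that script, we can parallelise the reduce phase as well. Each directory in temp/shuffled holds all the counts for a single nick, so one reduce job is run per directory: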

    rm -rf -- temp/shuffled
    mkdir -p temp/shuffled
    parallel "$@" --eta '
    	< {} python code/shuffle.py temp/shuffled {#}' \
    	::: temp/words/*
    
    mkdir -p temp/reduced
    parallel "$@" --eta '
    	cat {}/* |
    	python code/reduce.py >temp/reduced/{#}' \
    	::: temp/shuffled/*
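    The three phases can be condensed into a single in-memory function, which may make the data flow easier to follow. This is only a sketch under simplifying assumptions: the map_reduce function and count_words mapper are hypothetical, each record is a line of the form “nick message words”, and everything fits in memory. Unlike reduce.py, the reducer receives bare values rather than “nick count” lines, so it does not need to parse anything.

```python
from collections import defaultdict
from functools import reduce


def map_reduce(mapper, reducer, initial, shards):
    """In-memory sketch of the map, shuffle and reduce phases.

    mapper(record) -> (key, value); reducer(state, value) -> state;
    initial() creates a fresh reduce state for every key.
    """
    # Map phase: every shard is processed independently.
    mapped = [[mapper(record) for record in shard] for shard in shards]

    # Shuffle phase: route every (key, value) pair to its key's bucket.
    buckets = defaultdict(list)
    for shard_output in mapped:
        for key, value in shard_output:
            buckets[key].append(value)

    # Reduce phase: every bucket is reduced independently.
    return {key: reduce(reducer, values, initial())
            for key, values in buckets.items()}


def count_words(line):
    # Hypothetical simplified log format: "nick message words...".
    nick, _, message = line.partition(' ')
    return nick, len(message.split())


shards = [['alice hello there', 'bob hi'], ['alice bye', 'bob see you']]
result = map_reduce(count_words, lambda total, n: total + n, int, shards)
print(sorted(result.items()))  # [('alice', 3), ('bob', 3)]
```

    Passing int as the initial-state factory works because int() returns 0; a mutable state such as a list would need a fresh object per key, which is why initial is a callable rather than a value.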

    Real world

    In real deployments more care is taken to make the pipeline run efficiently. For example, shards should be balanced and there should not be too many of them. In the case of the above analysis, there are several issues:

    • There are 1 668 separate files. Unless the analysis is run with hundreds of parallel jobs, the per-process overhead will be noticeable. Instead of running a process per file, the logs should be split into several groups (or shards) such that each job/worker has a few groups to work on.

    • #slackware.pl’s popularity decreased over time, so the per-day files are quite unbalanced (in addition to the weekly variance). When sharding the files, they should be grouped so that the total size of each group is similar.

    • The above two points apply to the output of the shuffle phase as well. In real deployments, shuffle should try to make the shards balanced and keep their number within the same order of magnitude as the number of workers.
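    For the second point, one possible approach (not taken from the map-reduce-explained repository) is the greedy ‘longest processing time first’ heuristic: sort files by size, largest first, and keep adding the next one to the group with the smallest total so far. A sketch using file size as the measure of work; the balance function and the example sizes are hypothetical:

```python
import heapq


def balance(sizes, groups):
    """Hypothetical helper: split items into `groups` bins of similar total size.

    `sizes` maps a name (e.g. a log file path) to its size.  Items are taken
    from largest to smallest and each goes into the currently lightest bin.
    """
    heap = [(0, i) for i in range(groups)]  # (total size so far, bin index)
    bins = [[] for _ in range(groups)]
    for name, size in sorted(sizes.items(), key=lambda kv: kv[1], reverse=True):
        total, i = heapq.heappop(heap)  # lightest bin
        bins[i].append(name)
        heapq.heappush(heap, (total + size, i))
    return bins


sizes = {'a': 70, 'b': 50, 'c': 40, 'd': 30, 'e': 10}
bins = balance(sizes, 2)
print(bins)                                         # [['a', 'd'], ['b', 'c', 'e']]
print([sum(sizes[name] for name in b) for b in bins])  # [100, 100]
```

    With real logs, sizes could be collected with something like {path: os.path.getsize(path) for path in glob.glob('temp/logs/*')}. The heuristic does not guarantee an optimal split, but it usually produces groups of similar size when there are many more files than groups.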

    I leave you, dear reader, with the homework of changing the map-reduce-explained code to address the above points. In the case of the first two, file size is a good approximation of the amount of work to be done. For the last one, the way to go is to shuffle data not by the nick but by a hash of the nick modulo 50 (or so). This would control the number of shards and hopefully distribute the data uniformly across them.
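    A sketch of the hash-based shuffle key suggested above. The shard_for helper is hypothetical; the important detail is using a hash which is stable across processes, such as zlib.crc32, because Python 3 randomises the built-in hash() of strings per process (see PYTHONHASHSEED), so parallel shuffle jobs could disagree on where a nick belongs.

```python
import zlib

NUM_SHARDS = 50  # "modulo 50 (or so)"


def shard_for(nick, num_shards=NUM_SHARDS):
    # crc32 yields the same value in every process; the mask keeps the
    # result unsigned (Python 2's zlib.crc32 may return negative numbers).
    return (zlib.crc32(nick.encode('utf-8')) & 0xffffffff) % num_shards


for nick in ['alice', 'bob', 'carol']:
    print(nick, shard_for(nick))
```

    In shuffle.py the directory name would then become str(shard_for(name)) instead of name (under Python 2, where name is already a byte string, zlib.crc32(name) can be called directly instead of encoding it first). reduce.py needs no changes, since aggregate already keeps a separate total for every nick it sees.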