August 18, 2014

Embedded MapReduce with Flambo

I recently stumbled across a neat library called Flambo. Flambo is a Clojure wrapper for Spark, and it does a really great job of keeping everything nice and Clojure-y. I wanted to show it to you so you can enjoy it too!

Let’s look at an example. A word-count example! We have a huge file and we want to count word frequencies. I used the news.2010.en.shuffled file from the training-monolingual.tgz archive, available at https://code.google.com/p/1-billion-word-language-modeling-benchmark/

We’ll share some utility functions between our non-flambo version and the flambo one:

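The snippets below assume these namespace aliases (the ns name is just a placeholder for whatever your project uses; flambo.api and flambo.conf are Flambo’s own namespaces):

(ns word-count.core
  (:require [clojure.string :as s]
            [clojure.java.io :as io]
            [clojure.data.csv :as csv]
            [clojure.core.reducers :as r]
            [flambo.api :as f]
            [flambo.conf :as conf]))
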
(defn lower-case [^String w] (.toLowerCase w))

; Split a line into lower-cased words, dropping empty strings.
(defn split-line [line]
  (filter (comp not empty?)
          (map lower-case (s/split line #"[^a-zA-Z]+"))))

; Write the word/count pairs out as CSV.
(defn write-result [filename wordmap]
  (with-open [writer (io/writer filename)]
    (doseq [[word cnt] wordmap]
      (csv/write-csv writer [[word cnt]]))))

We want to be sensitive to memory usage, so first, let’s see how the reducers version looks:


;; With reducers

; A combiner for use with fold.
(defn combine-fn
  ([] {})
  ([m1 m2] (merge-with + m1 m2)))

(defn reducer-word-count [filename]
  (->> (io/reader filename)
       (line-seq)            ; Read the lines
       (r/mapcat split-line) ; Get a stream of words
       (r/fold               ; Count them up
         combine-fn
         (fn [results word]
           (update-in results [word] (fnil inc 0))))
       (write-result "output-reducers.csv")))

I think we did pretty well here. Line-seq with io/reader is lazy, and so should not try to read the whole thing at once, and reducers’ mapcat won’t trigger a full evaluation either. One caveat: fold only parallelizes foldable collections like vectors and maps, so over a lazy seq of lines it falls back to a plain serial reduce (and IO is probably the bottleneck anyway).

This implementation works great on smaller files, but in my tests it hangs and then crashes on the 2 GB text file. So obviously we need to do something else.

Enter flambo:

;; With flambo

(def c (-> (conf/spark-conf)
           (conf/master "local[8]") ; run locally with 8 worker threads
           (conf/app-name "flambo_test")))

(def sc (f/spark-context c))

(defn flambo-word-count [filename]
  (-> (f/text-file sc filename)                      ; Read the lines as an RDD
      (f/flat-map (f/fn [line] (split-line line)))   ; Get a stream of words
      (f/map (f/fn [word] [word 1]))                 ; Pair each word with 1
      (f/count-by-key)                               ; Count them up
      (->> (write-result "output-flambo.csv"))))

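Kicking it off is then just a function call (the path is wherever the downloaded file lives):

(flambo-word-count "news.2010.en.shuffled")
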
On a dataset of 2000 lines, reducers took 400ms while flambo took about 1.5s. On 100,000 lines, both took around 8 seconds. On the full dataset of 17.6 million lines, flambo took around 900 seconds to do the whole thing, which is a lot better than crashing.

So hey, that’s pretty great! No need to set up a Spark or Hadoop cluster to get your word count rolling, or to start testing. The syntax compares very favorably to Scala (disclaimer: I did not run or test this):

def flamboWordCount(filename: String): Unit = {
    val result = sc.textFile(filename)
                   .flatMap(line => splitLine(line))
                   .map(word => (word, 1))
                   .countByKey()

    writeResult("output-flambo.csv", result)
}

But, in flambo, you can even use a repl and live-code your map-reduce job (probably with some smaller sample data). That’s a lot more fun than compiling a job and submitting it to Spark between each round of debugging, isn’t it?

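For instance, assuming flambo.api/parallelize (Flambo’s wrapper around SparkContext.parallelize), you can poke at the same pipeline from the REPL with a tiny in-memory sample:

; Build a small RDD from in-memory data and run the same steps.
(-> (f/parallelize sc ["the quick brown fox" "jumps over the lazy dog"])
    (f/flat-map (f/fn [line] (split-line line)))
    (f/map (f/fn [word] [word 1]))
    (f/count-by-key))
; => a map of word counts, e.g. {"the" 2, "quick" 1, ...}
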
Of course, Flambo offers the ability to submit jobs to an existing Spark cluster too. Just use lein uberjar to create a jar and submit it to Spark as usual (you’ll probably want to gen-class for this).

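A minimal sketch of what that entry point might look like, assuming the namespace layout from above (the names are placeholders, and on a real cluster you’d want something other than the hard-coded local[8] master):

(ns word-count.main
  (:gen-class)
  (:require [word-count.core :as wc]))

(defn -main [filename & _args]
  (wc/flambo-word-count filename))

Then, after lein uberjar (with :aot set up in project.clj), something along the lines of spark-submit --class word_count.main --master <master-url> target/word-count-standalone.jar news.2010.en.shuffled should submit the job.
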
If you like Clojure, and you need to do some large-scale data processing, you should definitely take Flambo for a spin.