Embedded MapReduce with Flambo
I recently stumbled across a neat library called Flambo. Flambo is a Clojure wrapper for Spark, and it does a really great job of keeping everything nice and Clojure-y. I wanted to show it to you so you can enjoy it too!
Let’s look at an example. A word-count example! We have a huge file and we want to count word frequencies. I used the news.2010.en.shuffled file from the training-monolingual.tgz archive available from https://code.google.com/p/1-billion-word-language-modeling-benchmark/
We’ll share some utility functions between our non-flambo version and the flambo one (these assume clojure.string :as s, clojure.java.io :as io, and clojure.data.csv :as csv in the namespace requires):

(defn lower-case [w]
  (.toLowerCase w))

(defn split-line [line]
  (filter (comp not empty?)
          (map lower-case (s/split line #"[^a-zA-Z]+"))))

(defn write-result [filename wordmap]
  (with-open [writer (io/writer filename)]
    (doseq [[word cnt] wordmap]
      (csv/write-csv writer [[word cnt]]))))
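A quick REPL check shows what split-line feeds to both versions of the word count:

(split-line "The QUICK brown fox, the lazy dog.")
;; => ("the" "quick" "brown" "fox" "the" "lazy" "dog")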
We want to be sensitive to memory usage, so first, let’s see how the reducers version looks:
;; With reducers (requires clojure.core.reducers :as r)
; A combiner for use with fold.
(defn combine-fn
  ([] {})
  ([m1 m2] (merge-with + m1 m2)))

(defn reducer-word-count [filename]
  (->> (io/reader filename)
       (line-seq)              ; Read the lines lazily
       (r/mapcat split-line)   ; Get a stream of words
       (r/fold                 ; Count them up
        combine-fn
        (fn [results word]
          (update-in results [word] (fnil inc 0))))
       (write-result "output-reducers.csv")))
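The combiner just merges partial count maps, summing the counts for any word that shows up in both. For example, with two made-up partial results:

(combine-fn {"the" 3, "fox" 1} {"the" 2, "dog" 1})
;; => {"the" 5, "fox" 1, "dog" 1}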
I think we did pretty well here. line-seq with io/reader is lazy, so it shouldn’t try to read the whole thing at once. Using the reducers version of mapcat won’t trigger a full evaluation either, and fold can parallelize the reduction (although I/O is probably the bottleneck anyway).

This implementation works great on smaller files, but in my tests it hangs and then crashes on the 2 GB text file. So obviously we need to do something else.
Enter flambo:
;; With flambo (requires flambo.api :as f and flambo.conf :as conf)
(def c (-> (conf/spark-conf)
           (conf/master "local[8]")   ; run locally with 8 worker threads
           (conf/app-name "flambo_test")))

(def sc (f/spark-context c))

(defn flambo-word-count [filename]
  (-> (f/text-file sc filename)
      (f/flat-map (f/fn [line] (split-line line)))
      (f/map (f/fn [word] [word 1]))
      (f/count-by-key)
      (->> (write-result "output-flambo.csv"))))
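Kicking it off is a single call (the path being wherever you put the downloaded file); count-by-key hands back a map of word to count, so write-result works unchanged:

(flambo-word-count "news.2010.en.shuffled")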
On a dataset of 2000 lines, reducers took 400 ms while flambo took about 1.5 s. On 100,000 lines, both took around 8 seconds. For the full dataset of 17.6 million lines, flambo took around 900 seconds to do the whole thing, which is a lot better than crashing.
So hey, that’s pretty great! No need to set up a Spark or Hadoop cluster to get your word count rolling, or to start testing. The syntax compares very favorably to Scala (disclaimer: I did not run or test this):
def flamboWordCount(filename: String): Unit = {
  val result = sc.textFile(filename)
    .flatMap((line) => splitLine(line))
    .map((word) => Tuple2(word, 1))
    .countByKey()
  writeResult("output-flambo.csv", result)
}
But with flambo you can even use a REPL and live-code your map-reduce job (probably with some smaller sample data). That’s a lot more fun than compiling a job and submitting it to Spark between each round of debugging, isn’t it?
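For instance, you can point the same pipeline at a tiny in-memory RDD right from the REPL (a sketch, assuming f/parallelize to build the RDD from a local collection):

;; Build a two-line RDD in memory and run the same transformations on it.
(-> (f/parallelize sc ["the quick brown fox" "the lazy dog"])
    (f/flat-map (f/fn [line] (split-line line)))
    (f/map (f/fn [word] [word 1]))
    (f/count-by-key))
;; => counts like {"the" 2, "quick" 1, "brown" 1, ...}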
Of course, Flambo offers the ability to submit jobs to an existing Spark cluster too. Just use lein uberjar to create a jar and submit it to Spark as usual (you’ll probably want to gen-class for this).
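A minimal sketch of what that might look like (the namespace name here is hypothetical, and you’d point :main and :aot at it in project.clj):

(ns wordcount.core
  (:gen-class)
  (:require [flambo.api :as f]
            [flambo.conf :as conf]))

;; Assumes flambo-word-count from above lives in (or is required into) this namespace.
(defn -main [& args]
  (flambo-word-count (first args)))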
If you like Clojure, and you need to do some large-scale data processing, you should definitely take Flambo for a spin.