December 6, 2013

The many flavors of concurrency in Clojure

A code comparison of Lamina, Pulsar, core.async, and core.reducers

Clojure, being the extensible, malleable, rewritable language that it is, is spoiled for choice when it comes to making your cores work for you. You can pick your poison when it comes to making your program concurrent.

On Github:

You can find the code at this github repository

Today I’m going to take you on a stroll through 4 different ways of parallelizing a word count. How exciting! I’ve split the text for The Old Man and the Sea into four files, and I want to count all the words in each file and print a count of the words in each file, and an overall count of the words in all files combined.

I’m going to use 4 libraries and 4 concurrency styles to do this:

  • Lamina, using straightforward threaded channels
  • Pulsar, using erlang-style actor concurrency
  • core.async, using go-style channels
  • core.reducers, using whatever you want to call it

Oh, and keep any pedantry about my using “concurrent” and “parallel” interchangeably to a single thread, if you would.

First things first, though: here are the common utilities I’ll be using so you don’t get lost:

(ns wordcount.utils
  (:require [clojure.string :as str]
            [clojure.java.io :as io]))

(def pages ["page1.txt" "page2.txt" "page3.txt" "page4.txt"])

(defn text [filename]
  (slurp (io/resource filename)))

(defn wc [text]
  (count (str/split text #"\s" )))

core.reducers

core.reducers got invited to this party because of fold. fold acts like reduce, but will automatically parallelize if you’re reducing across enough things. We’re not, but let’s see how it looks anyhow:

(ns wordcount.sequential
  (:require [wordcount.utils :refer :all]
            [clojure.core.reducers :as r]))

(defn page-count [page]
  (let [count (wc (text page))]
    (println page ": " count)
    count))

(defn main- []
  (println "Word Count:" (r/fold + (r/map page-count pages))))

This is the shortest implementation you’ll see today, and the most obvious too. I won’t go into more detail, but here’s some more about reducers if you’re into it.

When to use it

It would be ridiculous for someone to actually use any of the other implementations in this article over this one for such a simple problem.

Lamina

I’ve written about Lamina before too, but I didn’t want to leave it out. Lamina exposes three functions that we’ll use: channel, enqueue, and read-channel. The first creates a channel, the next puts something in, and the last takes something out. Simple!

(ns wordcount.lamina
  (:require [wordcount.utils :refer :all]
            [lamina.core :refer [channel enqueue read-channel]]))

(def pages-chan (channel))
(def counts-chan (channel))

(defn pages-worker []
  (loop [page @(read-channel pages-chan)]
    (enqueue counts-chan [page (wc (text page))])
    (recur @(read-channel pages-chan))))

(defn counter-worker [expected]
  (let [received (atom 0)
        word-count (atom 0)]
    (loop [[page page-count] @(read-channel counts-chan)]
      (swap! word-count + page-count)
      (swap! received inc)
      (println page ": " page-count)
      (if (< @received expected)
        (recur @(read-channel counts-chan))
        ))
    @word-count))

(defn -main []
  (future (pages-worker))
  (future (pages-worker))
  (doseq [page pages]
    (enqueue pages-chan page))
  (println "Word Count:" 
           @(future (counter-worker (count pages)))))

Here, we first spin up two workers that will consume from the same channel of pages. Each of these workers will loop forever, taking a page filename from pages-chan, reading it, counting the words, and putting the result into counts-chan.

Another worker reads from counts-chan until enough results are received, then returns the final sum. Note that this implementation will run forever, since the page workers never return. This is not necessarily a bad thing, since the workers have no side effects. Indeed, most applications using Lamina will have long-running workers like these.

When to use it

The producer-consumer model is great when your primary concern is decoupling blocking actions from your main execution flow. An example I use often is sending a notification email in a web service without blocking the request. You can also use this to smooth resource usage by queuing incoming actions for sequential processing.

Pulsar (Actor concurrency)

Actor concurrency has always been my favorite concurrency, and Pulsar is a nice library to get it in Clojure. Actually, Pulsar will happily support any of the other concurrency techniques mentioned in this article; it even has an api-compatible core.async implementation. But we have core.async for that, and nothing else for actors, so here you go:

(ns wordcount.pulsar
  (:require [wordcount.utils :refer :all]
            [co.paralleluniverse.pulsar.core :refer [defsfn]]
            [co.paralleluniverse.pulsar.actors :refer [receive ! spawn] ]))

(defsfn wc-actor []
  (receive [:page page counter]
           (let [count (wc (text page))]
             (! counter :count page count))))

(defsfn counter-actor [expected]
  (let [received (atom 0)
        word-count (atom 0)]
    (while (< @received expected)
      (receive
       [:count page count]
       (do
         (println page ": " count)
         (swap! word-count + count)
         (swap! received inc))))
    (println "Word count: " @word-count)))


(defn -main []
  (let [counter (spawn counter-actor (count pages))]
    (doseq [page pages]
      (! (spawn wc-actor) :page page counter))))

A lot like channels, but slightly different. This time, we spawn the counter first. Then, we spawn a wc-actor for each page and immediately send it a message containing the page to which it is assigned, and the counter it should report back to.

The counter keeps track of how many messages it gets, and when it’s had enough it prints the word count and stops. Easy as that.

When to use it

Pulsar’s “fibers” are lightweight and fast, and so well-suited to applications with many workers. So, feel free to spawn many actors. You might find it useful to structure your actors in a heirarchy, with managers responsible for workers.

core.async

Heavily inspired by Go’s goroutines, core.async encourages a more ad-hoc version concurrency. The theory is much the same as the lamina version:

  1. (Asynchronously) dump all the word counts into a channel
  2. (Asynchronously) read ’em back out and sum them up
;; Core.async word count
;; 1) (Asynchronously) dump all the word counts into a channel
;; 2) Read 'em back out and sum them up
;; 3) Print the result
(ns wordcount.async
  (:require [wordcount.utils :refer :all]
            [clojure.core.async :refer [chan >! <! go <!!]]))

(defn put-pages
  "Put word counts into the channel <counts>"
  [counts]
  (doseq [page pages]
    (go (let [count (wc (text page))]
          (println page ": " count)
          (>! counts count)))))

(defn count-words
  "Read word counts from the channel <counts>"
  [counts]
  (let [word-count (atom 0)]
    (doseq [_ pages]
      (<!!
       (go (swap! word-count + (<! counts)))))
    word-count))

(defn -main []
  (let [counts (chan)]
    (put-pages counts)
    (println "Word Count: " (count-words counts))))

The secret of core.async is that go blocks are nonblocking (hence the <!! to force blocking in count-words).

There’s lots of actually cool stuff you can do with core.async if you want to check that out.

When to use it

Much like Pulsar, core.async’s routines are super lightweight, and the library will manage pooling for you. Use them to create as many workers as you need for your problem.

Conclusions

That’s all I’ve got. They’re all great, so pick the one you liked best.