February 18, 2018

One-box stream processing with CSP

A simple streaming data architecture using clojure.core.async

If you’re like me (that is, employed by an ad tech company), you probably associate stream processing with frameworks like Storm, Flink, Spark Streaming, and other such solutions. However, a lot of real-life software can be described as stream processing – data comes in one end, is transformed or aggregated, and goes somewhere else. Many of these workloads don’t justify the overhead of a stream processor, but that doesn’t mean they can’t benefit from some of the lessons of stream processing systems.

In this article I’ll describe in detail a small-scale stream-processing architecture for Clojure, built around the core.async and component libraries. It lets us build modular components connected entirely by asynchronous queues, which makes for very flexible systems. The lessons herein can also be applied broadly to coroutine systems in other languages, such as Go and Kotlin.

Motivation

Existing stream-processing architectures concern themselves extensively with problems that arise from distributed computing: ensuring that datapoints are processed at least/exactly once, and generally handling the distribution of datapoints to nodes and to processes within them. Most such systems promote a similar architecture, wherein you are responsible for defining components that transform incoming datapoints, matching the system’s API for incoming and outgoing data, and for assembling those components into your final system.

The basic principles of designing a system around stream-based CSP are quite similar: you have various processes connected to each other by channels, which act as queues. Even if we’re not writing a distributed system, approaching a streaming-data system from this angle can provide some benefits. Specifically, we will be forced to develop reusable components which are loosely coupled (in this case, via core.async channels).

Later, if we want to make design changes – a simple example would be adding persistent queues between components – we can completely reuse the existing components, and need only adapt the external service to a core.async channel.

CSP with core.async

Clojure’s core.async is inspired by Go’s Communicating Sequential Processes (CSP) features. It allows you to structure your program as a series of asynchronous state machines, which can logically block waiting for each other without holding a thread while they do it. This is called parking, and it can help simplify highly-concurrent workloads substantially. I’ll be quickly reviewing Clojure’s implementation here, but if you’re not familiar with the subject, Brave Clojure has a great primer you should read.

The basic unit of a core.async workflow is a channel. A channel behaves like a queue – items can be put to it or taken from it, and they retain the order in which they were placed on the queue. Outside of a go block, you can asynchronously (with a callback) put! to or take! from a channel, or you can synchronously (blocking a thread) put (>!!) or take (<!!) from it:

(require '[clojure.core.async :as a])
(def my-chan (a/chan))

; Asynchronous take
(a/take! my-chan prn)

; Blocking put (the pending take's callback prints :test)
(a/>!! my-chan :test)

To use the “parking” feature, you can use the go macro, wherein the parking put >! and take <! can be used:

; Parking take
(a/go (prn (a/<! my-chan)))

; Parking put
(a/go (a/>! my-chan :test2))

The go macro will, effectively, rewrite its contents into an event-driven state machine, allowing channel operations to wait for responses without holding up a thread. A single machine can easily handle thousands (or more) of parking operations (and therefore go blocks) simultaneously. The contents of a go block are sometimes said to be run in an “Inversion of Control” (IOC) thread.
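As a quick sketch of just how cheap parked go blocks are (an illustrative example of mine, not from any library API – chans and done are throwaway names), the snippet below parks ten thousand go blocks, one per channel, and then releases them all without creating any extra threads:

(def chans (repeatedly 10000 a/chan))
(def done (atom 0))

; Park one go block on each channel; none of them holds a thread while parked
(doseq [c chans]
  (a/go
    (a/<! c)
    (swap! done inc)))

; Release them all; shortly afterwards, @done => 10000
(doseq [c chans]
  (a/close! c))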

Channels can be closed; takes on closed channels will return the contents of the buffer until it is empty and nil thereafter (NB that nils cannot be placed on a channel on purpose, and therefore always signify a closed channel). Puts to closed channels will complete immediately and cause >!! and friends to return false, signifying that the put failed.
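Here is that behaviour end to end (a throwaway REPL sketch; closed-chan is just a name I made up):

(def closed-chan (a/chan 2))
(a/>!! closed-chan :a)
(a/>!! closed-chan :b)
(a/close! closed-chan)

(a/>!! closed-chan :c) ; => false, the put failed
(a/<!! closed-chan)    ; => :a, buffered values still drain
(a/<!! closed-chan)    ; => :b
(a/<!! closed-chan)    ; => nil, the channel is closed and empty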

Buffered channels

By default, channels are unbuffered, and attempts to put to a channel will block until some other process is ready to take from it. This is an extremely useful property, since it allows us to build in back-pressure. However, sometimes it can be useful to have channels store messages – this allows processes putting to those channels to proceed without waiting for the message to be taken.

A fixed buffer of size n will allow n items to be put to the channel and stored. If the buffer is full, puts will block. If you don’t mind losing some messages and want to make sure your source process can continue unimpeded, you have your choice of sliding or dropping buffers.

A sliding buffer, when full, will remove the least recent message (i.e. the next message that would be taken from the channel) to make room for the new one.

A dropping buffer will accept puts when full, but discard the newly-put message.

Here’s how to create channels with different buffer types:

(def fix-chan (a/chan (a/fixed-buffer 2)))
(def fix-chan2 (a/chan 2)) ; Creates a fixed buffer by default
(def drop-chan (a/chan (a/dropping-buffer 2)))
(def slide-chan (a/chan (a/sliding-buffer 2)))
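To see the difference between the last two (a small REPL sketch using the channels defined above), put three values into each two-slot buffer and look at what survives:

(doseq [c [drop-chan slide-chan]
        v [:a :b :c]]
  (a/put! c v)) ; none of these puts blocks

(a/<!! drop-chan)  ; => :a  (the newest value, :c, was dropped)
(a/<!! drop-chan)  ; => :b
(a/<!! slide-chan) ; => :b  (the oldest value, :a, slid off)
(a/<!! slide-chan) ; => :c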

Transducers

Transducers are a utility devised to generalize collection operations. There is a lot to know about transducers, but for our purposes, what you need to know is that channels can be created with attached transducers, which means that their content will be transformed automatically as it passes through the channel:

(def xform (map clojure.string/upper-case))
(def scream-chan (a/chan 1 xform))

(a/put! scream-chan "hello")
(a/<!! scream-chan) ; "HELLO"

Since transducers can be applied generally, to collections as well as channels (try (eduction xform ["hello"])), they are easy to write and test.
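For instance, the same xform from above can be exercised without any channels at all:

(into [] xform ["hello" "world"]) ; => ["HELLO" "WORLD"]
(eduction xform ["hello"])        ; => ("HELLO")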

Splitting and combining channels

It’s straightforward to pipe one channel to another:

(a/pipe in-chan out-chan)

By default, pipe will automatically close out-chan when in-chan is closed. See also pipeline, which does the same thing but can apply a transducer and add parallelism.
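As a sketch (assuming in-chan carries strings; upper-chan is a name I made up), a pipeline that upper-cases values on four threads looks like this:

(def upper-chan (a/chan))
(a/pipeline 4 upper-chan (map clojure.string/upper-case) in-chan)
; upper-chan receives the transformed values and closes when in-chan closes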

Many-to-one channels are easy enough; just put to a single channel from more than one source (or use merge).
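Here’s a tiny merge sketch (the channel names are mine):

(def a-chan (a/chan))
(def b-chan (a/chan))
(def merged (a/merge [a-chan b-chan]))

(a/put! a-chan :from-a)
(a/put! b-chan :from-b)
(a/<!! merged) ; :from-a or :from-b, in whichever order they arrive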

Sometimes, however, you will have more than one process consuming from the same input stream. In this case, you can use core.async’s built-in mult/tap functionality.

You can use mult to create a mult – that is, a quasi-channel that can be consumed by more than one consumer. Every mult has a single backing channel; writes to that channel will be distributed synchronously to all taps. When using mult, remember that no other process should consume from the backing channel – otherwise you’ll have a read race, and messages taken directly from the backing channel will never reach the taps.

(def c (a/chan))
(def c-mult (a/mult c))

To add a consumer to a mult, use tap:

; tap returns the channel, so you can do this:
(def out-c (a/tap c-mult (a/chan)))

(a/put! c "hello") ; puts go to the backing channel, not the mult
(a/<!! out-c) ; "hello"

Other solutions for multiple-channel designs are pub and mix; you can read more about these in the docs.
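As a taste of pub (a minimal sketch; the event shape and channel names are mine), you can route messages by a topic function such as a keyword:

(def events (a/chan))
(def events-pub (a/pub events :type))

; Subscribers only see messages whose topic matches
(def clicks (a/sub events-pub :click (a/chan)))

(a/put! events {:type :click :x 1 :y 2})
(a/<!! clicks) ; => {:type :click, :x 1, :y 2}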

Designing an application

The basic principles of designing a system around stream-based CSP are quite simple: you have various processes connected to each other by channels, which act as basic queues.

How work is divided between the processes can be important; as a rule, remember that go blocks are cheap but not free.

Sources and sinks

Unless you have extremely good luck, the entry and exit points of your pipeline will not be native core.async channels, and so you’ll have to write your own adapters. The following function will open a file and put its contents, line by line, onto a channel that it returns (a matching sink is sketched after the notes below).

(require '[clojure.java.io :as io])

(defn line-chan [filename]
  (let [out-chan (a/chan)] ; 1
    (a/thread ; 2
      (with-open [f (io/reader filename)]
        (try
          (doseq [line (line-seq f)]
            (a/>!! out-chan line)) ; 3
          (finally ; 4
            (a/close! out-chan)))))
    out-chan)) ; 5
  1. Note the use of an unbuffered channel. This will cause the later >!! operation to block until each line is taken from it. A fixed buffer may be used if you like; however, a sliding or dropping buffer would just immediately consume the file and drop most of the records.

  2. Here, thread is used instead of go. All go blocks in your program (or at least those in a single running Java process) share one small thread pool. For this reason, it’s usually advisable to use thread rather than go for blocking i/o, so you don’t tie up one of go’s pool threads on non-parking operations.

  3. Since we’re not inside of a go block, we must use >!! instead of >!.

  4. Failing to close a channel won’t leak a thread or memory by itself, but closing it here is how downstream consumers (written with when-let, below) learn that the stream has ended – so endeavour to do it.

  5. Remember to return the output channel!
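The exit point of a pipeline needs the same treatment. Here’s a matching sink, sketched under the same assumptions (the name chan->file and the line-per-value format are mine): it consumes a channel until it closes and writes each value out as a line.

(defn chan->file [in-chan filename]
  (a/thread ; again, a real thread rather than a go block, since this is blocking i/o
    (with-open [w (io/writer filename)]
      (loop []
        (when-let [line (a/<!! in-chan)]
          (.write w (str line "\n"))
          (recur))))))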

Intermediate processes

Most of your intermediate processes will use go-loop to consume indefinitely from a channel. I’ve found that this basic pattern is generally useful:

(a/go-loop []
  (when-let [v (a/<! in-chan)] ; 1
    (do-something v) ; 2
    (recur))) ; 3
  1. The use of when-let ensures that the loop will terminate if in-chan is closed.

  2. Often, do-something will involve putting to another channel.

  3. Don’t forget to keep looping!

Mapping and Filtering

Writing components that map or filter values is straightforward, using the above pattern:


(defn mapc [f in-chan out-chan]
  (a/go-loop []
    (when-let [v (a/<! in-chan)]
      (a/>! out-chan (f v))
      (recur))))

(defn filterc [pred in-chan out-chan]
  (a/go-loop []
    (when-let [v (a/<! in-chan)]
      (when (pred v)
        (a/>! out-chan v))
      (recur))))

However, you should generally prefer to express mapping and filtering operations as transducers for the sake of resource use and brevity. If necessary, you can use pipe to help with this:

(defn mapc [f in-chan]
  (a/pipe in-chan (a/chan 1 (map f))))

; Or, for free parallelization, use pipeline (here with a thread pool of size 4)
(defn mapc-fast [f in-chan]
  (let [out-chan (a/chan)]
    (a/pipeline 4 out-chan (map f) in-chan)
    out-chan))
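The filtering version can be reworked the same way (a small sketch mirroring the mapc rework above):

(defn filterc [pred in-chan]
  (a/pipe in-chan (a/chan 1 (filter pred))))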

Aggregation

Aggregations are necessarily stateful, so we’ll need a way to store that state. An easy way to do this is to update that state via go-loop:

(defn count-occurrences
  "Given a channel that streams words, emit a map of word to count over
   a given window size"
  [window-size word-chan]

  (let [out-chan (a/chan)]
    (a/go-loop [ii 0 counts {}]
      (if (>= ii window-size)
        (do
          (a/>! out-chan counts)
          (recur 0 {}))
        (if-let [word (a/<! word-chan)]
          (recur (inc ii) (update counts word (fnil inc 0)))
          (a/close! out-chan))))
     out-chan))
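A quick REPL check of the aggregator (an illustrative sketch; the words are made up): feed it six words with a window of three and take the two emitted maps.

(def words (a/chan))
(def counts (count-occurrences 3 words))

; onto-chan puts each word on the channel, then closes it
(a/onto-chan words ["the" "whale" "the" "sea" "the" "whale"])

(a/<!! counts) ; => {"the" 2, "whale" 1}
(a/<!! counts) ; => {"sea" 1, "the" 1, "whale" 1}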

Components

To manage all those channels and processes, I’ve found it helpful to use Stuart Sierra’s Component library. Components describe long-running processes with resources that must be opened and closed, which accurately describes typical go loops and channels. Actually, I’ve found it helpful to use Component to structure most of the Clojure software I write; the restrictions imposed by this design tend to guide me towards writing more general components and less interconnected, less coupled systems.

Components are plain Clojure records that implement Lifecycle. Use the record fields to store configuration, references to provided components, and resources that must be opened, exposed, and closed.

In this case, our components will consist of a channel plus a process. The components will store channel references, and other components that depend on them can take from those channels and use them in their own processing.

Here’s an example source component:

(require '[com.stuartsierra.component :as component])

(defrecord LineStream [conf out-chan]
  component/Lifecycle
  ; record fields (conf, out-chan) are directly in scope in the method bodies
  (start [this]
    (assoc this :out-chan (line-chan (:filename conf))))

  (stop [this]
    (a/close! out-chan)
    (assoc this :out-chan nil)))

Note the reuse of the line-chan function from earlier. This is actually the only component that requires explicit closing; consumers are written such that their channels will close when the input channel is closed.

Also, if this is starting to smell like an overengineered word count example, you’re completely right. Here are the other components (one built on a transducer pipeline, one on the aggregation go-loop from above):

(defrecord WordStream [conf line-stream out-mult]
  component/Lifecycle
  (start [this]
    (let [xform (comp (map clojure.string/lower-case)
                      (mapcat #(clojure.string/split % #"\s+")))
          out-chan (a/chan)]
      (a/pipeline 4 out-chan xform (:out-chan line-stream))

      ;; Use a mult to support multiple consumers
      (assoc this :out-mult (a/mult out-chan))))

  ; No need to close the chan, since a/pipeline will close it
  (stop [this] (assoc this :out-mult nil)))


(defrecord WordCount [conf word-stream out-chan]
  component/Lifecycle
  (start [this]
    (let [{:keys [window-size buf-or-n] :or {buf-or-n 10}} conf
          in-chan (a/tap (:out-mult word-stream) (a/chan buf-or-n))]
      (assoc this :out-chan (count-occurrences window-size in-chan))))

  (stop [this] (assoc this :out-chan nil)))

Defining a system

Once defined, these components can be assembled into a system that can be started and stopped as a whole:

(defn mk-system [filename]
  (component/system-map 
    :line-stream (->LineStream {:filename filename} nil)
    :word-stream (component/using (->WordStream {} nil nil)
                                  [:line-stream])
    :word-count (component/using (->WordCount {:window-size 10000} nil nil)
                                 [:word-stream])))

For debugging, the started system map can be stored in a var, and the word-count output channel can be read to get the results.

(defn run []
  (component/start (mk-system "mobydick.txt")))

(comment
  ; Repl-only stuff goes here

  ; Start the system
  (def system (run))

  ; Check that words are being correctly produced
  (a/<!! (-> system :word-stream :out-mult (a/tap (a/promise-chan))))

  ; Get counts
  (a/<!! (-> system :word-count :out-chan))

  ; Stop the system
  (component/stop system)
  )

I have several applications presently running on this general architecture, and I’ve been surprised at just how rock-solid they’ve been.

I think my favorite thing about this design is the ease with which it can be remixed, refactored, and repurposed. Components can be changed and rewritten without requiring major changes to other parts of your program.

Since core.async channels are such a general tool, it’s also easy to adapt the channels connecting the components to add additional capabilities. To name one example: Kinsky is a Kafka library that provides a core.async channel interface; you barely have to change a thing to add a persistent, distributed, ultra-high-throughput message log to your system.

That’s it! I hope you learned something!