hara.concurrent: methods and data structures for concurrency

Author: Chris Zheng  (z@caudate.me)
Date: 11 February 2018
Repository: https://github.com/zcaudate/hara
Version: 2.8.2

1    concurrent.latch

Add to project.clj dependencies:

[zcaudate/hara.concurrent.latch "2.8.2"]

hara.concurrent.latch supplies a simple primary/follower latch mechanism for atoms and refs: when the primary is updated, its followers are updated as well.

latch

latches two irefs together so that when `primary` changes, `follower` is updated as well.

v 2.1
(defn latch
  ([primary follower] (latch primary follower identity))
  ([primary follower f] (latch primary follower f nil))
  ([primary follower f opts]
     (watch/add primary
                (keyword (hash-label primary follower))
                (latch-fn follower f)
                opts)))

(def primary (atom 1))
(def follower (atom nil))

(latch primary follower #(* 10 %))
(swap! primary inc)

@primary => 2
@follower => 20

unlatch

removes the latch so that updates will not be propagated

v 2.1
(defn unlatch
  [primary follower]
  (watch/remove primary (keyword (hash-label primary follower))))

(def primary (atom 1))
(def follower (atom nil))

(latch primary follower)
(swap! primary inc)

@primary => 2
@follower => 2

(unlatch primary follower)
(swap! primary inc)

@primary => 3
@follower => 2
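
The latch mechanism can be approximated with the watch functions in clojure.core. The following is a minimal sketch under stated assumptions, not hara's implementation: `latch-key`, `simple-latch` and `simple-unlatch` are hypothetical names, and the follower is assumed to be an atom (a ref would need `ref-set` inside `dosync`).

```clojure
;; Sketch only: plain clojure.core watches, not hara's implementation.
;; `latch-key`, `simple-latch` and `simple-unlatch` are hypothetical names.
(defn latch-key
  "Builds a watch key that is specific to a primary/follower pair."
  [primary follower]
  (keyword (str "latch-" (System/identityHashCode primary)
                "-" (System/identityHashCode follower))))

(defn simple-latch
  "Whenever `primary` changes, resets `follower` to (f new-value)."
  ([primary follower] (simple-latch primary follower identity))
  ([primary follower f]
   (add-watch primary (latch-key primary follower)
              (fn [_key _ref _old new]
                (reset! follower (f new))))
   primary))

(defn simple-unlatch
  "Removes the watch so that later updates are not propagated."
  [primary follower]
  (remove-watch primary (latch-key primary follower)))

(def primary (atom 1))
(def follower (atom nil))

(simple-latch primary follower #(* 10 %))
(swap! primary inc)
[@primary @follower]   ;; => [2 20]

(simple-unlatch primary follower)
(swap! primary inc)
[@primary @follower]   ;; => [3 20]
```

Watches on an atom run on the thread that performed the update, so the follower has already been reset by the time `swap!` returns.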

2    concurrent.notification

Add to project.clj dependencies:

[zcaudate/hara.concurrent.notification "2.8.2"]

hara.concurrent.notification introduces a way to be notified of changes, based on this post

alter-on

a redundant function, used for testing purposes. The same as `(alter! ref f & args)`, except that the update is dispatched on another thread and the call waits on the notification scheme until the change has been made.

v 2.1
(defn alter-on
  ([obj f]
     (wait-on #(dispatch % f) obj))
  ([obj opts? f & args]
     (wait-on #(apply dispatch % opts? f args) obj)))

(def atm (atom 0))
(alter-on atm #(do (Thread/sleep 300) (inc %)))

@atm => 1

dispatch

updates the value contained within a ref or atom using another thread.

v 2.1
(defn dispatch
  ([obj f]
     (future (state/update obj f)))
  ([obj opts? f & args]
     (future (apply state/update obj opts? f args))))

@@(dispatch (atom 0) (fn [x] (Thread/sleep 200) (inc x))) => 1

notify

adds a one-shot watch to an iref and returns a promise. When the long-running function updates the iref, the promise is delivered with the updated iref.

v 2.1
(defn notify
  ([mtf rf] (notify mtf rf nil))
  ([mtf rf opts]
     (let [p  (promise)
           pk (keyword (hash-label p))]
       (watch/add rf pk
                  (fn [_ _ _ _]
                    (watch/remove rf pk opts)
                    (deliver p rf))
                  opts)
       (mtf rf)
       p)))

(def res (notify #(do (Thread/sleep 200)
                      (state/update % inc))
                 (ref 1)))

res => promise?
@res => iref?
@@res => 2

wait-on

waits for a long running multithreaded function to update the ref. Used for testing purposes

v 2.1
(defn wait-on
  ([mtf rf]
     (deref (notify mtf rf)))
  ([mtf rf opts ms ret]
     (deref (notify mtf rf opts) ms ret)))

(def atm (atom 0))
(def f (fn [obj] (dispatch obj #(do (Thread/sleep 300) (inc %)))))

(wait-on f atm)

@atm => 1
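
The notification scheme combines a one-shot watch with a promise. The following is a minimal sketch using only clojure.core, not hara's implementation; `notify-once` is a hypothetical name, and the timeout and fallback value are illustrative assumptions.

```clojure
;; Sketch only: clojure.core watches and promises, not hara's implementation.
;; `notify-once` is a hypothetical name.
(defn notify-once
  "Calls (mtf rf), which is expected to update `rf` (possibly on another
   thread), and returns a promise delivered with `rf` after the first update."
  [mtf rf]
  (let [p  (promise)
        pk (keyword (str "notify-" (System/identityHashCode p)))]
    (add-watch rf pk
               (fn [_key _ref _old _new]
                 (remove-watch rf pk)   ; one-shot: stop watching
                 (deliver p rf)))
    (mtf rf)
    p))

(def atm (atom 0))

;; Update the atom on another thread after a short delay.
(def res (notify-once (fn [a] (future (Thread/sleep 100) (swap! a inc))) atm))

;; Block for at most one second; :timeout is returned if nothing changes.
(def result (deref res 1000 :timeout))

(identical? result atm)   ;; => true
@atm                      ;; => 1
```

Dereferencing the promise with a timeout corresponds to the five-argument form of `wait-on`, which passes `ms` and `ret` through to `deref`.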

3    concurrent.pipe

Add to project.clj dependencies:

[zcaudate/hara.concurrent.pipe "2.8.2"]

hara.concurrent.pipe provides a simple asynchronous pipe that can be sent tasks that are queued until previous tasks are complete

pipe

creates a pipe so that tasks are acted upon asynchronously, in the order in which they were sent

v 2.2
(defn pipe
  [handler]
  (let [queue  (atom (common/queue))
        thread (atom nil)
        _      (add-handler queue handler thread)]
    (map->Pipe {:handler handler
                :queue   queue
                :thread  thread})))

(pipe (fn [msg] (println msg)))
=> #(instance? hara.concurrent.pipe.Pipe %)

send

sends a task to the pipe for its handler to act upon

v 2.2
(defn send
  [pipe task]
  (swap! (:queue pipe) conj task))

(def atm (atom []))
(def p (pipe (fn [msg] (swap! atm conj msg))))

(do (pipe/send p 1)
    (pipe/send p 2)
    (pipe/send p 3)
    (Thread/sleep 100)
    @atm)
=> [1 2 3]
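
Ordered, asynchronous handling of tasks can also be sketched with a clojure.core agent, which runs the actions sent to it one at a time. This is a swapped-in technique for illustration and not how hara.concurrent.pipe is implemented (the source above uses a queue held in an atom); `agent-pipe` and `pipe-send` are hypothetical names.

```clojure
;; Sketch only: a clojure.core agent stands in for the queue and worker
;; thread; this is not hara's implementation. `agent-pipe` and `pipe-send`
;; are hypothetical names.
(defn agent-pipe
  "Returns a pipe that runs `handler` on each task, one at a time."
  [handler]
  {:handler handler
   :worker  (agent nil)})

(defn pipe-send
  "Queues `task`; it runs after all previously sent tasks have finished."
  [{:keys [handler worker]} task]
  (send-off worker (fn [_] (handler task) nil)))

(def atm (atom []))
(def p (agent-pipe (fn [msg] (swap! atm conj msg))))

(pipe-send p 1)
(pipe-send p 2)
(pipe-send p 3)

(await (:worker p))   ; block until the tasks sent so far have run
@atm                  ;; => [1 2 3]
```

Using `await` instead of `Thread/sleep` makes the check deterministic, since it returns only after the actions sent from the calling thread have completed.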

4    concurrent.propagate

Add to project.clj dependencies:

[zcaudate/hara.concurrent.propagate "2.8.2"]

hara.concurrent.propagate is an implementation of the concept of propagators, introduced by Sussman

cell

creates a propagation cell

v 2.1
(defn cell
  ([] (cell nothing))
  ([content] (cell content {}) )
  ([content opts]
     (Cell. (-> opts
                (assoc :content content)

(def cell-a (cell))
@cell-a => :hara.concurrent.propagate/nothing

(def cell-b (cell "Hello"))
@cell-b => "Hello"

(cell-b "World")    ;; invoking sets the state of the cell
@cell-b => "World"

link

creates a propagation link between a set of input cells and an output cell

v 2.1
(defn link
  ([sources sink] (link sources sink straight-through))
  ([sources sink tf] (link sources sink tf {}))
  ([sources sink tf {:keys [label tdamp concurrent] :as options}]
     (let [pg (propagator label (assoc options
                                  :tf tf
                                  :in-cells sources
                                  :out-cell sink))]
       (doseq [s sources]
         (register-propagator s pg))
       pg)))

(def in-a  (cell 1))
(def in-b  (cell 2))
(def inter (cell))
(def in-c  (cell 3))
(def out   (cell))

(link [in-a in-b] inter +)
(link [inter in-c] out +)

(in-a 10)
@inter => 12
@out => 15

(in-b 100)
@inter => 110
@out => 113

(in-c 1000)
@inter => 110
@out => 1110

unlink

removes the propagation link between a set of cells

v 2.1
(defn unlink
  [pg]
  (let [sources (:in-cells pg)]
    (doseq [s sources]
      (deregister-propagator s pg))
    (state/set pg :in-cells [])
    (state/set pg :out-cell nil)))

(def in-a (cell 1))
(def out (cell))
(def lk (link [in-a] out))

(in-a 10)
@out => 10

(unlink lk)
(in-a 100)

@in-a => 100
@out => 10
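
The way a change flows through linked cells can be sketched with plain atoms and watches. This is a minimal illustration using only clojure.core, not hara's implementation: atoms stand in for cells, and `simple-link` and `simple-unlink` are hypothetical names.

```clojure
;; Sketch only: atoms and watches from clojure.core, not hara's implementation.
;; `simple-link` and `simple-unlink` are hypothetical names.
(defn simple-link
  "Recomputes `sink` as (apply tf source-values) whenever any source is
   updated. Returns the watch key so that the link can be removed later."
  [sources sink tf]
  (let [k (keyword (gensym "link-"))]
    (doseq [s sources]
      (add-watch s k (fn [_key _ref _old _new]
                       (reset! sink (apply tf (map deref sources))))))
    k))

(defn simple-unlink
  "Removes the watch identified by `k` from every source."
  [sources k]
  (doseq [s sources]
    (remove-watch s k)))

(def in-a  (atom 1))
(def in-b  (atom 2))
(def inter (atom nil))
(def in-c  (atom 3))
(def out   (atom nil))

(def lk (simple-link [in-a in-b] inter +))
(simple-link [inter in-c] out +)

(reset! in-a 10)
[@inter @out]   ;; => [12 15]

(simple-unlink [in-a in-b] lk)
(reset! in-a 100)
[@inter @out]   ;; => [12 15]
```

Because `inter` is both a sink and a source, updating `in-a` first recomputes `inter` and then, through the second link, `out`. Once the first link is removed, further changes to `in-a` no longer reach either cell.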