hara.concurrent.ova shared mutable state

Author: Chris Zheng  (z@caudate.me)
Date: 29 June 2017
Repository: https://github.com/zcaudate/hara
Version: 2.5.10

1    Introduction

An ova represents a mutable array of elements. It has been designed especially for dealing with shared mutable state in multi-threaded applications. Clojure uses refs and atoms off the shelf to resolve this issue but left out methods to deal with arrays of shared elements. ova has been specifically designed for the following use case:

  • Elements (usually clojure maps) can be added or removed from an array
  • Element data are accessible and mutated from several threads.
  • Array itself can also be mutated from several threads.

1.1    Installation

Add to project.clj dependencies:

[im.chit/hara.concurrent.ova "2.5.10"]

All functions are in the hara.concurrent.ova namespace.

(use 'hara.concurrent.ova)

1.2    Motivation

Coordination in multi-threaded applications have always been a pain. Most times situations are usally coordinated using a external store like a database or a cache. hara.concurrent.ova provides an easy to use interface for array data. The actual ova datastructure is a ref containing a vector containing ref and so it has a small footprint and is small.

2    Index

3    API

3.1    Data

Methods for setting up an ova and accessing it's data



ova ^

constructs an instance of an ova

v 2.1
(defn ova
  ([] (Ova. (ova-state)))
  ([coll]
     (let [ova (Ova. (ova-state))]
       (dosync
        (state/set ova coll))
       ova)))
link
(ova []) ;=> #ova [] (ova [1 2 3]) ;=> #ova [1 2 3] (<< (ova [{:id :1} {:id :2}])) => [{:id :1} {:id :2}]

ova? ^

checks if an object is an ova instance

v 2.4
(defn ova?
  [x]
  (instance? Ova x))
link
(ova? (ova [1 2 3])) => true

clone ^

creates an exact copy of the ova, including its watches

v 2.1
(defn clone
  [old]
  (let [ova (ova old)]
    (watch/copy ova old)
    (watch/copy ova old :ova)
    ova))
link
(def o (ova (range 10))) (watch/set o {:a (fn [_ _ _ _ _])}) (def other (clone o)) (<< other) => (<< o) (watch/list other) => (just {:a fn?})

init! ^

sets elements within an ova

v 2.1
(defn init!
  ([ova]
   (empty! ova))
  ([ova coll]
   (empty! ova)
   (state/set ova coll)
   ova))
link
(def o (ova [])) (->> (init! o [{:id :1 :val 1} {:id :2 :val 1}]) (dosync) (<<)) => [{:val 1, :id :1} {:val 1, :id :2}]

<< ^

outputs outputs the entire output of an ova

v 2.1
(defmacro <<
  [& forms]
  `(let [out# (dosync ~@forms)]
     (persistent! out#)))
link
(-> (ova [1 2 3 4 5]) (append! 6 7 8 9) (<<)) => [1 2 3 4 5 6 7 8 9] ;; can also use `persistent!` (-> (ova [1 2 3 4 5]) (persistent!)) => [1 2 3 4 5]

has? ^

checks that the ova contains elements matching a selector

v 2.1
(defn has?
  ([ova]
     (-> (select ova) empty? not))
  ([ova pchk]
      (-> (select ova pchk) empty? not)))
link
(def o (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}])) (has? o) => true (has? o 0) => true (has? o -1) => false (has? o [:id '((name) (bigint) (odd?))]) => true

select ^

grabs the selected ova entries as a set of values

v 2.1
(defn select
  ([ova] (set (selectv ova)))
  ([ova pchk]
     (set (selectv ova pchk))))
link
(def o (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}])) (select o) ;; no filters => #{{:id :1, :val 1} {:id :2, :val 1} {:id :3, :val 2} {:id :4, :val 2}} (select o 0) ;; by index => #{{:id :1 :val 1}} (select o #{1 2}) ;; by indices => #{{:id :2 :val 1} {:id :3 :val 2}} (select o #(even? (:val %))) ;; by function => #{{:id :3 :val 2} {:id :4 :val 2}} (select o [:val 1]) ;; by shorthand value => #{{:id :1 :val 1} {:id :2 :val 1}} (select o [:val even?]) ;; by shorthand function => #{{:id :3 :val 2} {:id :4 :val 2}} (select o #{[:id :1] ;; or selection [:val 2]}) => #{{:id :1 :val 1} {:id :3 :val 2} {:id :4 :val 2}} (select o [:id '((name) ;; by shorthand expression (bigint) (odd?))]) => #{{:id :1 :val 1} {:id :3 :val 2}}

selectv ^

grabs the selected ova entries as vector

v 2.1
(defn selectv
  ([ova]
      (persistent! ova))
  ([ova pchk]
    (cond (number? pchk)
          (if-let [val (suppress (get ova pchk))]
            (list val) ())

          (set? pchk) (mapcat #(selectv ova %) pchk)

          :else (filter
                 (fn [obj] (check?-> obj pchk obj))
                 ova))))
link
(def o (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}])) (selectv o) ;; no filters => [{:id :1, :val 1} {:id :2, :val 1} {:id :3, :val 2} {:id :4, :val 2}] (selectv o 0) ;; by index => [{:id :1 :val 1}] (selectv o [:val even?]) ;; by shorthand function => [{:id :3 :val 2} {:id :4 :val 2}] (selectv o [:id '((name) ;; by shorthand expression (bigint) (odd?))]) => [{:id :1 :val 1} {:id :3 :val 2}]

indices ^

instead of data, outputs the matching indices

v 2.1
(defn indices
  ([ova] (-> (count ova) range vec))
  ([ova pchk]
     (cond
      (number? pchk)
      (if (suppress (get ova pchk)) (list pchk) ())

      (set? pchk)
      (mapcat #(indices ova %) pchk)

      :else
      (filter (comp not nil?)
              (map-indexed (fn [i obj]
                             (check?-> obj pchk i))
                           ova)))))
link
(def o (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}])) (indices o) => [0 1 2 3] (indices o 0) => [0] (indices o [:val 1]) => [0 1] (indices o [:val even?]) => [2 3] (indices o [:val even? '(:id (name) (bigint)) odd?]) => [2] (indices o #{4}) => [] (indices o [:id :1]) => [0]

-invoke ^

the ova itself can be invoked to get the first match

v 2.1
link
(def o (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}])) ;; Simplified indices and :id lookups (o 0) => {:val 1, :id :1} (o :1) => {:val 1, :id :1} (:1 o) => {:val 1, :id :1} ;; Selector lookups (o :id :2) => {:val 1, :id :2} (o :val 2) => {:val 2, :id :3} (o :val even?) => {:val 2, :id :3} (o (list :id name) "4") => {:val 2, :id :4}

3.2    Manipulation

Methods for changing the data within an ova



!! ^

sets the value of selected data cells in the ova

v 2.1
(defn !!
  [ova pchk val]
  (smap! ova pchk (constantly val)))
link
(-> (range 5) (ova) (!! 1 0) (<<)) => [0 0 2 3 4] (-> (range 5) (ova) (!! #{1 2} 0) (<<)) => [0 0 0 3 4] (-> (range 5) (ova) (!! even? 0) (<<)) => [0 1 0 3 0]

!> ^

applies a set of transformations to a selector on the ova

v 2.1
(defmacro !>
  [ova pchk & forms]
  `(smap! ~ova ~pchk
          #(-> % ~@forms)))
link
(<< (!> (ova [{:id :1}]) 0 (assoc-in [:a :b] 1) (update-in [:a :b] inc) (assoc :c 3))) => [{:id :1 :c 3 :a {:b 2}}]

append! ^

like `conj!` but appends multiple array elements to the ova

v 2.1
(defn append!
  [ova & es]
  (concat! ova es))
link
(-> (ova [{:id :1 :val 1}]) (append! {:id :2 :val 1} {:id :3 :val 2}) (<<)) => [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2}]

concat! ^

works like `concat`, allows both array and ova inputs

v 2.1
(defn concat!
  [ova es & more]
  (let [_ (doseq [e es] (conj! ova e))]
    (if (seq more)
      (apply concat! ova more))
    ova))
link
(<< (concat! (ova [{:id :1 :val 1} {:id :2 :val 1}]) (ova [{:id :3 :val 2}]) [{:id :4 :val 2}])) => [{:val 1, :id :1} {:val 1, :id :2} {:val 2, :id :3} {:val 2, :id :4}]

empty! ^

empties an existing ova

v 2.1
(defn empty!
  [ova]
  (watch/clear ova {:type :ova})
  (watch/clear ova)
  (state/empty ova)
  ova)
link
(-> (ova [1 2 3 4 5]) (empty!) (<<)) => []

filter! ^

keep only elements that matches the selector

v 2.1
(defn filter!
  [ova pchk]
  (let [idx (set/difference
             (set (range (count ova)))
             (set (indices ova pchk)))]
    (delete-indices ova idx))
  ova)
link
(-> (ova [0 1 2 3 4 5 6 7 8 9]) (filter! #{'(< 3) '(> 6)}) (<<)) => [0 1 2 7 8 9]

insert! ^

inserts data at either the end of the ova or when given an index

v 2.1
(defn insert!
  [ova val & [i]]
  (let [rf (ref val)]
    (add-internal-watch ova rf)
    (alter (state/get ova) insert-fn rf i))
  ova)
link
(-> (ova (range 5)) (insert! 6) (<<)) => [0 1 2 3 4 6] (-> (ova (range 5)) (insert! 6) (insert! 5 5) (<<)) => [0 1 2 3 4 5 6]

map! ^

applies a function on the ova with relevent arguments

v 2.1
(defn map!
  [ova f & args]
  (doseq [rf @ova]
    (apply alter rf f args))
  ova)
link
(-> (ova [{:id :1} {:id :2}]) (map! assoc :val 1) (<<)) => [{:val 1, :id :1} {:val 1, :id :2}]

map-indexed! ^

applies a function that taking the data index as well as the data to all elements of the ova

v 2.1
(defn map-indexed!
  [ova f]
  (doseq [i (range (count ova))]
    (alter (nth @ova i) #(f i %) ))
  ova)
link
(-> (ova [{:id :1} {:id :2}]) (map-indexed! (fn [i m] (assoc m :val i))) (<<)) => [{:val 0, :id :1} {:val 1, :id :2}]

remove! ^

removes data from the ova that matches a selector

v 2.1
(defn remove!
  [ova pchk]
  (let [idx (set (indices ova pchk))]
    (delete-indices ova idx))
  ova)
link
(-> (ova (range 10)) (remove! odd?) (<<)) => [0 2 4 6 8] (-> (ova (range 10)) (remove! #{'(< 3) '(> 6)}) (<<)) => [3 4 5 6]

reverse! ^

reverses the order of elements in the ova

v 2.1
(defn reverse!
  [ova]
  (alter (state/get ova) (comp vec reverse))
  ova)
link
(-> (ova (range 5)) (reverse!) (<<)) => [4 3 2 1 0]

smap! ^

applies a function to only selected elements of the array

v 2.1
(defn smap!
  [ova pchk f & args]
  (let [idx (indices ova pchk)]
    (doseq [i idx]
      (apply alter (nth @ova i) f args)))
  ova)
link
(-> (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}]) (smap! [:val 1] update-in [:val] #(+ % 100)) (<<)) => [{:id :1, :val 101} {:id :2, :val 101} {:id :3, :val 2} {:id :4, :val 2}]

smap-indexed! ^

applies a function that taking the data index as well as the data to selected elements of the ova

v 2.1
(defn smap-indexed!
  [ova pchk f]
  (let [idx (indices ova pchk)]
    (doseq [i idx]
      (alter (nth @ova i) #(f i %))))
  ova)
link
(-> (ova [{:id :1 :val 1} {:id :2 :val 1} {:id :3 :val 2} {:id :4 :val 2}]) (smap-indexed! [:val 1] (fn [i m] (update-in m [:val] #(+ i 100 %)))) (<<)) => [{:id :1, :val 101} {:id :2, :val 102} {:id :3, :val 2} {:id :4, :val 2}]

sort! ^

sorts all data in the ova using a comparator function

v 2.1
(defn sort!
  ([ova] (sort! ova compare))
  ([ova comp]
     (alter (state/get ova)
            (fn [state]
              (->> state
                   (sort (fn [x y]
                          (comp @x @y)))
                   vec)))
     ova)
  ([ova sel comp]
     (alter (state/get ova)
            (fn [state]
              (->> state
                   (sort (fn [x y]
                           (comp (get-> @x sel) (get-> @y sel))))
                   vec)))
     ova))
link
(-> (ova [2 1 3 4 0]) (sort! >) (<<)) => [4 3 2 1 0] (-> (ova [2 1 3 4 0]) (sort! <) (<<)) => [0 1 2 3 4]

split ^

splits an ova into two based on a predicate

v 2.1
(defn split
  [ova pchk]
  (let [pos (clone ova)
        neg (clone ova)]
    (filter! pos pchk)
    (remove! neg pchk)
    {true pos false neg}))
link
(def o (ova (range 10))) (def sp (dosync (split o #{'(< 3) '(> 6)}))) (persistent! (sp true)) => [0 1 2 7 8 9] (persistent! (sp false)) => [3 4 5 6]

3.3    Watches

Because a ova is simply a ref, it can be watched for changes

(def ov (ova [0 1 2 3 4 5]))

(def output (atom []))
(add-watch ov
           :old-new
           (fn [ov k p n]
             (swap! output conj [(mapv deref p)
                                 (mapv deref n)])))

(do (dosync (sort! ov >))
    (deref output))
=> [[[0 1 2 3 4 5]
     [5 4 3 2 1 0]]]

3.3.1    Element Watch

Entire elements of the ova can be watched. A more substantial example can be seen in the scoreboard example:

(def ov (ova [0 1 2 3 4 5]))

(def output (atom []))

(watch/add      ;; key, ova, ref, previous, next
    ov :elem-old-new
    (fn [k o r p n]
      (swap! output conj [p n])))

(<< (!! ov 0 :zero))
=> [:zero 1 2 3 4 5]

(deref output)
=> [[0 :zero]]

(<< (!! ov 3 :three))
=> [:zero 1 2 :three 4 5]

(deref output)
=> [[0 :zero] [3 :three]]

3.3.2    Element Change Watch

The add-elem-change-watch function can be used to only notify when an element has changed.

(def ov (ova [0 1 2 3 4 5]))

(def output (atom []))

(watch/add   ;; key, ova, ref, previous, next
   ov :elem-old-new
   (fn [k o r p n]
     (swap! output conj [p n]))
   {:select identity
    :diff true})

(do (<< (!! ov 0 :zero))  ;; a pair is added to output
    (deref output))
=> [[0 :zero]]

(do (<< (!! ov 0 0))      ;; another pair is added to output
    (deref output))
=> [[0 :zero] [:zero 0]]

(do (<< (!! ov 0 0))      ;; no change to output
    (deref output))
=> [[0 :zero] [:zero 0]]

3.4    Clojure Protocols

ova implements the sequence protocol so it is compatible with all the bread and butter methods.

(def ov (ova (map (fn [n] {:val n})
                  (range 8))))

(seq ov)
=> '({:val 0} {:val 1} {:val 2}
     {:val 3} {:val 4} {:val 5}
     {:val 6} {:val 7})

(map #(update-in % [:val] inc) ov)
=> '({:val 1} {:val 2} {:val 3}
     {:val 4} {:val 5} {:val 6}
     {:val 7} {:val 8})

(last ov)
=> {:val 7}

(count ov)
=> 8

(get ov 0)
=> {:val 0}

(nth ov 3)
=> {:val 3}

(ov 0)
=> {:val 0}

(ov [:val] #{1 2 3}) ;; Gets the first that matches
=> {:val 1}

4    Selection

There are a number of ways elements in an ova can be selected. The library uses custom syntax to provide a shorthand for element selection. We use the function indices in order to give an examples of how searches can be expressed. Most of the functions like select, remove!, filter!, smap!, smap-indexed!, and convenience macros are all built on top of the indices function and so can be used accordingly once the convention is understood.

4.1    by index

The most straight-forward being the index itself, represented using a number.

(def ov (ova [{:v 0, :a {:c 4}}    ;; 0
              {:v 1, :a {:d 3}}    ;; 1
              {:v 2, :b {:c 2}}    ;; 2
              {:v 3, :b {:d 1}}])) ;; 3

(indices ov)           ;; return all indices
=> [0 1 2 3]

(indices ov 0)         ;; return indices of the 0th element
=> [0]

(indices ov 10)        ;; return indices of the 10th element
=> []

4.2    by value

A less common way is to search for indices by value.

(indices ov            ;; return indices of elements matching term
         {:v 0 :a {:c 4}})
=> [0]

4.3    by predicate

Most of the time, predicates are used. They allow selection of any element returning a non-nil value when evaluated against the predicate. Predicates can take the form of functions, keywords or list representation.

(indices ov #(get % :a))   ;; retur indicies where (:a elem) is non-nil

=> [0 1]

(indices ov #(:a %))       ;; more succint function form

=> [0 1]

(indices ov :a)            ;; keyword form, same as #(:a %)

=> [0 1]

(indices ov '(get :a))     ;; list form, same as #(get % :a)

=> [0 1]

(indices ov '(:a))         ;; list form, same as #(:a %)

=> [0 1]

4.4    by sets (or)

sets can be used to compose more complex searches by acting as an union operator over its members

(indices ov #{0 1})        ;; return indices 0 and 1
=> [0 1]

(indices ov #{:a 2})       ;; return indices of searching for both 2 and :a
=> (just [0 1 2] :in-any-order)

(indices ov #{'(:a)        ;; a more complex example
              #(= (:v %) 2)})
=> (just [0 1 2] :in-any-order)

4.5    by vectors (and)

vectors can be used to combine predicates for more selective filtering of elements

(indices ov [:v 0])        ;; return indicies where (:a ele) = {:c 4}
=> [0]

(indices ov [:v '(= 0)])   ;; return indicies where (:a ele) = {:c 4}
=> [0]

(indices ov [:a #(% :c)])  ;; return indicies where (:a ele) has a :c element
=> [0]

(indices ov [:a '(:c)])    ;; with list predicate
=> [0]

(indices ov [:a :c])       ;; with keyword predicate
=> [0]

(indices ov [:v odd?       ;; combining predicates
             :v '(> 1)])
=> [3]

(indices ov #{[:a :c] 2})  ;; used within a set

=> (just [0 2] :in-any-order)

4.6    accessing nested elements

When dealing with nested maps, a vector can be used instead of a keyword to specify rules of selection using nested elements

(indices ov [[:b :c] 2])   ;; with value
=> [2]

(indices ov [[:v] '(< 3)]) ;; with predicate
=> [0 1 2]

(indices ov [:v 2          ;; combining in vector
             [:b :c] 2])
=> [2]

5    Scoreboard

5.1    Data Setup

A scoreboard is used to track player attempts, scores and high-scores

(def scoreboard
  (ova [{:name "Bill" :attempts 0 :score {:all ()}}
        {:name "John" :attempts 0 :score {:all ()}}
        {:name "Sally" :attempts 0 :score {:all ()}}
        {:name "Fred"  :attempts 0 :score {:all ()}}]))

5.2    Notifier Setup

hara.event is used to listen for a :log signal and print out the :msg component of the event.

(require '[hara.event :as event])

(event/deflistener print-logger
  :log
  [msg]
  (println msg))

We set up two watch notifiers that signal and event.

  • one to print when an attempt has been made to play a game
  • one to print when there is a new highscore
(watch/add scoreboard
           :notify-attempt
           (fn [k o r p n]  ;; key, ova, ref, previous, next
             (event/signal [:log {:msg (str (:name @r) " is on attempt " n)}]))
           {:select :attempts})

(watch/add scoreboard
           :notify-high-score
           (fn [k o r p n]
             (event/signal [:log {:msg (str (:name @r) " has a new highscore of " n)}]))
           {:select [:score :highest]})

Of course, we could have added the println statement directly. However, in an actual application, events may be logged to file, emailed, beeped or read back to the user. Having a light-weight event signalling framework lets that decision be made much later

5.3    High Scores

Another watch is added to update the high score whenever it occurs.

(watch/add scoreboard
           :update-high-score
           (fn [k o r p n]
             (let [hs    [:score :highest]
                   high  (get-in @r hs)
                   current (first n)]
               (if (and current
                        (or (nil? high)
                            (< high current)))
                 (dosync (alter r assoc-in hs current)))))
           {:select [:score :all]})

5.4    Game Simulation

Functions for simulation are defined with the following parameters:

  • sim-game and sim-n-games are used to update the scoreboard
  • the time to finish the game is randomised
  • the wait-time between subsequent games is randomised
  • the score they get is also randomised
(defn sim-game [scoreboard name]
  ;; increment number of attempts
  (dosync (!> scoreboard [:name name]
              (update-in [:attempts] inc)))

  ;; simulate game playing time
  (Thread/sleep (rand-int 500))

  ;; conj the newest score at the start of the list
  (dosync (!> scoreboard [:name name]
              (update-in [:score :all] conj (rand-int 50)))))

(defn sim-n-games [scoreboard name n]
  (when (> n 0)
    (Thread/sleep (rand-int 500))
    (sim-game scoreboard name)
    (recur scoreboard name (dec n))))

5.5    Multithreading

To demonstrate the use of ova within a multithreaded environment, we run the following simulation

  • for each player on the scoreboard, they each play a random number of games simultaneously
  • the same scoreboard is used to keep track of state
(defn sim! [scoreboard]
  (let [names (map :name scoreboard)]
    (doseq [nm names]
      (future (sim-n-games scoreboard nm (+ 5 (rand-int 5)))))))

A sample simulation is show below:

(sim! scoreboard)

=> [Sally is on attempt 1
    Bill is on attempt 1
    Bill has a new highscore of 35
    Sally has a new highscore of 40
    John is on attempt 1
    Fred is on attempt 1

    .....

    Sally is on attempt 8
    Bill has a new highscore of 44
    Bill is on attempt 9
    Bill has a new highscore of 45]

(<< scoreboard)

=> [{:name "Bill", :attempts 9, :score {:highest 45, :all (45 44 36 9 24 25 39 18 3)}}
    {:name "John", :attempts 7, :score {:highest 49, :all (20 37 32 8 48 37 49)}}
    {:name "Sally", :attempts 8, :score {:highest 49, :all (1 48 7 12 43 0 39 49)}}
    {:name "Fred", :attempts 5, :score {:highest 47, :all (16 40 47 15 22)}}]