Skip to content

Commit 08393ff

Browse files
Merge pull request #20 from ClojureCivitas/async-flow
progressing the async flow
2 parents 694b393 + e877e08 commit 08393ff

File tree

3 files changed

+262
-61
lines changed

3 files changed

+262
-61
lines changed
Lines changed: 244 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,101 @@
11
^{:kindly/hide-code true
2-
:clay {:title "Core Async Flow Stats Example"
2+
:clay {:title "Stats and Signals in the Flow of Asynctopolis"
33
:quarto {:author [:alexmiller :timothypratley]
44
:draft true
55
:type :post
6-
:date "2025-05-15"
6+
:date "2025-05-1"
77
:category :clojure
88
:tags [:core.async :core.async.flow]}}}
99
(ns core.async.flow.example.stats
1010
(:require [clojure.core.async :as a]
1111
[clojure.core.async.flow :as flow]
12-
[clojure.core.async.flow-monitor :as mon]))
13-
14-
;; Recently Alex provided a video walkthrough on how to use `core.async.flow` to build a stats monitor.
12+
[clojure.core.async.flow-static :as flow-static]
13+
[tablecloth.api :as tc]
14+
[scicloj.tableplot.v1.plotly :as plotly]
15+
[clojure.print-object.remove-extraneous]
16+
[clojure.datafy :as datafy]))
17+
18+
;; Welcome to Asynctopolis, a city where agents act on signals, not orders.
19+
;; Here, unseen agents pass messages, track patterns, and sound alarms when the moment calls.
20+
;; No one oversees the whole city, yet everything flows.
21+
;;
22+
;; Beneath it all hums the Stats Core Async Flow,
23+
;; a network of processes working together without ever meeting.
24+
;; Today, you'll meet the agents of this asynchronous allegiance.
25+
26+
;; This code is adapted from [Alex's stats flow example](https://github.com/puredanger/flow-example),
27+
;; used for his video walkthrough.
1528

1629
^:kind/video ^:kindly/hide-code
1730
{:youtube-id "lXFwf3O4BVY"
1831
:iframe-width "100%"}
1932

20-
;; This notebook is adapted from [his code](https://github.com/puredanger/flow-example).
33+
;; Above us in the sky flies Talon the Stat Hawk.
34+
;; Sleek, silent, and tireless.
35+
;; With a glint in his eye and wings tipped in probability,
36+
;; he soars into the realm of the unknowable every half second,
37+
;; returning with a fresh stat clutched in his talons.
38+
;; He doesn't question, he doesn't falter.
39+
;; He circles over the range from min to max, plucks a random integer,
40+
;; and drops it onto a channel without ceremony.
2141

22-
(defn stat-gen
42+
(defn Talon
2343
"Generates a random value between min (inclusive) and max (exclusive)
2444
and writes it to out chan, waiting wait ms between until stop-atom is flagged."
2545
([out min max wait stop-atom]
2646
(loop []
2747
(let [val (+ min (rand-int (- max min)))
2848
put (a/>!! out val)]
29-
;(println "stat-gen" (System/identityHashCode stop-atom) val put (not @stop-atom))
3049
(when (and put (not @stop-atom))
3150
(^[long] Thread/sleep wait)
3251
(recur))))))
3352

34-
(defn source
53+
;; Born of wind and randomness, Talon is no ordinary bird.
54+
;; He executes his mission with the rhythm and the grace of chance incarnate.
55+
;; Talon embodies the eternal recurrence of the loop.
56+
;; An autonomous creature of purpose, relentless and unthinking.
57+
;; To be a process is to endure.
58+
;; Ever watchful, speaking in channels.
59+
60+
;; Fly Talon! Collect samples. Let's see what distribution you bring.
61+
62+
(let [c (a/chan)
63+
stop (atom false)
64+
n 100]
65+
(future (Talon c 0 20 0 stop))
66+
(let [samples (vec (repeatedly n (fn [] (a/<!! c))))]
67+
(reset! stop true)
68+
(-> (tc/dataset {:index (range n)
69+
:sample samples})
70+
(plotly/base {:=x :index
71+
:=y :sample
72+
:=title "The prey of Talon"})
73+
(plotly/layer-point))))
74+
75+
;; You have sampled fairly, Talon.
76+
77+
;; Talon operates at the behest of the city's Generator.
78+
79+
;; ## Meet Randomius Maximus, the Generator
80+
;;
81+
;; In a stone tower at the edge of the async city lives Randomius Maximus.
82+
;; Robed in numbers, crowned with entropy, keeper of the unceasing stream.
83+
;; He does not wander. He does not speak.
84+
;; He gestures, and Talon flies.
85+
;;
86+
;; With a sweep of his hand, he dispatches his hawk to gather truths from the swirling chaos.
87+
;; Min and Max are his decree.
88+
;; Wait is his tempo.
89+
;; As long as his flow runs, the stats will come.
90+
91+
;; To be a true citizen of Asynctopolis is to be known as a process.
92+
;; To follow the sacred cycle of Vita Processus:
93+
;; Describe your duties.
94+
;; Initialize your station.
95+
;; Transition with order.
96+
;; Transform with purpose.
97+
98+
(defn Randomius
3599
"Source proc for random stats"
36100
;; describe
37101
([] {:params {:min "Min value to generate"
@@ -47,11 +111,10 @@
47111

48112
;; transition
49113
([{:keys [min max wait :clojure.core.async.flow/in-ports] :as state} transition]
50-
;(println "transition" transition)
51114
(case transition
52115
:clojure.core.async.flow/resume
53116
(let [stop-atom (atom false)]
54-
(future (stat-gen (:stat in-ports) min max wait stop-atom))
117+
(future (Talon (:stat in-ports) min max wait stop-atom))
55118
(assoc state :stop stop-atom))
56119

57120
(:clojure.core.async.flow/pause :clojure.core.async.flow/stop)
@@ -61,18 +124,66 @@
61124

62125
;; transform
63126
([state in msg]
64-
;(println "source transform" in msg)
65127
[state (when (= in :stat) {:out [msg]})]))
66128

67-
(defn aggregator
129+
;; Randomius, describe your duties!
130+
(Randomius)
131+
132+
;; Initialize your station!
133+
(def state
134+
(atom (Randomius {:min 10
135+
:max 20
136+
:wait 1})))
137+
^:kind/println
138+
@state
139+
140+
;; Transition with order.
141+
(swap! state Randomius :clojure.core.async.flow/resume)
142+
143+
;; Talon is flying.
144+
(-> (:clojure.core.async.flow/in-ports @state)
145+
(:stat)
146+
(a/<!!))
147+
148+
;; Transform with purpose.
149+
(swap! state
150+
(fn [state]
151+
(let [[state step] (Randomius state :stat "I transform, therefore I am")]
152+
(println step)
153+
state)))
154+
;; I see you wish to send a message to `stat`.
155+
;; Be wary in the future, speak only numbers to those who seek stats.
156+
157+
158+
;; Well done, Randomius.
159+
;; You are a true citizen.
160+
;; Now rest.
161+
(swap! state Randomius :clojure.core.async.flow/stop)
162+
163+
164+
;; ## Meet Tallystrix, the Whispering Aggregator
165+
;;
166+
;; In the marble shadows of the Hall of Measures,
167+
;; Tallystrix gathers numbers in her obsidian basin.
168+
;; She listens not to the sky, but to the `stat` channel,
169+
;; where strange numbers arrive without explanation.
170+
;; She lets them settle, silent and still.
171+
;;
172+
;; She says nothing—until the bell rings.
173+
;; Then, with a tilt of the bowl and a whisper of reckoning,
174+
;; she releases the average to those who asked.
175+
;;
176+
;; If a number is too high or too low, she sends a warning,
177+
;; a flare in the async night.
178+
179+
(defn Tallystrix
68180
;; describe
69181
([] {:params {:min "Min value, alert if lower"
70182
:max "Max value, alert if higher"}
71183
:ins {:stat "Channel to receive stat values"
72184
:poke "Channel to poke when it is time to report a window of data to the log"}
73185
:outs {:alert "Notify of value out of range {:val value, :error :high|:low"}
74-
:workload :compute
75-
})
186+
:workload :compute})
76187

77188
;; init
78189
([args] (assoc args :vals []))
@@ -96,15 +207,27 @@
96207
:count (count vals)}])}]
97208
[state nil])))
98209

99-
(comment
100-
;; test aggregator alert case - no channels involved
101-
(let [state {:min 1 :max 5 :vals []}
102-
[state' msgs'] (aggregator state :stat 100)]
103-
(assert (= msgs' {:alert [{:val 100, :error :high}]})))
104-
)
210+
;; Tallystrix, what messages have you?
211+
212+
(let [state {:min 1 :max 5 :vals []}
213+
[state' msgs'] (Tallystrix state :stat 100)]
214+
msgs')
215+
216+
;; Well alerted.
217+
;; Your transform is sound.
105218

219+
;; ## Meet Chronon, the Scheduler of Bells
106220

107-
(defn scheduler
221+
;; In a chamber just outside the Hall of Measures,
222+
;; Chronon stands beside a great brass bell.
223+
;; Every few thousand milliseconds, he raises his staff and strikes it.
224+
;; A chime ripples through the channels and stirs the Aggregator within.
225+
226+
;; He does not wait for thanks. He does not miss a beat.
227+
;; His duty is rhythm. His gift is regularity.
228+
;; And with every ring, the silence grows wiser.
229+
230+
(defn Chronon
108231
;; describe
109232
([] {:params {:wait "Time to wait between pokes"}
110233
:outs {:out "Poke channel, will send true when the alarm goes off"}})
@@ -117,7 +240,6 @@
117240

118241
;; transition
119242
([{:keys [wait :clojure.core.async.flow/in-ports] :as state} transition]
120-
;(println "scheduler transition" transition state transition)
121243
(case transition
122244
:clojure.core.async.flow/resume
123245
(let [stop-atom (atom false)]
@@ -137,7 +259,22 @@
137259
([state in msg]
138260
[state (when (= in :alarm) {:out [true]})]))
139261

140-
(defn printer
262+
;; Chronon has no familiar to do his work,
263+
;; and listens to no-one.
264+
265+
;; ## Meet Claxxus, the Notifier, the Herald
266+
267+
;; At the city’s edge stands Claxxus, cloaked in red and brass,
268+
;; eyes ever on the flame that signals alarm.
269+
;; He does not gather, he does not measure,
270+
;; he only declares.
271+
;;
272+
;; When Tallystrix sends a flare,
273+
;; Claxxus steps forward to speak.
274+
;; He raises his voice for all to hear:
275+
;; “Out of range!”
276+
277+
(defn Claxxus
141278
;; describe
142279
([] {:params {:prefix "Log message prefix"}
143280
:ins {:in "Channel to receive messages"}})
@@ -153,49 +290,101 @@
153290
(println prefix msg)
154291
[state nil]))
155292

293+
;; Cursed to know only how to shout.
294+
295+
(Claxxus {:prefix "ERROR:"} :in "Out of range!")
296+
297+
;; ## The Asynchronous Allegiance
298+
;;
299+
;; All these roles are bound together in a flow,
300+
;; a living graph of asynchronous collaboration.
301+
;;
302+
;; Randomius Maximus generates.
303+
;; Chronon keeps the beat.
304+
;; Tallystrix listens and computes.
305+
;; Claxxus alerts.
306+
;;
307+
;; They never meet.
308+
;; They never speak.
309+
;; Yet they move as one.
310+
;;
311+
;; This is an allegiance, asynchronous and unseen.
312+
;; Held together by channels, purpose, and trust.
313+
156314
(def config
157-
{:procs {:generator {:args {:min 0 :max 12 :wait 500} :proc (flow/process #'source)}
158-
:aggregator {:args {:min 1 :max 10} :proc (flow/process #'aggregator)}
159-
:scheduler {:args {:wait 3000} :proc (flow/process #'scheduler)}
160-
:notifier {:args {:prefix "Alert: "} :proc (flow/process #'printer)
315+
{:procs {:generator {:args {:min 0 :max 12 :wait 500}
316+
:proc (flow/process #'Randomius)}
317+
:aggregator {:args {:min 1 :max 10}
318+
:proc (flow/process #'Tallystrix)}
319+
:scheduler {:args {:wait 3000}
320+
:proc (flow/process #'Chronon)}
321+
:notifier {:args {:prefix "Alert: "}
322+
:proc (flow/process #'Claxxus)
161323
:chan-opts {:in {:buf-or-n (a/sliding-buffer 3)}}}}
162324
:conns [[[:generator :out] [:aggregator :stat]]
163325
[[:scheduler :out] [:aggregator :poke]]
164326
[[:aggregator :alert] [:notifier :in]]]})
165327

166-
(defn create-flow
167-
[]
168-
(flow/create-flow config))
328+
^:kind/hiccup
329+
[:iframe {:width "100%"
330+
:height "600px"
331+
:srcdoc (flow-static/template config nil)}]
332+
333+
;; The Flow creates them, calling upon their civic duties,
334+
;; Describe your duties.
335+
;; Initialize your station.
336+
337+
(def f (flow/create-flow config))
338+
339+
;; The city is ready, but not yet in action.
340+
341+
(datafy/datafy f)
342+
343+
(def chs (flow/start f))
344+
345+
chs
346+
347+
;; `report-chan` and `error-chan` are special conduits in the Flow.
348+
;; Tallystrix sends her summaries to `report`, dutifully.
349+
;; When something breaks it flows to `error`.
350+
351+
;; Claxxus does not speak of such failures.
352+
;; He is for alerts.
353+
;; Thresholds breached, events of note, things the city must hear.
354+
355+
;; The city breathes, the asynchronous allegiance stirs.
356+
;; Transition with order.
357+
358+
(flow/resume f)
359+
360+
;; Transform with purpose.
361+
362+
(flow/inject f [:aggregator :poke] [true])
363+
(flow/inject f [:aggregator :stat] ["abc1000"]) ;; trigger an alert
364+
(flow/inject f [:notifier :in] [:sandwich])
169365

170-
(comment
171-
(def f (create-flow))
172-
(def chs (flow/start f))
173-
(flow/resume f)
174-
(flow/pause f)
175-
(flow/stop f)
366+
(a/poll! (:report-chan chs))
367+
(a/poll! (:error-chan chs))
176368

177-
(def server (mon/start-server {:flow f}))
178-
(mon/stop-server server)
369+
;; The flow can coordinate peace.
179370

180-
@(flow/inject f [:aggregator :poke] [true])
181-
@(flow/inject f [:aggregator :stat] ["abc1000"]) ;; trigger an alert
182-
@(flow/inject f [:notifier :in] [:sandwich])
371+
(flow/pause f)
183372

184-
(def report-chan (:report-chan chs))
185-
(flow/ping f)
186-
(a/poll! report-chan)
187-
(def error-chan (:error-chan chs))
188-
(a/poll! error-chan)
373+
(flow/stop f)
189374

190-
(flow/stop f)
191-
(a/close! stat-chan)
375+
;; The city falls silent.
192376

193-
@(flow/inject f [:aggregator :poke] [true])
377+
;; Thus does Asynctopolis coordinate,
378+
;; thus is Vita Processus observed.
194379

195-
(require '[clojure.datafy :as datafy])
196-
(datafy/datafy f)
380+
;; The flow of Asynctopolis is a choreography of concurrent logic,
381+
;; where each part knows just enough to play its role, and no more.
382+
;; It's a quiet network of intent.
383+
;; Each role with a narrow purpose, joined by shared channels and rhythm.
197384

198-
(require '[clojure.core.async.flow-static :refer [graph]])
199-
(graph f)
385+
;; You can observe its work as it happens.
386+
;; You can inspect, poke, pause, and resume.
387+
;; Buffers shape its tempo, and transitions reveal its state.
200388

201-
)
389+
;; In Asynctopolis, no one rules,
390+
;; yet the system flows precisely, predictably, asynchronously.

0 commit comments

Comments
 (0)