Skip to content

Commit 260100d

Browse files
adds print-object idea, and initial flow post
1 parent 233918d commit 260100d

File tree

4 files changed

+494
-4
lines changed

4 files changed

+494
-4
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
(ns clojure.plus.print.objects.objects-and-protocols
2+
(:require [clojure.core.async :as async]
3+
[clojure.datafy :as datafy]
4+
[clojure.string :as str]
5+
[core.async.flow.example.stats :as stats]))
6+
7+
;; Clojure plus includes print-methods to improve printing objects
8+
9+
(require 'clojure+.print)
10+
^:kind/hidden
11+
(clojure+.print/install!)
12+
13+
;; Once activated, we can print functions, atoms, namespaces, and more sensibly
14+
15+
inc
16+
(atom {})
17+
*ns*
18+
19+
;; Clojure Plus only adds printers for pre-defined types.
20+
;; No printer is provided for Object,
21+
;; so it falls back to Clojure's default printing method:
22+
23+
(defmethod print-method Object [x ^java.io.Writer w]
24+
(#'clojure.core/print-object x w))
25+
26+
;; There are plenty of objects left over that print messily
27+
28+
(def my-chan
29+
(async/chan))
30+
31+
my-chan
32+
33+
;; Clojure's Object print-method calls print-object,
34+
;; which prints the ugly [type-str hash-str type-str].
35+
36+
;; Clojure Plus could clean that up like this:
37+
38+
(defmethod print-method Object [x ^java.io.Writer w]
39+
(.write w "#object ")
40+
(.write w (.getName (class x))))
41+
42+
my-chan
43+
44+
;; Much nicer!
45+
;; This solves the majority of "left over" situations.
46+
;; However, there are still some ugly ones left...
47+
48+
(def stats-flow
49+
(stats/create-flow))
50+
51+
stats-flow
52+
53+
;; What is this? It's a reified object that implements protocols.
54+
;; We can see this by the $reify part at the end.
55+
;; The description is not terrible, at least we know where it was made,
56+
;; which hints that it must be a flow.
57+
;; Can we do better?
58+
59+
;; AFAIK the only way to check what protocols an object satisfies
60+
;; is to call `satisfies?` for every possible protocol:
61+
62+
(defn all-protocol-vars [x]
63+
(->> (all-ns)
64+
(mapcat ns-publics)
65+
(vals)
66+
(keep #(-> % meta :protocol))
67+
(distinct)
68+
(filter #(satisfies? @% x))))
69+
70+
;; On the one hand, this is concerning for performance.
71+
;; On the other hand, at my REPL I don't care about that, it's faster than I can notice.
72+
;; Leaving aside those concerns, it returns quite a long list...
73+
74+
(all-protocol-vars stats-flow)
75+
76+
;; But notice that one of them; `#'clojure.core.async.flow.impl.graph/Graph`
77+
;; just feels like it is the one we care about most.
78+
;; Furthermore, it shares a similar namespace with the classname.
79+
;; Let's try matching by the namespace...
80+
81+
(defn var-ns-name [v]
82+
(-> (meta v) (:ns) (ns-name)))
83+
84+
(defn ns-match? [p x]
85+
(-> (var-ns-name p)
86+
(str/starts-with? (.getPackageName (class x)))))
87+
88+
(defn protocol-ns-matches [x]
89+
(filter #(ns-match? % x) (all-protocol-vars x)))
90+
91+
(protocol-ns-matches stats-flow)
92+
93+
;; Nice.
94+
;; In my opinion this is more representative of the object.
95+
;; The `#'` out front is unnecessary and can be removed...
96+
97+
(defn var-sym [v]
98+
(let [m (meta v)]
99+
(symbol (str (ns-name (:ns m))) (str (:name m)))))
100+
101+
(defn protocol-ns-match-names [x]
102+
(->> (protocol-ns-matches x)
103+
(map var-sym)))
104+
105+
(protocol-ns-match-names stats-flow)
106+
107+
;; The other protocol of interest is Datafiable,
108+
;; because it indicates I can get a data representation if I would like to.
109+
110+
(datafy/datafy stats-flow)
111+
112+
;; I think this one is so helpful that it should always be shown on objects,
113+
;; regardless of their type of other protocols,
114+
;; as a hint that it is possible to get more information.
115+
;; I wouldn't want to print them as data by default, because it would be too spammy.
116+
;; And checking Datafiable is much less of a performance concern.
117+
118+
(satisfies? clojure.core.protocols/Datafiable stats-flow)
119+
120+
;; But there is a big problem... everything is Datafiable...
121+
122+
(satisfies? clojure.core.protocols/Datafiable (Object.))
123+
124+
;; So there is no way for us to know whether `datafy/datafy` will do anything useful or not.
125+
;; Sad.
126+
;; But we can improve the print-method to show protocols,
127+
;; bearing in mind it is a performance concern.
128+
129+
(defmethod print-method Object [x ^java.io.Writer w]
130+
(let [class-name (.getName (class x))
131+
r (str/includes? class-name "$reify")
132+
p (when r (first (protocol-ns-match-names x)))]
133+
(.write w (if p
134+
"#reify "
135+
"#object "))
136+
(.write w (if p
137+
(str p)
138+
class-name))))
139+
140+
my-chan
141+
stats-flow
142+
143+
;; Even if we don't care to improve reify (due to performance),
144+
;; I think the Object printer should still be improved to align with the other printers.
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
(ns core.async.flow.example.stats)
2+
3+
;; # Core async flow stats example
4+
;;
5+
;; Adapted from https://github.com/puredanger/flow-example
6+
7+
^:kind/video
8+
{:youtube-id "lXFwf3O4BVY"
9+
:iframe-width "100%"}
10+
11+
;; Necessary dependencies
12+
13+
(clojure.repl.deps/add-libs '{org.clojure/core.async {:mvn/version "1.9.808-alpha1"}})
14+
(clojure.repl.deps/add-libs '{io.github.clojure/core.async.flow-monitor {:git/tag "v0.1.1" :git/sha "61e8d31"}})
15+
16+
(require
17+
'[clojure.core.async :as a]
18+
'[clojure.core.async.flow :as flow]
19+
'[clojure.core.async.flow-monitor :as mon])
20+
21+
(defn stat-gen
22+
"Generates a random value between min (inclusive) and max (exclusive)
23+
and writes it to out chan, waiting wait ms between until stop-atom is flagged."
24+
([out min max wait stop-atom]
25+
(loop []
26+
(let [val (+ min (rand-int (- max min)))
27+
put (a/>!! out val)]
28+
;(println "stat-gen" (System/identityHashCode stop-atom) val put (not @stop-atom))
29+
(when (and put (not @stop-atom))
30+
(^[long] Thread/sleep wait)
31+
(recur))))))
32+
33+
(defn source
34+
"Source proc for random stats"
35+
;; describe
36+
([] {:params {:min "Min value to generate"
37+
:max "Max value to generate"
38+
:wait "Time in ms to wait between generating"}
39+
:outs {:out "Output channel for stats"}})
40+
41+
;; init
42+
([args]
43+
(assoc args
44+
::flow/in-ports {:stat (a/chan 100)}
45+
:stop (atom false)))
46+
47+
;; transition
48+
([{:keys [min max wait ::flow/in-ports] :as state} transition]
49+
;(println "transition" transition)
50+
(case transition
51+
::flow/resume
52+
(let [stop-atom (atom false)]
53+
(future (stat-gen (:stat in-ports) min max wait stop-atom))
54+
(assoc state :stop stop-atom))
55+
56+
(::flow/pause ::flow/stop)
57+
(do
58+
(reset! (:stop state) true)
59+
state)))
60+
61+
;; transform
62+
([state in msg]
63+
;(println "source transform" in msg)
64+
[state (when (= in :stat) {:out [msg]})]))
65+
66+
(defn aggregator
67+
;; describe
68+
([] {:params {:min "Min value, alert if lower"
69+
:max "Max value, alert if higher"}
70+
:ins {:stat "Channel to receive stat values"
71+
:poke "Channel to poke when it is time to report a window of data to the log"}
72+
:outs {:alert "Notify of value out of range {:val value, :error :high|:low"}
73+
:workload :compute
74+
})
75+
76+
;; init
77+
([args] (assoc args :vals []))
78+
79+
;; transition
80+
([state transition] state)
81+
82+
;; transform
83+
([{:keys [min max vals] :as state} input-id msg]
84+
(case input-id
85+
:stat (let [state' (assoc state :vals (conj vals msg))
86+
msgs (cond
87+
(< msg min) {:alert [{:val msg, :error :low}]}
88+
(< max msg) {:alert [{:val msg, :error :high}]}
89+
:else nil)]
90+
[state' msgs])
91+
:poke [(assoc state :vals [])
92+
{::flow/report (if (empty? vals)
93+
[{:count 0}]
94+
[{:avg (/ (double (reduce + vals)) (count vals))
95+
:count (count vals)}])}]
96+
[state nil])))
97+
98+
(comment
99+
;; test aggregator alert case - no channels involved
100+
(let [state {:min 1 :max 5 :vals []}
101+
[state' msgs'] (aggregator state :stat 100)]
102+
(assert (= msgs' {:alert [{:val 100, :error :high}]})))
103+
)
104+
105+
106+
(defn scheduler
107+
;; describe
108+
([] {:params {:wait "Time to wait between pokes"}
109+
:outs {:out "Poke channel, will send true when the alarm goes off"}})
110+
111+
;; init
112+
([args]
113+
(assoc args
114+
::flow/in-ports {:alarm (a/chan 10)}
115+
:stop (atom false)))
116+
117+
;; transition
118+
([{:keys [wait ::flow/in-ports] :as state} transition]
119+
;(println "scheduler transition" transition state transition)
120+
(case transition
121+
::flow/resume
122+
(let [stop-atom (atom false)]
123+
(future (loop []
124+
(let [put (a/>!! (:alarm in-ports) true)]
125+
(when (and put (not @stop-atom))
126+
(^[long] Thread/sleep wait)
127+
(recur)))))
128+
(assoc state :stop stop-atom))
129+
130+
(::flow/pause ::flow/stop)
131+
(do
132+
(reset! (:stop state) true)
133+
state)))
134+
135+
;; transform
136+
([state in msg]
137+
[state (when (= in :alarm) {:out [true]})]))
138+
139+
(defn printer
140+
;; describe
141+
([] {:params {:prefix "Log message prefix"}
142+
:ins {:in "Channel to receive messages"}})
143+
144+
;; init
145+
([state] state)
146+
147+
;; transition
148+
([state _transition] state)
149+
150+
;; transform
151+
([{:keys [prefix] :as state} _in msg]
152+
(println prefix msg)
153+
[state nil]))
154+
155+
(def config
156+
{:procs {:generator {:args {:min 0 :max 12 :wait 500} :proc (flow/process #'source)}
157+
:aggregator {:args {:min 1 :max 10} :proc (flow/process #'aggregator)}
158+
:scheduler {:args {:wait 3000} :proc (flow/process #'scheduler)}
159+
:notifier {:args {:prefix "Alert: "} :proc (flow/process #'printer)
160+
:chan-opts {:in {:buf-or-n (a/sliding-buffer 3)}}}}
161+
:conns [[[:generator :out] [:aggregator :stat]]
162+
[[:scheduler :out] [:aggregator :poke]]
163+
[[:aggregator :alert] [:notifier :in]]]})
164+
165+
(defn create-flow
166+
[]
167+
(flow/create-flow config))
168+
169+
(comment
170+
(def f (create-flow))
171+
(def chs (flow/start f))
172+
(flow/resume f)
173+
(flow/pause f)
174+
(flow/stop f)
175+
176+
(def server (mon/start-server {:flow f}))
177+
(mon/stop-server server)
178+
179+
@(flow/inject f [:aggregator :poke] [true])
180+
@(flow/inject f [:aggregator :stat] ["abc1000"]) ;; trigger an alert
181+
@(flow/inject f [:notifier :in] [:sandwich])
182+
183+
(def report-chan (:report-chan chs))
184+
(flow/ping f)
185+
(a/poll! report-chan)
186+
(def error-chan (:error-chan chs))
187+
(a/poll! error-chan)
188+
189+
(flow/stop f)
190+
(a/close! stat-chan)
191+
192+
@(flow/inject f [:aggregator :poke] [true])
193+
194+
(require '[clojure.datafy :as datafy])
195+
(datafy/datafy f)
196+
197+
(require '[clojure.core.async.flow-static :refer [graph]])
198+
(graph f)
199+
200+
)

0 commit comments

Comments
 (0)