Skip to content

Commit 64cc9c7

Browse files
show errors in async flow
1 parent 27d8311 commit 64cc9c7

File tree

3 files changed

+71
-137
lines changed

3 files changed

+71
-137
lines changed

src/core/async/flow/example/before_his_wings_melted.clj

Lines changed: 41 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,8 @@
3232

3333
(def asynctopolis (flow/create-flow asynctopolis/config))
3434

35-
(show/flow-svg asynctopolis {:show-chans false
36-
:with-content false})
37-
38-
;; Coordinate asynchronous operations using `core.async`.
39-
;; While powerful, these operations can become hard to reason about as they grow in complexity.
40-
;; The `core.async.flow` library is a higher-level abstraction for modeling async processes as a Directed Acyclic Graph (DAG).
41-
;; We can visualize flows [flow-monitor](https://github.com/clojure/core.async.flow-monitor).
35+
(show/flow-svg asynctopolis nil {:show-chans false
36+
:with-content false})
4237

4338
;; He circled the skyline.
4439
;; He watched the channels breathe.
@@ -49,139 +44,73 @@
4944
;; each role in the great asynchronous allegiance.
5045

5146

52-
;; Let's walk through an exploration of such a flow.
53-
54-
;; ## What We'll Explore
55-
56-
;; In this notebook, we'll take a look at:
47+
;; As he flew lower he saw that processes are connected via channels.
5748

58-
;; 1. **Basic flow structure**: What does a flow look like under the hood?
59-
;; 2. **Static visualization**: How can we inspect its components?
60-
;; 3. **Dynamic interaction**: How do values move through the flow, and what happens when they do?
49+
(show/flow-svg asynctopolis nil {:chans-as-ports true
50+
:with-content false})
6151

62-
;; This flow models a small system involving aggregation, notification, and reporting.
63-
;; Internally, it consists of processes connected via channels.
64-
(show/flow-svg asynctopolis {:chans-as-ports true
65-
:with-content false})
52+
;; Are channels attached to a process, or are they part of it?
53+
;; You can choose to visualize them as distinct connectors,
54+
;; or as embedded roles within each process.
55+
;; Both perspectives reveal useful insights.
6656

67-
;; Are channels part of a process or not?
68-
;; You decide
57+
(show/flow-svg asynctopolis nil {:chans-as-ports false
58+
:with-content false})
6959

70-
(show/flow-svg asynctopolis {:chans-as-ports false
71-
:with-content false})
72-
73-
;; Let's dig deeper into the details
60+
;; Wanting to see more, Icarus swooped even lower to survey the processes.
7461

7562
(show/proc-table asynctopolis)
7663

77-
;; This table gives us a clear list of components in the flow, including their names
78-
;; and behaviors.
79-
80-
;; Next, let’s examine how these processes are **connected**.
64+
;; With a clearer understanding of the processes,
65+
;; he pondered how these processes are connected.
8166

8267
(show/conn-table asynctopolis)
8368

84-
;; Now we’re seeing the wiring: who talks to whom, and through what channels.
69+
;; In doing so he realized there are also 2 global channels,
70+
;; `report` and `error`:
8571

72+
(show/flow-svg asynctopolis nil {:chans-as-ports false
73+
:with-content false
74+
:show-global-chans true})
8675

87-
;; There are 2 global channels, `report` and `error`:
76+
;; Any process can put messages on `report` and `error`.
8877

89-
(show/flow-svg asynctopolis {:chans-as-ports false
90-
:with-content false
91-
:show-global-chans true})
78+
;; ## Street Level
9279

93-
;; Any process can put messages on `report` and `error`,
94-
;; which is why we didn't show them until now.
95-
96-
97-
;; ## 3. Running the Flow
98-
99-
;; Time to bring our flow to life!
100-
;; Calling `start` activates the processes and returns a map of the important channels for interaction.
80+
;; Reaching street level, he called out `start`!
81+
;; The flow responded, handing him report and error channels in a map.
10182

10283
(def chs (flow/start asynctopolis))
103-
(flow/resume asynctopolis)
104-
105-
;; We can now **inject values** into specific points in the flow.
106-
;; Think of this like poking the system and watching how it reacts.
107-
108-
;; We send a “poke” signal to the `aggregator` process.
109-
110-
(show/flow-svg asynctopolis {:chans-as-ports false
111-
:with-content false})
112-
113-
(flow/inject asynctopolis [:Tallystrix :poke] [true])
114-
115-
(show/flow-svg asynctopolis {:chans-as-ports false
116-
:with-content false})
117-
118-
;; We send a stat string that is designed to trigger an alert.
119-
120-
(flow/inject asynctopolis [:Tallystrix :stat] ["abc1000"])
12184

122-
;; We send a notification message into the `notifier`.
85+
;; But still, nothing stirred. So he yelled `resume`!
12386

124-
(flow/inject asynctopolis [:Claxxus :in] [:sandwich])
125-
126-
;; ## 4. Observing the Results
127-
128-
;; Our flow includes a `report-chan`, where summaries and reports might be sent.
87+
(flow/resume asynctopolis)
12988

13089
(def report-chan (:report-chan chs))
13190

132-
(flow/ping asynctopolis)
133-
13491
(async/poll! report-chan)
13592

136-
;; After pinging the system, we check if anything landed in the report channel.
137-
138-
;; We can also inspect the `error-chan`, where any issues in the flow are reported.
139-
140-
(def error-chan (:error-chan chs))
93+
(flow/inject asynctopolis [:Tallystrix :poke] [true])
14194

142-
(async/poll! error-chan)
95+
(flow/inject asynctopolis [:Tallystrix :stat] ["abc1000"])
14396

97+
(show/flow-svg asynctopolis chs {:chans-as-ports false
98+
:with-content false})
14499

145-
;; If something unexpected occurred (e.g., bad input or failed routing),
146-
;; this is where we’d find it.
100+
;; Tallystrix takes only numbers, `"abc1000"` was not acceptable.
147101

148-
(show/flow-svg asynctopolis {:chans-as-ports false
149-
:with-content false})
102+
;; ## Conclusion
150103

151-
(flow/stop asynctopolis)
104+
(flow/stop asynctopolis)
152105
(Thread/sleep 1)
153106

154-
;; TODO: wait for the report and error !<
155-
156-
157-
;; Flows implement the `Datafy` protocol so we can inspect them as data...
158-
;; Good luck with that, there's a lot of it
159-
160-
(datafy/datafy asynctopolis)
161-
162-
163-
164-
; ## Flow
165-
166-
; At its core, flow is a library for building concurrent, event-driven systems
167-
; using simple, communication-free functions.
168-
; It lets you wire up processes and connect them through channels,
169-
; while keeping control, error handling, and monitoring centralized and declarative.
170-
;
171-
;You define the structure as a directed graph—processes,
172-
; their inputs and outputs, and how they connect—and the flow system takes care of orchestration.
173-
; Your logic remains focused, while flow handles execution, coordination, and lifecycle concerns.
174-
;
175-
;All processes can be inspected or observed, and flows are fully data-driven, making them easy to reason about and visualize.
176-
; It's concurrent programming with structure—without the chaos.
177-
178-
; ## Summary
179-
180-
;; By constructing, inspecting, and interacting with a flow, we can understand the
181-
;; lifecycle and structure of asynchronous systems more clearly.
182-
;;
183-
;; This toolset provides a bridge between the abstract beauty of DAGs and the
184-
;; gritty realism of channel communication—unlocking both power and clarity
185-
;; in asynchronous Clojure code.
107+
;; Icarus realized that
108+
;; Flow is a library for building concurrent, event-driven systems
109+
;; out of simple, communication-free functions.
110+
;; Processes connect through channels.
111+
;; You define the structure as a directed graph.
112+
;; Flow takes care of orchestration.
113+
;; Flows are data-driven, easy to inspect, reason about and visualize.
114+
;; Then he wondered just how high could he fly?
186115

187-
;; Happy flowing!
116+
;; Happy flowing, and keep your feathers waxed!

src/core/async/flow/example/flow_show.clj

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
:category :clojure
88
:tags [:core.async :core.async.flow]}}}
99
(ns core.async.flow.example.flow-show
10-
(:require [clojure.datafy :as datafy]
10+
(:require [clojure.core.async :as async]
11+
[clojure.datafy :as datafy]
1112
[clojure.string :as str]
1213
[graph.layout.elk :as elk]
1314
[graph.layout.elk-svg :as elk-svg]))
@@ -53,36 +54,39 @@
5354
[:div (for [[k v] outs]
5455
[:div [:strong (name k)] ": " v])]]))}))
5556

56-
(defn elkg [flow {:keys [show-chans
57-
show-global-chans
58-
chans-as-ports
59-
with-content
60-
proc-width
61-
proc-height
62-
chan-width
63-
chan-height]
64-
:or {show-chans true
65-
show-global-chans false
66-
chans-as-ports true
67-
with-content false
68-
proc-width 60
69-
proc-height 30
70-
chan-width 30
71-
chan-height 12}}]
57+
(defn elkg [flow
58+
{:keys [error-chan report-chan]}
59+
{:keys [show-chans
60+
show-global-chans
61+
chans-as-ports
62+
with-content
63+
proc-width
64+
proc-height
65+
chan-width
66+
chan-height]
67+
:or {show-chans true
68+
show-global-chans false
69+
chans-as-ports true
70+
with-content false
71+
proc-width 60
72+
proc-height 30
73+
chan-width 30
74+
chan-height 12}}]
7275
(let [{:keys [conns procs chans]} (datafy/datafy flow)
7376
{:keys [ins outs error report]} chans
77+
err (some-> error-chan (async/poll!))
7478
all-proc-chans (into #{} cat conns)
7579
global-chans (when show-global-chans
7680
[{:id "report"
7781
:layoutOptions {:elk.layered.layering.layerConstraint "LAST_SEPARATE"}
7882
:width chan-width
7983
:height chan-height
80-
:labels [{:text (str "report" "(" (-> report :buffer :count) ")")}]}
84+
:labels [{:text "report"}]}
8185
{:id "error"
8286
:layoutOptions {:elk.layered.layering.layerConstraint "LAST_SEPARATE"}
8387
:width chan-width
8488
:height chan-height
85-
:labels [{:text (str "error" "(" (-> error :buffer :count) ")")}]}])
89+
:labels [{:text "error"}]}])
8690
proc-nodes (vec (for [[proc-key proc-chans] (group-by first all-proc-chans)]
8791
(let [{:keys [args proc]} (get procs proc-key)
8892
{:keys [desc]} proc
@@ -98,11 +102,11 @@
98102
chans (for [[p chan-k :as proc-chan] proc-chans
99103
:let [chan-name (name chan-k)
100104
{:as c :keys [buffer]} (or (get outs [p chan-k])
101-
(get ins [p chan-k]))]]
105+
(get ins [p chan-k]))]]
102106
{:id (id-for proc-chan)
103107
:width chan-width
104108
:height chan-height
105-
:labels [{:text (str chan-name " (" (:count buffer) ")")}]})]
109+
:labels [{:text chan-name}]})]
106110
{:id (id-for proc-key)
107111
:width proc-width
108112
:height proc-height
@@ -116,6 +120,7 @@
116120
(vec (when (and show-chans chans-as-ports)
117121
chans))})))]
118122
{:id "G"
123+
:fill (when err "red")
119124
:layoutOptions {:elk.algorithm "layered"
120125
:elk.direction "RIGHT"
121126
:elk.hierarchyHandling "INCLUDE_CHILDREN"}
@@ -137,7 +142,7 @@
137142
:sources [(id-for from)]
138143
:targets [(id-for to)]})))}))
139144

140-
(defn flow-svg [flow options]
141-
(-> (elkg flow options)
145+
(defn flow-svg [flow chs options]
146+
(-> (elkg flow chs options)
142147
(elk/layout)
143148
(elk-svg/render-graph)))

src/graph/layout/elk_svg.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@
4343
[:path {:d "M0,0 L0,6 L6,3 z"
4444
:fill (:edge-shape-stroke default-styles)}]])
4545

46-
(defn shape [{:keys [x y width height]}]
46+
(defn shape [{:keys [x y width height fill]}]
4747
[:rect {:width width
4848
:height height
4949
:stroke (:node-shape-stroke default-styles)
50-
:fill (:node-shape-fill default-styles)}])
50+
:fill (or fill (:node-shape-fill default-styles))}])
5151

5252
;; TODO: good? bad?
5353
(defn fo-div [width height content]

0 commit comments

Comments
 (0)