Skip to content

Commit d720670

Browse files
committed
Add AtomicThreadPool#active_count
1 parent 65bbb87 commit d720670

File tree

2 files changed

+40
-6
lines changed

2 files changed

+40
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## [Unreleased]
22

3+
- Add `AtomicThreadPool#active_count`
34
- Make native extension methods private
45
- Add YARD documentation and inline RBS type signatures
56
- Replace ruby 3.5 references with 4.0

lib/atomic-ruby/atomic_thread_pool.rb

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ module AtomicRuby
2929
#
3030
# @example Monitoring pool state
3131
# pool = AtomicThreadPool.new(size: 3)
32-
# puts pool.length #=> 3
33-
# puts pool.queue_length #=> 0
32+
# puts pool.length #=> 3
33+
# puts pool.queue_length #=> 0
34+
# puts pool.active_count #=> 0
3435
#
3536
# 5.times { pool << proc { sleep(1) } }
36-
# puts pool.queue_length #=> 2 (3 workers busy, 2 queued)
37+
# puts pool.queue_length #=> 2 (3 workers busy, 2 queued)
38+
# puts pool.active_count #=> 3 (3 workers processing)
3739
#
3840
# @note This class is NOT Ractor-safe as it contains mutable thread state
3941
# that cannot be safely shared across ractors.
@@ -69,7 +71,8 @@ def initialize(size:, name: nil)
6971
@name = name
7072

7173
@state = Atom.new(queue: [], shutdown: false)
72-
@started_threads = Atom.new(0)
74+
@started_thread_count = Atom.new(0)
75+
@active_thread_count = Atom.new(0)
7376
@threads = []
7477

7578
start
@@ -151,6 +154,33 @@ def queue_length
151154
# @rbs () -> Integer
152155
alias queue_size queue_length
153156

157+
# Returns the number of worker threads currently executing work.
158+
#
159+
# This represents threads that have picked up a work item and are
160+
# actively processing it. The count includes threads in the middle
161+
# of executing work.call, but excludes threads that are idle or
162+
# waiting for work.
163+
#
164+
# @return [Integer] The number of threads actively processing work
165+
#
166+
# @example Monitor active workers
167+
# pool = AtomicThreadPool.new(size: 4)
168+
# puts pool.active_count #=> 0
169+
#
170+
# 5.times { pool << proc { sleep(1) } }
171+
# sleep(0.1) # Give threads time to pick up work
172+
# puts pool.active_count #=> 4 (all workers busy)
173+
# puts pool.queue_length #=> 1 (one item still queued)
174+
#
175+
# @example Calculate total load
176+
# total_load = pool.active_count + pool.queue_length
177+
# puts "Total pending work: #{total_load}"
178+
#
179+
# @rbs () -> Integer
180+
def active_count
181+
@active_thread_count.value
182+
end
183+
154184
# Gracefully shuts down the thread pool.
155185
#
156186
# This method:
@@ -207,7 +237,7 @@ def start
207237
thread_name << " for #{@name}" if @name
208238
Thread.current.name = thread_name
209239

210-
@started_threads.swap { |current_count| current_count + 1 }
240+
@started_thread_count.swap { |current_count| current_count + 1 }
211241

212242
loop do
213243
work = nil
@@ -228,12 +258,15 @@ def start
228258
if should_shutdown
229259
break
230260
elsif work
261+
@active_thread_count.swap { |current_count| current_count + 1 }
231262
begin
232263
work.call
233264
rescue => err
234265
puts "#{thread_name} rescued:"
235266
puts "#{err.class}: #{err.message}"
236267
puts err.backtrace.join("\n")
268+
ensure
269+
@active_thread_count.swap { |current_count| current_count - 1 }
237270
end
238271
else
239272
Thread.pass
@@ -243,7 +276,7 @@ def start
243276
end
244277
@threads.freeze
245278

246-
Thread.pass until @started_threads.value == @size
279+
Thread.pass until @started_thread_count.value == @size
247280
end
248281
end
249282
end

0 commit comments

Comments
 (0)