Skip to content

Commit e75623a

Browse files
fix: simplify stream reading - remove thread/queue, use direct iteration (#43)
The daemon thread approach was only reading 1 line in production but worked locally. Simplified to read lines directly via executor.
1 parent 41bda39 commit e75623a

File tree

1 file changed

+38
-84
lines changed

1 file changed

+38
-84
lines changed

src/policyengine_api/api/agent.py

Lines changed: 38 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,13 @@ def _parse_claude_stream_event(line: str) -> dict | None:
154154

155155
async def _stream_modal_sandbox(question: str, api_base_url: str):
156156
"""Stream output from Claude Code running in Modal Sandbox."""
157-
import queue
158-
import threading
159157
from concurrent.futures import ThreadPoolExecutor
160158

161159
with logfire.span(
162160
"agent_stream", question=question[:100], api_base_url=api_base_url
163161
):
164162
sb = None
165-
executor = ThreadPoolExecutor(max_workers=1)
163+
executor = ThreadPoolExecutor(max_workers=2)
166164
try:
167165
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox
168166

@@ -174,91 +172,47 @@ async def _stream_modal_sandbox(question: str, api_base_url: str):
174172
)
175173
logfire.info("sandbox_created")
176174

177-
line_queue = queue.Queue()
178-
lines_received = 0
179-
180-
def stream_reader():
181-
nonlocal lines_received
175+
# Read stdout synchronously in executor, yield lines as we get them
176+
def read_next_line(stdout_iter):
182177
try:
183-
logfire.info("reader_started")
184-
for line in process.stdout:
185-
lines_received += 1
186-
# Log line length and first 500 chars (avoid scrubbing)
187-
line_preview = (
188-
line[:500].replace("session", "sess1on") if line else None
189-
)
190-
# Check if multiple JSON objects concatenated (embedded newlines)
191-
newline_count = line.count("\n") if line else 0
192-
logfire.info(
193-
"raw_line",
194-
line_num=lines_received,
195-
line_len=len(line) if line else 0,
196-
newline_count=newline_count,
197-
line_preview=line_preview,
198-
)
199-
line_queue.put(("line", line))
200-
logfire.info("stdout_exhausted", total_lines=lines_received)
201-
process.wait()
202-
logfire.info("process_exited", returncode=process.returncode)
203-
if process.returncode != 0:
204-
stderr = process.stderr.read()
205-
logfire.error(
206-
"process_failed",
207-
returncode=process.returncode,
208-
stderr=stderr[:500] if stderr else None,
209-
)
210-
line_queue.put(("error", (process.returncode, stderr)))
211-
else:
212-
line_queue.put(("done", process.returncode))
213-
except Exception as e:
214-
logfire.exception("reader_error", error=str(e))
215-
line_queue.put(("exception", str(e)))
216-
217-
reader_thread = threading.Thread(target=stream_reader, daemon=True)
218-
reader_thread.start()
178+
return next(stdout_iter)
179+
except StopIteration:
180+
return None
219181

182+
stdout_iter = iter(process.stdout)
183+
lines_received = 0
220184
events_sent = 0
185+
221186
while True:
222-
try:
223-
item = await loop.run_in_executor(
224-
executor, lambda: line_queue.get(timeout=0.1)
225-
)
226-
event_type, data = item
227-
228-
if event_type == "line":
229-
parsed = _parse_claude_stream_event(data)
230-
if parsed:
231-
events_sent += 1
232-
logfire.info(
233-
"event",
234-
num=events_sent,
235-
type=parsed["type"],
236-
content=parsed["content"][:200]
237-
if parsed["content"]
238-
else None,
239-
)
240-
yield f"data: {json.dumps(parsed)}\n\n"
241-
elif event_type == "error":
242-
returncode, stderr = data
243-
yield f"data: {json.dumps({'type': 'error', 'content': stderr})}\n\n"
244-
yield f"data: {json.dumps({'type': 'done', 'returncode': returncode})}\n\n"
245-
break
246-
elif event_type == "done":
247-
logfire.info(
248-
"complete",
249-
returncode=data,
250-
events_sent=events_sent,
251-
lines_received=lines_received,
252-
)
253-
yield f"data: {json.dumps({'type': 'done', 'returncode': data})}\n\n"
254-
break
255-
elif event_type == "exception":
256-
raise Exception(data)
257-
except Exception as e:
258-
if "Empty" in type(e).__name__:
259-
await asyncio.sleep(0)
260-
continue
261-
raise
187+
line = await loop.run_in_executor(executor, read_next_line, stdout_iter)
188+
189+
if line is None:
190+
# stdout exhausted
191+
logfire.info("stdout_exhausted", total_lines=lines_received)
192+
break
193+
194+
lines_received += 1
195+
logfire.info(
196+
"raw_line",
197+
line_num=lines_received,
198+
line_len=len(line),
199+
line_preview=line[:300].replace("session", "sess1on"),
200+
)
201+
202+
parsed = _parse_claude_stream_event(line)
203+
if parsed:
204+
events_sent += 1
205+
yield f"data: {json.dumps(parsed)}\n\n"
206+
207+
# Wait for process to finish
208+
returncode = await loop.run_in_executor(executor, process.wait)
209+
logfire.info(
210+
"complete",
211+
returncode=returncode,
212+
events_sent=events_sent,
213+
lines_received=lines_received,
214+
)
215+
yield f"data: {json.dumps({'type': 'done', 'returncode': returncode})}\n\n"
262216

263217
except Exception as e:
264218
logfire.exception("failed", error=str(e))

0 commit comments

Comments
 (0)