Skip to content

Commit 803ea0c

Browse files
committed
FIX: Fix Live.stop() disconnection with cleanup
1 parent c6d8de5 commit 803ea0c

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 0.62.1 - TBD
4+
5+
#### Bug fixes
6+
- Fixed an issue where calling `Live.stop()` would not clean up the client state once the socket is closed
7+
38
## 0.62.0 - 2025-08-19
49

510
This release delivers a number of breaking changes to the Python interface for DBN records to provide a cleaner and more consistent API.

databento/live/session.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,9 @@ def stop(self) -> None:
439439
with self._lock:
440440
if self._transport is None:
441441
return
442-
if self._transport.can_write_eof():
443-
self._transport.write_eof()
444-
self._transport.close()
442+
if self._protocol is not None:
443+
self._protocol.disconnected.add_done_callback(lambda _: self._cleanup())
444+
self._loop.call_soon_threadsafe(self._transport.close)
445445

446446
def start(self) -> None:
447447
"""
@@ -516,8 +516,6 @@ def terminate(self) -> None:
516516
with self._lock:
517517
if self._transport is None:
518518
return
519-
if self._transport.can_write_eof():
520-
self._transport.write_eof()
521519
self._transport.abort()
522520
self._cleanup()
523521

tests/test_live_client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from __future__ import annotations
66

7+
import asyncio
78
import pathlib
89
import platform
910
import random
@@ -255,6 +256,42 @@ async def test_live_connect_auth(
255256
assert message.encoding == Encoding.DBN
256257

257258

259+
async def test_live_client_reuse(
260+
mock_live_server: MockLiveServerInterface,
261+
live_client: client.Live,
262+
) -> None:
263+
"""
264+
Test that calling stop will *eventually* close a the connection and trigger
265+
a cleanup of the client state.
266+
"""
267+
live_client.subscribe(
268+
dataset=Dataset.GLBX_MDP3,
269+
schema=Schema.MBO,
270+
)
271+
272+
await mock_live_server.wait_for_message_of_type(
273+
message_type=gateway.AuthenticationRequest,
274+
)
275+
276+
live_client.start()
277+
live_client.stop()
278+
279+
await asyncio.sleep(1)
280+
281+
live_client.subscribe(
282+
dataset=Dataset.GLBX_MDP3,
283+
schema=Schema.MBP_1,
284+
)
285+
286+
await mock_live_server.wait_for_message_of_type(
287+
message_type=gateway.AuthenticationRequest,
288+
)
289+
290+
live_client.start()
291+
live_client.stop()
292+
await live_client.wait_for_close()
293+
294+
258295
async def test_live_connect_auth_with_heartbeat_interval(
259296
mock_live_server: MockLiveServerInterface,
260297
test_live_api_key: str,

0 commit comments

Comments
 (0)