Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 113 additions & 39 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import threading

from cassandra.connection import Connection, ConnectionShutdown
Expand All @@ -12,6 +13,24 @@

log = logging.getLogger(__name__)

# Errno values that indicate the remote peer has disconnected.
_PEER_DISCONNECT_ERRNOS = frozenset((
errno.ENOTCONN, errno.ESHUTDOWN,
errno.ECONNRESET, errno.ECONNABORTED,
errno.EBADF,
))

# Windows winerror codes for the same conditions:
# 10053 = WSAECONNABORTED, 10054 = WSAECONNRESET
_PEER_DISCONNECT_WINERRORS = frozenset((10053, 10054))


def _is_peer_disconnect(err):
"""Return True if *err* indicates the remote peer closed the connection."""
return (isinstance(err, ConnectionError)
or getattr(err, 'winerror', None) in _PEER_DISCONNECT_WINERRORS
or getattr(err, 'errno', None) in _PEER_DISCONNECT_ERRNOS)


# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
Expand Down Expand Up @@ -140,8 +159,7 @@ def close(self):
return
self.is_closed = True

# close from the loop thread to avoid races when removing file
# descriptors
# Schedule async cleanup (cancel watchers, error pending requests)
asyncio.run_coroutine_threadsafe(
self._close(), loop=self._loop
)
Expand All @@ -153,11 +171,46 @@ async def _close(self):
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
self._loop.remove_writer(self._socket.fileno())
self._loop.remove_reader(self._socket.fileno())
self._socket.close()

log.debug("Closed socket to %s" % (self.endpoint,))
fd = self._socket.fileno()
if fd >= 0:
try:
self._loop.remove_writer(fd)
except NotImplementedError:
# NotImplementedError: remove_reader/remove_writer are not
# supported on Windows ProactorEventLoop (default since
# Python 3.10). ProactorEventLoop uses completion-based
# IOCP, which has no concept of "watching a fd for
# readiness" to remove.
pass
except Exception:
# It is not critical if it fails, driver can keep working,
# but it should not be happening, so logged as error
log.error("Unexpected error removing writer for %s",
self.endpoint, exc_info=True)
try:
self._loop.remove_reader(fd)
except NotImplementedError:
# NotImplementedError: remove_reader/remove_writer are not
# supported on Windows ProactorEventLoop (default since
# Python 3.10). ProactorEventLoop uses completion-based
# IOCP, which has no concept of "watching a fd for
# readiness" to remove.
pass
except Exception:
# It is not critical if it fails, driver can keep working,
# but it should not be happening, so logged as error
log.error("Unexpected error removing reader for %s",
self.endpoint, exc_info=True)

try:
self._socket.close()
except OSError:
# Ignore if socket is already closed
pass
except Exception:
log.debug("Unexpected error closing socket to %s",
self.endpoint, exc_info=True)
log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
msg = "Connection to %s was closed" % self.endpoint
Expand All @@ -168,6 +221,9 @@ async def _close(self):
self.connected_event.set()

def push(self, data):
if self.is_closed or self.is_defunct:
raise ConnectionShutdown(
"Connection to %s is already closed" % self.endpoint)
buff_size = self.out_buffer_size
if len(data) > buff_size:
chunks = []
Expand Down Expand Up @@ -196,43 +252,61 @@ async def _push_msg(self, chunks):


async def handle_write(self):
while True:
try:
exc = None
try:
while True:
next_msg = await self._write_queue.get()
if next_msg:
await self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
except asyncio.CancelledError:
pass
except Exception as err:
if _is_peer_disconnect(err):
log.debug("Connection %s closed by peer during write: %s",
self, err)
else:
exc = err
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
except asyncio.CancelledError:
return
finally:
self.defunct(exc or ConnectionShutdown(
"Connection to %s was closed" % self.endpoint))

async def handle_read(self):
while True:
try:
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
self._iobuf.write(buf)
# sock_recv expects EWOULDBLOCK if socket provides no data, but
# nonblocking ssl sockets raise these instead, so we handle them
# ourselves by yielding to the event loop, where the socket will
# get the reading/writing it "wants" before retrying
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
# Apparently the preferred way to yield to the event loop from within
# a native coroutine based on https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
continue
except socket.error as err:
log.debug("Exception during socket recv for %s: %s",
exc = None
try:
while True:
try:
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
self._iobuf.write(buf)
# sock_recv expects EWOULDBLOCK if socket provides no data, but
# nonblocking ssl sockets raise these instead, so we handle them
# ourselves by yielding to the event loop, where the socket will
# get the reading/writing it "wants" before retrying
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
# Apparently the preferred way to yield to the event loop from within
# a native coroutine based on https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
continue

if buf and self._iobuf.tell():
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
exc = ConnectionShutdown(
"Connection to %s was closed by server" % self.endpoint)
return
except asyncio.CancelledError:
# Task cancellation is treated as a normal connection shutdown;
# cleanup and marking the connection as defunct are handled in finally.
pass
except Exception as err:
if _is_peer_disconnect(err):
log.debug("Connection %s closed by peer during read: %s",
self, err)
self.defunct(err)
return # leave the read loop
except asyncio.CancelledError:
return

if buf and self._iobuf.tell():
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
self.close()
return
exc = err
log.debug("Exception during socket recv for %s: %s",
self, err)
finally:
self.defunct(exc or ConnectionShutdown(
"Connection to %s was closed" % self.endpoint))
Loading
Loading