Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/socketio/async_aiopika_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio

from engineio import json
from .async_pubsub_manager import AsyncPubSubManager

try:
Expand Down Expand Up @@ -32,18 +31,29 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
in rabbitmq
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""

name = 'asyncaiopika'

def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None):
channel='socketio', write_only=False, logger=None, json=None):
if aio_pika is None:
raise RuntimeError('aio_pika package is not installed '
'(Run "pip install aio_pika" in your '
'virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.url = url
self._lock = asyncio.Lock()
self.publisher_connection = None
Expand Down Expand Up @@ -82,7 +92,7 @@ async def _publish(self, data):
try:
await self.publisher_exchange.publish(
aio_pika.Message(
body=json.dumps(data).encode(),
body=self.json.dumps(data).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
), routing_key='*',
)
Expand Down
5 changes: 3 additions & 2 deletions src/socketio/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ class AsyncClient(base_client.BaseClient):
use. To disable logging set to ``False``. The default is
``False``. Note that fatal errors are logged even when
``logger`` is ``False``.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param handle_sigint: Set to ``True`` to automatically handle disconnection
when the process is interrupted, or to ``False`` to
leave interrupt handling to the calling application.
Expand Down
22 changes: 18 additions & 4 deletions src/socketio/async_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from functools import partial
import uuid

from engineio import json

from .async_manager import AsyncManager
from .packet import Packet

Expand All @@ -22,15 +20,31 @@ class AsyncPubSubManager(AsyncManager):

:param channel: The channel name on which the server sends and receives
notifications.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'asyncpubsub'

def __init__(self, channel='socketio', write_only=False, logger=None):
def __init__(self, channel='socketio', write_only=False, logger=None,
json=None):
super().__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger
if json is not None:
self.json = json

def initialize(self):
super().initialize()
Expand Down Expand Up @@ -221,7 +235,7 @@ async def _thread(self):
data = message
else:
try:
data = json.loads(message)
data = self.json.loads(message)
except:
pass
if data and 'method' in data:
Expand Down
20 changes: 15 additions & 5 deletions src/socketio/async_redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
aiovalkey = None
ValkeyError = None

from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url

Expand Down Expand Up @@ -47,18 +46,29 @@ class AsyncRedisManager(AsyncPubSubManager):
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'aioredis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
write_only=False, logger=None, json=None, redis_options=None):
if aioredis and \
not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.redis_url = url
self.redis_options = redis_options or {}
self.connected = False
Expand Down Expand Up @@ -117,7 +127,7 @@ async def _publish(self, data): # pragma: no cover
if not self.connected:
self._redis_connect()
return await self.redis.publish(
self.channel, json.dumps(data))
self.channel, self.json.dumps(data))
except error as exc:
if retries_left > 0:
self._get_logger().error(
Expand Down
5 changes: 3 additions & 2 deletions src/socketio/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ class AsyncServer(base_server.BaseServer):
:param logger: To enable logging set to ``True`` or pass a logger object to
use. To disable logging set to ``False``. Note that fatal
errors are logged even when ``logger`` is ``False``.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param async_handlers: If set to ``True``, event handlers for a client are
executed in separate threads. To run handlers for a
client synchronously, set to ``False``. The default
Expand Down
3 changes: 3 additions & 0 deletions src/socketio/base_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import logging
import json

from bidict import bidict, ValueDuplicationError

Expand All @@ -14,9 +15,11 @@ def __init__(self):
self.eio_to_sid = {}
self.callbacks = {}
self.pending_disconnect = {}
self.json = json

def set_server(self, server):
self.server = server
self.json = self.server.packet_class.json # use the global JSON module

def initialize(self):
"""Invoked before the first request is received. Subclasses can add
Expand Down
5 changes: 3 additions & 2 deletions src/socketio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ class Client(base_client.BaseClient):
of the ``encode()`` and ``decode()`` methods can be
provided. Client and server must use compatible
serializers.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param handle_sigint: Set to ``True`` to automatically handle disconnection
when the process is interrupted, or to ``False`` to
leave interrupt handling to the calling application.
Expand Down
20 changes: 15 additions & 5 deletions src/socketio/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
except ImportError:
kafka = None

from engineio import json
from .pubsub_manager import PubSubManager

logger = logging.getLogger('socketio')
Expand All @@ -32,18 +31,29 @@ class KafkaManager(PubSubManager): # pragma: no cover
servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'kafka'

def __init__(self, url='kafka://localhost:9092', channel='socketio',
write_only=False):
write_only=False, logger=None, json=None):
if kafka is None:
raise RuntimeError('kafka-python package is not installed '
'(Run "pip install kafka-python" in your '
'virtualenv).')

super().__init__(channel=channel, write_only=write_only)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)

urls = [url] if isinstance(url, str) else url
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092'
Expand All @@ -53,7 +63,7 @@ def __init__(self, url='kafka://localhost:9092', channel='socketio',
bootstrap_servers=self.kafka_urls)

def _publish(self, data):
self.producer.send(self.channel, value=json.dumps(data))
self.producer.send(self.channel, value=self.json.dumps(data))
self.producer.flush()

def _kafka_listen(self):
Expand Down
20 changes: 15 additions & 5 deletions src/socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
except ImportError:
kombu = None

from engineio import json
from .pubsub_manager import PubSubManager


Expand Down Expand Up @@ -34,7 +33,17 @@ class KombuManager(PubSubManager): # pragma: no cover
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
:param connection_options: additional keyword arguments to be passed to
``kombu.Connection()``.
:param exchange_options: additional keyword arguments to be passed to
Expand All @@ -47,14 +56,15 @@ class KombuManager(PubSubManager): # pragma: no cover
name = 'kombu'

def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None,
channel='socketio', write_only=False, logger=None, json=None,
connection_options=None, exchange_options=None,
queue_options=None, producer_options=None):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.url = url
self.connection_options = connection_options or {}
self.exchange_options = exchange_options or {}
Expand Down Expand Up @@ -102,7 +112,7 @@ def _publish(self, data):
try:
producer_publish = self._producer_publish(
self.publisher_connection)
producer_publish(json.dumps(data))
producer_publish(self.json.dumps(data))
break
except (OSError, kombu.exceptions.KombuError):
if retry:
Expand Down
22 changes: 18 additions & 4 deletions src/socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from functools import partial
import uuid

from engineio import json

from .manager import Manager
from .packet import Packet

Expand All @@ -21,15 +19,31 @@ class PubSubManager(Manager):

:param channel: The channel name on which the server sends and receives
notifications.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'pubsub'

def __init__(self, channel='socketio', write_only=False, logger=None):
def __init__(self, channel='socketio', write_only=False, logger=None,
json=None):
super().__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger
if json is not None:
self.json = json

def initialize(self):
super().initialize()
Expand Down Expand Up @@ -215,7 +229,7 @@ def _thread(self):
data = message
else:
try:
data = json.loads(message)
data = self.json.loads(message)
except:
pass
if data and 'method' in data:
Expand Down
Loading
Loading