diff --git a/src/socketio/async_aiopika_manager.py b/src/socketio/async_aiopika_manager.py index 34171c46..626158bf 100644 --- a/src/socketio/async_aiopika_manager.py +++ b/src/socketio/async_aiopika_manager.py @@ -1,6 +1,5 @@ import asyncio -from engineio import json from .async_pubsub_manager import AsyncPubSubManager try: @@ -32,18 +31,29 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover in rabbitmq :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. """ name = 'asyncaiopika' def __init__(self, url='amqp://guest:guest@localhost:5672//', - channel='socketio', write_only=False, logger=None): + channel='socketio', write_only=False, logger=None, json=None): if aio_pika is None: raise RuntimeError('aio_pika package is not installed ' '(Run "pip install aio_pika" in your ' 'virtualenv).') - super().__init__(channel=channel, write_only=write_only, logger=logger) + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) self.url = url self._lock = asyncio.Lock() self.publisher_connection = None @@ -82,7 +92,7 @@ async def _publish(self, data): try: await self.publisher_exchange.publish( aio_pika.Message( - body=json.dumps(data).encode(), + body=self.json.dumps(data).encode(), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ), routing_key='*', ) diff --git a/src/socketio/async_client.py b/src/socketio/async_client.py index 49241ed0..cdbcc1b6 100644 --- a/src/socketio/async_client.py +++ b/src/socketio/async_client.py @@ -37,10 +37,11 @@ class AsyncClient(base_client.BaseClient): use. To disable logging set to ``False``. The default is ``False``. Note that fatal errors are logged even when ``logger`` is ``False``. - :param json: An alternative json module to use for encoding and decoding + :param json: An alternative JSON module to use for encoding and decoding packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library - versions. + versions. This is a process-wide setting; all instantiated + servers and clients must use the same JSON module. :param handle_sigint: Set to ``True`` to automatically handle disconnection when the process is interrupted, or to ``False`` to leave interrupt handling to the calling application. diff --git a/src/socketio/async_pubsub_manager.py b/src/socketio/async_pubsub_manager.py index 6d631640..b29e521d 100644 --- a/src/socketio/async_pubsub_manager.py +++ b/src/socketio/async_pubsub_manager.py @@ -3,8 +3,6 @@ from functools import partial import uuid -from engineio import json - from .async_manager import AsyncManager from .packet import Packet @@ -22,15 +20,31 @@ class AsyncPubSubManager(AsyncManager): :param channel: The channel name on which the server sends and receives notifications. + :param write_only: If set to ``True``, only initialize to emit events. 
The + default of ``False`` initializes the class for emitting + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. """ name = 'asyncpubsub' - def __init__(self, channel='socketio', write_only=False, logger=None): + def __init__(self, channel='socketio', write_only=False, logger=None, + json=None): super().__init__() self.channel = channel self.write_only = write_only self.host_id = uuid.uuid4().hex self.logger = logger + if json is not None: + self.json = json def initialize(self): super().initialize() @@ -221,7 +235,7 @@ async def _thread(self): data = message else: try: - data = json.loads(message) + data = self.json.loads(message) except: pass if data and 'method' in data: diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index a43807ca..862e6104 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -19,7 +19,6 @@ aiovalkey = None ValkeyError = None -from engineio import json from .async_pubsub_manager import AsyncPubSubManager from .redis_manager import parse_redis_sentinel_url @@ -47,18 +46,29 @@ class AsyncRedisManager(AsyncPubSubManager): notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. :param redis_options: additional keyword arguments to be passed to ``Redis.from_url()`` or ``Sentinel()``. 
""" name = 'aioredis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', - write_only=False, logger=None, redis_options=None): + write_only=False, logger=None, json=None, redis_options=None): if aioredis and \ not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover raise RuntimeError('Version 2 of aioredis package is required.') - super().__init__(channel=channel, write_only=write_only, logger=logger) + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) self.redis_url = url self.redis_options = redis_options or {} self.connected = False @@ -117,7 +127,7 @@ async def _publish(self, data): # pragma: no cover if not self.connected: self._redis_connect() return await self.redis.publish( - self.channel, json.dumps(data)) + self.channel, self.json.dumps(data)) except error as exc: if retries_left > 0: self._get_logger().error( diff --git a/src/socketio/async_server.py b/src/socketio/async_server.py index e13009ef..733d6e94 100644 --- a/src/socketio/async_server.py +++ b/src/socketio/async_server.py @@ -28,10 +28,11 @@ class AsyncServer(base_server.BaseServer): :param logger: To enable logging set to ``True`` or pass a logger object to use. To disable logging set to ``False``. Note that fatal errors are logged even when ``logger`` is ``False``. - :param json: An alternative json module to use for encoding and decoding + :param json: An alternative JSON module to use for encoding and decoding packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library - versions. + versions. This is a process-wide setting; all instantiated + servers and clients must use the same JSON module. :param async_handlers: If set to ``True``, event handlers for a client are executed in separate threads. To run handlers for a client synchronously, set to ``False``. The default diff --git a/src/socketio/base_manager.py b/src/socketio/base_manager.py index ae4530bd..7b092ead 100644 --- a/src/socketio/base_manager.py +++ b/src/socketio/base_manager.py @@ -1,5 +1,6 @@ import itertools import logging +import json from bidict import bidict, ValueDuplicationError @@ -14,9 +15,11 @@ def __init__(self): self.eio_to_sid = {} self.callbacks = {} self.pending_disconnect = {} + self.json = json def set_server(self, server): self.server = server + self.json = self.server.packet_class.json # use the global JSON module def initialize(self): """Invoked before the first request is received. Subclasses can add diff --git a/src/socketio/client.py b/src/socketio/client.py index 5282e0a1..8f38316e 100644 --- a/src/socketio/client.py +++ b/src/socketio/client.py @@ -39,10 +39,11 @@ class Client(base_client.BaseClient): of the ``encode()`` and ``decode()`` methods can be provided. Client and server must use compatible serializers. - :param json: An alternative json module to use for encoding and decoding + :param json: An alternative JSON module to use for encoding and decoding packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library - versions. + versions. This is a process-wide setting; all instantiated + servers and clients must use the same JSON module. :param handle_sigint: Set to ``True`` to automatically handle disconnection when the process is interrupted, or to ``False`` to leave interrupt handling to the calling application. 
diff --git a/src/socketio/kafka_manager.py b/src/socketio/kafka_manager.py index a9f1a075..afa28ff9 100644 --- a/src/socketio/kafka_manager.py +++ b/src/socketio/kafka_manager.py @@ -5,7 +5,6 @@ except ImportError: kafka = None -from engineio import json from .pubsub_manager import PubSubManager logger = logging.getLogger('socketio') @@ -32,18 +31,29 @@ class KafkaManager(PubSubManager): # pragma: no cover servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. """ name = 'kafka' def __init__(self, url='kafka://localhost:9092', channel='socketio', - write_only=False): + write_only=False, logger=None, json=None): if kafka is None: raise RuntimeError('kafka-python package is not installed ' '(Run "pip install kafka-python" in your ' 'virtualenv).') - super().__init__(channel=channel, write_only=write_only) + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) urls = [url] if isinstance(url, str) else url self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092' @@ -53,7 +63,7 @@ def __init__(self, url='kafka://localhost:9092', channel='socketio', bootstrap_servers=self.kafka_urls) def _publish(self, data): - self.producer.send(self.channel, value=json.dumps(data)) + self.producer.send(self.channel, value=self.json.dumps(data)) self.producer.flush() def _kafka_listen(self): diff --git a/src/socketio/kombu_manager.py b/src/socketio/kombu_manager.py index ae9d1393..d17505e8 100644 --- a/src/socketio/kombu_manager.py +++ b/src/socketio/kombu_manager.py @@ -6,7 +6,6 @@ except ImportError: kombu = None -from engineio import json from .pubsub_manager import PubSubManager @@ -34,7 +33,17 @@ class KombuManager(PubSubManager): # pragma: no cover notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. :param connection_options: additional keyword arguments to be passed to ``kombu.Connection()``. 
:param exchange_options: additional keyword arguments to be passed to @@ -47,14 +56,15 @@ class KombuManager(PubSubManager): # pragma: no cover name = 'kombu' def __init__(self, url='amqp://guest:guest@localhost:5672//', - channel='socketio', write_only=False, logger=None, + channel='socketio', write_only=False, logger=None, json=None, connection_options=None, exchange_options=None, queue_options=None, producer_options=None): if kombu is None: raise RuntimeError('Kombu package is not installed ' '(Run "pip install kombu" in your ' 'virtualenv).') - super().__init__(channel=channel, write_only=write_only, logger=logger) + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) self.url = url self.connection_options = connection_options or {} self.exchange_options = exchange_options or {} @@ -102,7 +112,7 @@ def _publish(self, data): try: producer_publish = self._producer_publish( self.publisher_connection) - producer_publish(json.dumps(data)) + producer_publish(self.json.dumps(data)) break except (OSError, kombu.exceptions.KombuError): if retry: diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index 4254d4af..9a77e3f6 100644 --- a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -2,8 +2,6 @@ from functools import partial import uuid -from engineio import json - from .manager import Manager from .packet import Packet @@ -21,15 +19,31 @@ class PubSubManager(Manager): :param channel: The channel name on which the server sends and receives notifications. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. """ name = 'pubsub' - def __init__(self, channel='socketio', write_only=False, logger=None): + def __init__(self, channel='socketio', write_only=False, logger=None, + json=None): super().__init__() self.channel = channel self.write_only = write_only self.host_id = uuid.uuid4().hex self.logger = logger + if json is not None: + self.json = json def initialize(self): super().initialize() @@ -215,7 +229,7 @@ def _thread(self): data = message else: try: - data = json.loads(message) + data = self.json.loads(message) except: pass if data and 'method' in data: diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index 4a9d69d1..fb2e0400 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -16,7 +16,6 @@ valkey = None ValkeyError = None -from engineio import json from .pubsub_manager import PubSubManager logger = logging.getLogger('socketio') @@ -72,15 +71,26 @@ class RedisManager(PubSubManager): notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. 
+ :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. :param redis_options: additional keyword arguments to be passed to ``Redis.from_url()`` or ``Sentinel()``. """ name = 'redis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', - write_only=False, logger=None, redis_options=None): - super().__init__(channel=channel, write_only=write_only, logger=logger) + write_only=False, logger=None, json=None, redis_options=None): + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) self.redis_url = url self.redis_options = redis_options or {} self.connected = False @@ -153,7 +163,7 @@ def _publish(self, data): # pragma: no cover try: if not self.connected: self._redis_connect() - return self.redis.publish(self.channel, json.dumps(data)) + return self.redis.publish(self.channel, self.json.dumps(data)) except error as exc: if retries_left > 0: logger.error( diff --git a/src/socketio/server.py b/src/socketio/server.py index f3257081..6a1a2026 100644 --- a/src/socketio/server.py +++ b/src/socketio/server.py @@ -30,10 +30,11 @@ class Server(base_server.BaseServer): of the ``encode()`` and ``decode()`` methods can be provided. Client and server must use compatible serializers. - :param json: An alternative json module to use for encoding and decoding + :param json: An alternative JSON module to use for encoding and decoding packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library - versions. + versions. This is a process-wide setting; all instantiated + servers and clients must use the same JSON module. :param async_handlers: If set to ``True``, event handlers for a client are executed in separate threads. To run handlers for a client synchronously, set to ``False``. The default diff --git a/src/socketio/zmq_manager.py b/src/socketio/zmq_manager.py index a71b869c..3fa1e145 100644 --- a/src/socketio/zmq_manager.py +++ b/src/socketio/zmq_manager.py @@ -1,6 +1,5 @@ import re -from engineio import json from .pubsub_manager import PubSubManager @@ -23,7 +22,17 @@ class ZmqManager(PubSubManager): # pragma: no cover notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting - and receiving. + and receiving. A write-only instance can be used + independently of the server to emit to clients from an + external process. + :param logger: a custom logger to use. If not given, the server logger + is used. + :param json: An alternative JSON module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. This setting is only used when ``write_only`` is set + to ``True``. Otherwise the JSON module configured in the + server is used. A zmq message broker must be running for the zmq_manager to work. 
You can write your own or adapt one from the following simple broker @@ -42,10 +51,8 @@ class ZmqManager(PubSubManager): # pragma: no cover """ name = 'zmq' - def __init__(self, url='zmq+tcp://localhost:5555+5556', - channel='socketio', - write_only=False, - logger=None): + def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio', + write_only=False, logger=None, json=None): try: from eventlet.green import zmq except ImportError: @@ -57,7 +64,8 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556', if not (url.startswith('zmq+tcp://') and r.search(url)): raise RuntimeError('unexpected connection string: ' + url) - super().__init__(channel=channel, write_only=write_only, logger=logger) + super().__init__(channel=channel, write_only=write_only, logger=logger, + json=json) url = url.replace('zmq+', '') (sink_url, sub_port) = url.split('+') sink_port = sink_url.split(':')[-1] @@ -75,7 +83,7 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556', self.channel = channel def _publish(self, data): - packed_data = json.dumps( + packed_data = self.json.dumps( { 'type': 'message', 'channel': self.channel, @@ -94,7 +102,7 @@ def _listen(self): for message in self.zmq_listen(): if isinstance(message, bytes): try: - message = json.loads(message) + message = self.json.loads(message) except Exception: pass if isinstance(message, dict) and \ diff --git a/tests/async/test_pubsub_manager.py b/tests/async/test_pubsub_manager.py index abf41a24..40966bf8 100644 --- a/tests/async/test_pubsub_manager.py +++ b/tests/async/test_pubsub_manager.py @@ -5,8 +5,10 @@ import pytest +from engineio.packet import Packet as EIOPacket from socketio import async_manager from socketio import async_pubsub_manager +from socketio import async_server from socketio import packet @@ -846,3 +848,14 @@ async def messages(): self.pm._handle_emit.assert_awaited_with( {'method': 'emit', 'value': 'bar', 'host_id': 'x'} ) + + def test_custom_json(self): + saved_json = packet.Packet.json + + cm = async_pubsub_manager.AsyncPubSubManager(json='foo') + assert cm.json == 'foo' + async_server.AsyncServer(json='bar', client_manager=cm) + assert cm.json == 'bar' + + packet.Packet.json = saved_json + EIOPacket.json = saved_json diff --git a/tests/async/test_redis_manager.py b/tests/async/test_redis_manager.py index 046c26d2..16fa4caf 100644 --- a/tests/async/test_redis_manager.py +++ b/tests/async/test_redis_manager.py @@ -2,8 +2,10 @@ import redis import valkey -from socketio import async_redis_manager +from engineio.packet import Packet as EIOPacket +from socketio import async_redis_manager, AsyncServer from socketio.async_redis_manager import AsyncRedisManager +from socketio.packet import Packet class TestAsyncRedisManager: @@ -109,3 +111,14 @@ def test_valkey_connect(self): assert isinstance(c.redis, valkey.asyncio.Valkey) async_redis_manager.aioredis = saved_redis + + def test_custom_json(self): + saved_json = Packet.json + + cm = AsyncRedisManager('redis://', json='foo') + assert cm.json == 'foo' + AsyncServer(json='bar', client_manager=cm) + assert cm.json == 'bar' + + Packet.json = saved_json + EIOPacket.json = saved_json diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 91b77854..252c77f8 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -5,9 +5,11 @@ import pytest +from engineio.packet import Packet as EIOPacket from socketio import manager from socketio import pubsub_manager from socketio import packet +from socketio import 
server class TestPubSubManager: @@ -822,3 +824,14 @@ def messages(): self.pm._handle_emit.assert_called_with( {'method': 'emit', 'value': 'bar', 'host_id': 'x'} ) + + def test_custom_json(self): + saved_json = packet.Packet.json + + cm = pubsub_manager.PubSubManager(json='foo') + assert cm.json == 'foo' + server.Server(json='bar', client_manager=cm) + assert cm.json == 'bar' + + packet.Packet.json = saved_json + EIOPacket.json = saved_json diff --git a/tests/common/test_redis_manager.py b/tests/common/test_redis_manager.py index dbc927e4..3f4e29c6 100644 --- a/tests/common/test_redis_manager.py +++ b/tests/common/test_redis_manager.py @@ -2,7 +2,9 @@ import redis import valkey -from socketio import redis_manager +from engineio.packet import Packet as EIOPacket +from socketio import redis_manager, Server +from socketio.packet import Packet from socketio.redis_manager import RedisManager, parse_redis_sentinel_url @@ -144,3 +146,14 @@ def test_sentinel_url_parser(self, rtype): 'myredis', {'username': 'user', 'password': 'password', 'db': 0} ) + + def test_custom_json(self): + saved_json = Packet.json + + cm = RedisManager('redis://', json='foo') + assert cm.json == 'foo' + Server(json='bar', client_manager=cm) + assert cm.json == 'bar' + + Packet.json = saved_json + EIOPacket.json = saved_json diff --git a/tox.ini b/tox.ini index 02297a2c..0ffa92e7 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ python = [testenv] commands= pip install -e . - pytest -p no:logging --timeout=60 --cov=socketio --cov-branch --cov-report=term-missing --cov-report=xml + pytest -p no:logging --timeout=60 --cov=socketio --cov-branch --cov-report=term-missing --cov-report=xml {posargs} deps= simple-websocket uvicorn
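
A quick usage sketch for reviewers (not part of the patch): how the new ``json`` parameter is meant to be used with a write-only manager emitting from an external process. It assumes a Redis server at the default local URL; ``compact_json`` is a hypothetical stand-in for any module that provides ``dumps`` and ``loads`` functions compatible with the standard library versions.

import json as stdlib_json
import types

import socketio

# Hypothetical custom serializer: per the new docstrings, any object
# exposing compatible dumps()/loads() callables satisfies the contract.
compact_json = types.SimpleNamespace(
    dumps=lambda obj: stdlib_json.dumps(obj, separators=(',', ':')),
    loads=stdlib_json.loads,
)

# A write-only manager runs outside the server process, so set_server()
# never replaces self.json with the server's packet JSON module (see the
# base_manager.py hunk above); the json argument added by this diff is
# what takes effect here.
external_sio = socketio.RedisManager('redis://localhost:6379/0',
                                     write_only=True, json=compact_json)
external_sio.emit('my_event', {'status': 'ok'}, room='my_room')

The servers subscribed to the same channel decode these notifications with their own configured JSON module, which is why the docstrings require both sides to use compatible serializers.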