diff --git a/cassandra/protocol.py b/cassandra/protocol.py index c151d44361..8fee0db101 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -956,7 +956,8 @@ def send_body(self, f, protocol_version): known_event_types = frozenset(( 'TOPOLOGY_CHANGE', 'STATUS_CHANGE', - 'SCHEMA_CHANGE' + 'SCHEMA_CHANGE', + 'CONNECTION_METADATA_CHANGE' )) @@ -987,6 +988,14 @@ def recv_body(cls, f, protocol_version, *args): return cls(event_type=event_type, event_args=read_method(f, protocol_version)) raise NotSupportedError('Unknown event type %r' % event_type) + @classmethod + def recv_connection_metadata_change(cls, f, protocol_version): + # "UPDATE_NODES" + change_type = read_string(f) + connection_ids = read_stringlist(f) + host_ids = read_stringlist(f) + return dict(change_type=change_type, connection_ids=connection_ids, host_ids=host_ids) + @classmethod def recv_topology_change(cls, f, protocol_version): # "NEW_NODE" or "REMOVED_NODE" diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 7a8845750e..edac685d12 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -688,7 +688,7 @@ def xfail_scylla_version_lt(reason, oss_scylla_version, ent_scylla_version, *arg It is used to mark tests that are going to fail on certain scylla versions. :param reason: message to fail test with :param oss_scylla_version: str, oss version from which test supposed to succeed - :param ent_scylla_version: str, enterprise version from which test supposed to succeed. It should end with `.1.1` + :param ent_scylla_version: str, enterprise version from which test supposed to succeed """ if not reason.startswith("scylladb/scylladb#"): raise ValueError('reason should start with scylladb/scylladb# to reference issue in scylla repo') @@ -696,9 +696,6 @@ def xfail_scylla_version_lt(reason, oss_scylla_version, ent_scylla_version, *arg if not isinstance(ent_scylla_version, str): raise ValueError('ent_scylla_version should be a str') - if not ent_scylla_version.endswith("1.1"): - raise ValueError('ent_scylla_version should end with "1.1"') - if SCYLLA_VERSION is None: return pytest.mark.skipif(False, reason="It is just a NoOP Decor, should not skip anything") diff --git a/tests/integration/standard/test_control_connection.py b/tests/integration/standard/test_control_connection.py index 32cc468e64..350c6a5ffe 100644 --- a/tests/integration/standard/test_control_connection.py +++ b/tests/integration/standard/test_control_connection.py @@ -14,13 +14,17 @@ # # # +from threading import Event + from cassandra import InvalidRequest import unittest +import requests from cassandra.protocol import ConfigurationException -from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40 +from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40, \ + xfail_scylla_version_lt from tests.integration.datatype_utils import update_datatypes @@ -127,3 +131,69 @@ def test_control_connection_port_discovery(self): for host in hosts: assert 9042 == host.broadcast_rpc_port assert 7000 == host.broadcast_port + + @xfail_scylla_version_lt(reason='scylladb/scylladb#26992 - system.connection_metadata is not yet supported', + oss_scylla_version="7.0", ent_scylla_version="2025.4.0") + def test_connection_metadata_change_event(self): + cluster = TestCluster() + + # Establish control connection + cluster.connect() + + flag = Event() + + connection_ids = ["anytext", "11510f50-f906-4844-8c74-49ddab9ac6a9"] + host_ids = ["1a13fa42-c45b-410f-8ba5-58b42ada9c12", "aa13fa42-c45b-410f-8ba5-58b42ada9c12"] + got_connection_ids = [] + got_host_ids = [] + + def on_event(event): + nonlocal got_connection_ids + nonlocal got_host_ids + try: + assert event.get("change_type") == "UPDATE_NODES" + got_connection_ids = event.get("connection_ids") + got_host_ids = event.get("host_ids") + finally: + flag.set() + + cluster.control_connection._connection.register_watchers({"CONNECTION_METADATA_CHANGE": on_event}) + + try: + payload = [ + { + "connection_id": connection_ids[0], # Should be a UUID if API requires that + "host_id": host_ids[0], + "address": "localhost", + "port": 9042, + "tls_port": 0, + "alternator_port": 0, + "alternator_https_port": 0, + "rack": "string", + "datacenter": "string" + }, + { + "connection_id": connection_ids[1], + "host_id": host_ids[1], + "address": "localhost", + "port": 9042, + "tls_port": 0, + "alternator_port": 0, + "alternator_https_port": 0, + "rack": "string", + "datacenter": "string" + } + ] + response = requests.post( + "http://" + cluster.contact_points[0] + ":10000/v2/connection-metadata", + json=payload, + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }) + assert response.status_code == 200 + assert flag.wait(20), "Schema change event was not received after registering watchers" + assert got_connection_ids == connection_ids + assert got_host_ids == host_ids + finally: + cluster.shutdown()