Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,8 @@ def send_body(self, f, protocol_version):
known_event_types = frozenset((
'TOPOLOGY_CHANGE',
'STATUS_CHANGE',
'SCHEMA_CHANGE'
'SCHEMA_CHANGE',
'CONNECTION_METADATA_CHANGE'
))


Expand Down Expand Up @@ -987,6 +988,14 @@ def recv_body(cls, f, protocol_version, *args):
return cls(event_type=event_type, event_args=read_method(f, protocol_version))
raise NotSupportedError('Unknown event type %r' % event_type)

@classmethod
def recv_connection_metadata_change(cls, f, protocol_version):
# "UPDATE_NODES"
change_type = read_string(f)
connection_ids = read_stringlist(f)
host_ids = read_stringlist(f)
return dict(change_type=change_type, connection_ids=connection_ids, host_ids=host_ids)

@classmethod
def recv_topology_change(cls, f, protocol_version):
# "NEW_NODE" or "REMOVED_NODE"
Expand Down
5 changes: 1 addition & 4 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,17 +688,14 @@ def xfail_scylla_version_lt(reason, oss_scylla_version, ent_scylla_version, *arg
It is used to mark tests that are going to fail on certain scylla versions.
:param reason: message to fail test with
:param oss_scylla_version: str, oss version from which test supposed to succeed
:param ent_scylla_version: str, enterprise version from which test supposed to succeed. It should end with `.1.1`
:param ent_scylla_version: str, enterprise version from which test supposed to succeed
"""
if not reason.startswith("scylladb/scylladb#"):
raise ValueError('reason should start with scylladb/scylladb#<issue-id> to reference issue in scylla repo')

if not isinstance(ent_scylla_version, str):
raise ValueError('ent_scylla_version should be a str')

if not ent_scylla_version.endswith("1.1"):
raise ValueError('ent_scylla_version should end with "1.1"')

if SCYLLA_VERSION is None:
return pytest.mark.skipif(False, reason="It is just a NoOP Decor, should not skip anything")

Expand Down
72 changes: 71 additions & 1 deletion tests/integration/standard/test_control_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
#
#
#
from threading import Event

from cassandra import InvalidRequest

import unittest
import requests


from cassandra.protocol import ConfigurationException
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40, \
xfail_scylla_version_lt
from tests.integration.datatype_utils import update_datatypes


Expand Down Expand Up @@ -127,3 +131,69 @@ def test_control_connection_port_discovery(self):
for host in hosts:
assert 9042 == host.broadcast_rpc_port
assert 7000 == host.broadcast_port

@xfail_scylla_version_lt(reason='scylladb/scylladb#26992 - system.connection_metadata is not yet supported',
oss_scylla_version="7.0", ent_scylla_version="2025.4.0")
def test_connection_metadata_change_event(self):
cluster = TestCluster()

# Establish control connection
cluster.connect()

flag = Event()

connection_ids = ["anytext", "11510f50-f906-4844-8c74-49ddab9ac6a9"]
host_ids = ["1a13fa42-c45b-410f-8ba5-58b42ada9c12", "aa13fa42-c45b-410f-8ba5-58b42ada9c12"]
got_connection_ids = []
got_host_ids = []

def on_event(event):
nonlocal got_connection_ids
nonlocal got_host_ids
try:
assert event.get("change_type") == "UPDATE_NODES"
got_connection_ids = event.get("connection_ids")
got_host_ids = event.get("host_ids")
finally:
flag.set()

cluster.control_connection._connection.register_watchers({"CONNECTION_METADATA_CHANGE": on_event})

try:
payload = [
{
"connection_id": connection_ids[0], # Should be a UUID if API requires that
"host_id": host_ids[0],
"address": "localhost",
"port": 9042,
"tls_port": 0,
"alternator_port": 0,
"alternator_https_port": 0,
"rack": "string",
"datacenter": "string"
},
{
"connection_id": connection_ids[1],
"host_id": host_ids[1],
"address": "localhost",
"port": 9042,
"tls_port": 0,
"alternator_port": 0,
"alternator_https_port": 0,
"rack": "string",
"datacenter": "string"
}
]
response = requests.post(
"http://" + cluster.contact_points[0] + ":10000/v2/connection-metadata",
json=payload,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
})
assert response.status_code == 200
assert flag.wait(20), "Schema change event was not received after registering watchers"
assert got_connection_ids == connection_ids
assert got_host_ids == host_ids
finally:
cluster.shutdown()
Loading