1616from cassandra import OperationTimedOut , ReadTimeout , Unavailable , WriteTimeout
1717
1818# Import Cassandra driver exceptions for proper error detection
19+ from cassandra .cluster import Cluster as SyncCluster
1920from cassandra .cluster import NoHostAvailable
2021from cassandra .policies import ConstantReconnectionPolicy
2122from fastapi import FastAPI , HTTPException , Query , Request
@@ -49,6 +50,8 @@ class UserUpdate(BaseModel):
4950# Global session, cluster, and keyspace
5051session = None
5152cluster = None
53+ sync_session = None # For synchronous performance comparison
54+ sync_cluster = None # For synchronous performance comparison
5255keyspace = "example"
5356
5457
@@ -111,7 +114,7 @@ def handle_cassandra_error(error: Exception, operation: str = "operation") -> HT
111114@asynccontextmanager
112115async def lifespan (app : FastAPI ):
113116 """Manage database lifecycle."""
114- global session , cluster
117+ global session , cluster , sync_session , sync_cluster
115118
116119 try :
117120 # Startup - connect to Cassandra with constant reconnection policy
@@ -148,6 +151,22 @@ async def lifespan(app: FastAPI):
148151 """
149152 )
150153 await session .set_keyspace ("example" )
154+
155+ # Also create sync cluster for performance comparison
156+ try :
157+ sync_cluster = SyncCluster (
158+ contact_points = contact_points ,
159+ port = int (os .getenv ("CASSANDRA_PORT" , "9042" )),
160+ reconnection_policy = ConstantReconnectionPolicy (delay = 2.0 ),
161+ connect_timeout = 10.0 ,
162+ protocol_version = 5 ,
163+ )
164+ sync_session = sync_cluster .connect ()
165+ sync_session .set_keyspace ("example" )
166+ except Exception as e :
167+ print (f"Failed to create sync cluster: { e } " )
168+ sync_session = None
169+
151170 # Drop and recreate table for clean test environment
152171 await session .execute ("DROP TABLE IF EXISTS users" )
153172 await session .execute (
@@ -166,8 +185,14 @@ async def lifespan(app: FastAPI):
166185 yield
167186
168187 # Shutdown
169- await session .close ()
170- await cluster .shutdown ()
188+ if session :
189+ await session .close ()
190+ if cluster :
191+ await cluster .shutdown ()
192+ if sync_session :
193+ sync_session .shutdown ()
194+ if sync_cluster :
195+ sync_cluster .shutdown ()
171196
172197
173198# Create FastAPI app
@@ -678,40 +703,51 @@ async def execute_query():
678703
679704@app .get ("/performance/sync" )
680705async def test_sync_performance (requests : int = Query (100 , ge = 1 , le = 1000 )):
681- """Test sync-style performance (sequential execution) ."""
682- if session is None :
706+ """Test TRUE sync performance using synchronous cassandra-driver ."""
707+ if sync_session is None :
683708 raise HTTPException (
684709 status_code = 503 ,
685- detail = "Service temporarily unavailable: Cassandra connection not established" ,
710+ detail = "Service temporarily unavailable: Sync Cassandra connection not established" ,
686711 )
687712
688713 import time
689714
690715 try :
691- start_time = time .time ()
716+ # Run synchronous operations in a thread pool to not block the event loop
717+ import concurrent .futures
692718
693- # Prepare statement once
694- stmt = await session . prepare ( "SELECT * FROM users LIMIT 1" )
719+ def run_sync_test ():
720+ start_time = time . time ( )
695721
696- # Execute queries sequentially
697- results = []
698- for _ in range (requests ):
699- result = await session .execute (stmt )
700- results .append (result )
722+ # Prepare statement once
723+ stmt = sync_session .prepare ("SELECT * FROM users LIMIT 1" )
701724
702- end_time = time .time ()
703- duration = end_time - start_time
725+ # Execute queries sequentially with the SYNC driver
726+ results = []
727+ for _ in range (requests ):
728+ result = sync_session .execute (stmt )
729+ results .append (result )
704730
705- return {
706- "requests" : requests ,
707- "total_time" : duration ,
708- "requests_per_second" : requests / duration if duration > 0 else 0 ,
709- "avg_time_per_request" : duration / requests if requests > 0 else 0 ,
710- "successful_requests" : len (results ),
711- "mode" : "sync" ,
712- }
731+ end_time = time .time ()
732+ duration = end_time - start_time
733+
734+ return {
735+ "requests" : requests ,
736+ "total_time" : duration ,
737+ "requests_per_second" : requests / duration if duration > 0 else 0 ,
738+ "avg_time_per_request" : duration / requests if requests > 0 else 0 ,
739+ "successful_requests" : len (results ),
740+ "mode" : "sync (true blocking)" ,
741+ }
742+
743+ # Run in thread pool to avoid blocking the event loop
744+ loop = asyncio .get_event_loop ()
745+ with concurrent .futures .ThreadPoolExecutor () as pool :
746+ result = await loop .run_in_executor (pool , run_sync_test )
747+
748+ return result
713749 except Exception as e :
714- raise handle_cassandra_error (e , "performance test" )
750+ raise handle_cassandra_error (e , "sync performance test" )
715751
716752
717753# Batch operations endpoint
0 commit comments