1+ import os
12import time
23import unittest
34
910class TestConnectionPooling (unittest .TestCase ):
1011 """Test suite for HTTP connection pooling functionality."""
1112
12- def setUp (self ):
13- """Set up test fixtures."""
14- self .client = cohere .ClientV2 ()
15-
16- def test_client_has_connection_pooling_configured (self ):
17- """Test that clients are configured with proper connection pooling limits."""
18- # Create clients
19- sync_client = cohere .Client ()
20- v2_client = cohere .ClientV2 ()
21-
22- # Verify they were created successfully
23- self .assertIsNotNone (sync_client )
24- self .assertIsNotNone (v2_client )
13+ @classmethod
14+ def setUpClass (cls ):
15+ """Set up class-level fixtures."""
16+ # Check if API key is available for integration tests
17+ cls .api_key_available = bool (os .environ .get ("CO_API_KEY" ))
18+
19+ def test_httpx_client_creation_with_limits (self ):
20+ """Test that httpx clients can be created with our connection pooling limits."""
21+ # Test creating httpx client with limits (our implementation)
22+ client_with_limits = httpx .Client (
23+ timeout = 300 ,
24+ limits = httpx .Limits (
25+ max_keepalive_connections = 20 ,
26+ max_connections = 100 ,
27+ keepalive_expiry = 30.0 ,
28+ ),
29+ )
30+
31+ # Verify the client was created successfully
32+ self .assertIsNotNone (client_with_limits )
33+ self .assertIsInstance (client_with_limits , httpx .Client )
34+
35+ # The limits are applied internally - we can't directly access them
36+ # but we verify the client works correctly with our configuration
37+
38+ client_with_limits .close ()
39+
40+ def test_cohere_client_initialization (self ):
41+ """Test that Cohere clients can be initialized with connection pooling."""
42+ # Test with dummy API key - just verifies initialization works
43+ try :
44+ sync_client = cohere .Client (api_key = "dummy-key" )
45+ v2_client = cohere .ClientV2 (api_key = "dummy-key" )
46+
47+ # Verify clients were created
48+ self .assertIsNotNone (sync_client )
49+ self .assertIsNotNone (v2_client )
50+
51+ except Exception as e :
52+ # Should not fail due to httpx configuration
53+ if "httpx" in str (e ).lower () or "limits" in str (e ).lower ():
54+ self .fail (f"Failed to create client with connection pooling: { e } " )
2555
26- # The actual connection pooling is configured internally in httpx
27- # We mainly verify that clients can be created without errors
56+ def test_custom_httpx_client_with_pooling (self ):
57+ """Test that custom httpx clients with connection pooling work correctly."""
58+ # Create custom httpx client with explicit pooling configuration
59+ custom_client = httpx .Client (
60+ timeout = 30 ,
61+ limits = httpx .Limits (
62+ max_keepalive_connections = 10 ,
63+ max_connections = 50 ,
64+ keepalive_expiry = 20.0 ,
65+ ),
66+ )
2867
29- def test_multiple_requests_performance (self ):
30- """Test that multiple requests show performance improvement with connection pooling."""
31- # Skip if no API key is available
68+ # Create Cohere client with custom httpx client
69+ try :
70+ client = cohere .ClientV2 (api_key = "dummy-key" , httpx_client = custom_client )
71+ self .assertIsNotNone (client )
72+ finally :
73+ custom_client .close ()
74+
75+ def test_connection_pooling_vs_no_pooling_setup (self ):
76+ """Test creating clients with and without connection pooling."""
77+ # Create httpx client without pooling
78+ no_pool_httpx = httpx .Client (
79+ timeout = 30 ,
80+ limits = httpx .Limits (
81+ max_keepalive_connections = 0 ,
82+ max_connections = 1 ,
83+ keepalive_expiry = 0 ,
84+ ),
85+ )
86+
87+ # Verify both configurations work
3288 try :
33- client = cohere .ClientV2 ()
34- except Exception :
35- self .skipTest ("No API key available" )
89+ pooled_client = cohere .ClientV2 (api_key = "dummy-key" )
90+ no_pool_client = cohere .ClientV2 (api_key = "dummy-key" , httpx_client = no_pool_httpx )
91+
92+ self .assertIsNotNone (pooled_client )
93+ self .assertIsNotNone (no_pool_client )
94+
95+ finally :
96+ no_pool_httpx .close ()
3697
98+ @unittest .skipIf (not os .environ .get ("CO_API_KEY" ), "API key not available" )
99+ def test_multiple_requests_performance (self ):
100+ """Test that multiple requests benefit from connection pooling."""
101+ client = cohere .ClientV2 ()
102+
37103 response_times = []
38-
39- # Make multiple requests and measure response times
104+
105+ # Make multiple requests
40106 for i in range (3 ):
41107 start_time = time .time ()
42108 try :
43109 response = client .chat (
44110 model = "command-r-plus-08-2024" ,
45- messages = [{"role" : "user" , "content" : f"Say { i + 1 } " }],
111+ messages = [{"role" : "user" , "content" : f"Say the number { i + 1 } " }],
46112 )
47113 elapsed = time .time () - start_time
48114 response_times .append (elapsed )
49-
50- # Verify response is valid
115+
116+ # Verify response
51117 self .assertIsNotNone (response )
52118 self .assertIsNotNone (response .message )
53- self .assertIsNotNone (response .message .content )
54-
55- # Wait to avoid rate limits
119+
120+ # Rate limit protection
56121 if i < 2 :
57122 time .sleep (2 )
58-
123+
59124 except Exception as e :
60- # Skip test if rate limited or other API errors
61125 if "429" in str (e ) or "rate" in str (e ).lower ():
62- self .skipTest (f "Rate limited: { e } " )
126+ self .skipTest ("Rate limited" )
63127 raise
64-
65- # Generally, subsequent requests should be faster due to connection reuse
66- # We just verify all requests completed successfully
128+
129+ # Verify all requests completed
67130 self .assertEqual (len (response_times ), 3 )
68- self .assertTrue (all (t > 0 for t in response_times ))
131+
132+ # Generally, subsequent requests should be faster due to connection reuse
133+ # First request establishes connection, subsequent ones reuse it
134+ print (f"Response times: { response_times } " )
69135
70- def test_streaming_with_connection_pooling (self ):
136+ @unittest .skipIf (not os .environ .get ("CO_API_KEY" ), "API key not available" )
137+ def test_streaming_with_pooling (self ):
71138 """Test that streaming works correctly with connection pooling."""
72- try :
73- client = cohere .ClientV2 ()
74- except Exception :
75- self .skipTest ("No API key available" )
76-
139+ client = cohere .ClientV2 ()
140+
77141 try :
78142 response = client .chat_stream (
79143 model = "command-r-plus-08-2024" ,
80144 messages = [{"role" : "user" , "content" : "Count to 3" }],
81145 )
82-
146+
83147 chunks = []
84148 for event in response :
85149 if event .type == "content-delta" :
86150 chunks .append (event .delta .message .content .text )
87-
88- # Verify we received chunks
151+
152+ # Verify streaming worked
89153 self .assertGreater (len (chunks ), 0 )
90154 full_response = "" .join (chunks )
91155 self .assertGreater (len (full_response ), 0 )
92-
156+
93157 except Exception as e :
94158 if "429" in str (e ) or "rate" in str (e ).lower ():
95- self .skipTest (f "Rate limited: { e } " )
159+ self .skipTest ("Rate limited" )
96160 raise
97161
98- def test_custom_httpx_client_with_pooling (self ):
99- """Test that custom httpx clients with connection pooling work correctly."""
100- # Create custom httpx client with explicit pooling configuration
101- custom_client = httpx .Client (
102- timeout = 30 ,
103- limits = httpx .Limits (
104- max_keepalive_connections = 10 ,
105- max_connections = 50 ,
106- keepalive_expiry = 20.0 ,
107- ),
108- )
109-
110- # Create Cohere client with custom httpx client
111- client = cohere .ClientV2 (httpx_client = custom_client )
112- self .assertIsNotNone (client )
113-
114- # Clean up
115- custom_client .close ()
116-
117- def test_connection_pooling_vs_no_pooling (self ):
118- """Compare performance with and without connection pooling."""
119- try :
120- # Test with connection pooling (default)
121- pooled_client = cohere .ClientV2 ()
122-
123- # Test without connection pooling
124- no_pool_httpx = httpx .Client (
125- timeout = 30 ,
126- limits = httpx .Limits (
127- max_keepalive_connections = 0 ,
128- max_connections = 1 ,
129- keepalive_expiry = 0 ,
130- ),
131- )
132- no_pool_client = cohere .ClientV2 (httpx_client = no_pool_httpx )
133-
134- # Just verify both clients work
135- self .assertIsNotNone (pooled_client )
136- self .assertIsNotNone (no_pool_client )
137-
138- # Clean up
139- no_pool_httpx .close ()
140-
141- except Exception :
142- self .skipTest ("No API key available" )
143-
144162
145163if __name__ == "__main__" :
146164 unittest .main ()
0 commit comments