|
57 | 57 | CAN_GET_SELECTED_OPENSSL_SIGALG = ssl.OPENSSL_VERSION_INFO >= (3, 5) |
58 | 58 | PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS') |
59 | 59 |
|
| 60 | +HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename') |
| 61 | +requires_keylog = unittest.skipUnless( |
| 62 | + HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback') |
| 63 | + |
60 | 64 | PROTOCOL_TO_TLS_VERSION = {} |
61 | 65 | for proto, ver in ( |
62 | 66 | ("PROTOCOL_SSLv3", "SSLv3"), |
@@ -265,33 +269,69 @@ def utc_offset(): #NOTE: ignore issues like #1647654 |
265 | 269 | ) |
266 | 270 |
|
267 | 271 |
|
268 | | -def test_wrap_socket(sock, *, |
269 | | - cert_reqs=ssl.CERT_NONE, ca_certs=None, |
270 | | - ciphers=None, ciphersuites=None, |
271 | | - min_version=None, max_version=None, |
272 | | - certfile=None, keyfile=None, |
273 | | - **kwargs): |
274 | | - if not kwargs.get("server_side"): |
275 | | - kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME |
276 | | - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
277 | | - else: |
| 272 | +def make_test_context( |
| 273 | + *, |
| 274 | + server_side=False, |
| 275 | + check_hostname=None, |
| 276 | + cert_reqs=ssl.CERT_NONE, |
| 277 | + ca_certs=None, certfile=None, keyfile=None, |
| 278 | + ciphers=None, ciphersuites=None, |
| 279 | + min_version=None, max_version=None, |
| 280 | +): |
| 281 | + if server_side: |
278 | 282 | context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
279 | | - if cert_reqs is not None: |
| 283 | + else: |
| 284 | + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
| 285 | + |
| 286 | + if check_hostname is None: |
280 | 287 | if cert_reqs == ssl.CERT_NONE: |
281 | 288 | context.check_hostname = False |
| 289 | + else: |
| 290 | + context.check_hostname = check_hostname |
| 291 | + |
| 292 | + if cert_reqs is not None: |
282 | 293 | context.verify_mode = cert_reqs |
| 294 | + |
283 | 295 | if ca_certs is not None: |
284 | 296 | context.load_verify_locations(ca_certs) |
285 | 297 | if certfile is not None or keyfile is not None: |
286 | 298 | context.load_cert_chain(certfile, keyfile) |
| 299 | + |
287 | 300 | if ciphers is not None: |
288 | 301 | context.set_ciphers(ciphers) |
289 | 302 | if ciphersuites is not None: |
290 | 303 | context.set_ciphersuites(ciphersuites) |
| 304 | + |
291 | 305 | if min_version is not None: |
292 | 306 | context.minimum_version = min_version |
293 | 307 | if max_version is not None: |
294 | 308 | context.maximum_version = max_version |
| 309 | + |
| 310 | + return context |
| 311 | + |
| 312 | + |
| 313 | +def test_wrap_socket( |
| 314 | + sock, |
| 315 | + *, |
| 316 | + server_side=False, |
| 317 | + check_hostname=None, |
| 318 | + cert_reqs=ssl.CERT_NONE, |
| 319 | + ca_certs=None, certfile=None, keyfile=None, |
| 320 | + ciphers=None, ciphersuites=None, |
| 321 | + min_version=None, max_version=None, |
| 322 | + **kwargs |
| 323 | +): |
| 324 | + context = make_test_context( |
| 325 | + server_side=server_side, |
| 326 | + check_hostname=check_hostname, |
| 327 | + cert_reqs=cert_reqs, |
| 328 | + ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, |
| 329 | + ciphers=ciphers, ciphersuites=ciphersuites, |
| 330 | + min_version=min_version, max_version=max_version, |
| 331 | + ) |
| 332 | + kwargs["server_side"] = server_side |
| 333 | + if not server_side: |
| 334 | + kwargs.setdefault("server_hostname", SIGNED_CERTFILE_HOSTNAME) |
295 | 335 | return context.wrap_socket(sock, **kwargs) |
296 | 336 |
|
297 | 337 |
|
@@ -1730,6 +1770,39 @@ def test_num_tickest(self): |
1730 | 1770 | with self.assertRaises(ValueError): |
1731 | 1771 | ctx.num_tickets = 1 |
1732 | 1772 |
|
| 1773 | + @support.cpython_only |
| 1774 | + def test_refcycle_msg_callback(self): |
| 1775 | + # See https://github.com/python/cpython/issues/142516. |
| 1776 | + ctx = make_test_context() |
| 1777 | + def msg_callback(*args, _=ctx, **kwargs): ... |
| 1778 | + ctx._msg_callback = msg_callback |
| 1779 | + |
| 1780 | + @support.cpython_only |
| 1781 | + @requires_keylog |
| 1782 | + def test_refcycle_keylog_filename(self): |
| 1783 | + # See https://github.com/python/cpython/issues/142516. |
| 1784 | + self.addCleanup(os_helper.unlink, os_helper.TESTFN) |
| 1785 | + ctx = make_test_context() |
| 1786 | + class KeylogFilename(str): ... |
| 1787 | + ctx.keylog_filename = KeylogFilename(os_helper.TESTFN) |
| 1788 | + ctx.keylog_filename._ = ctx |
| 1789 | + |
| 1790 | + @support.cpython_only |
| 1791 | + @unittest.skipUnless(ssl.HAS_PSK, 'requires TLS-PSK') |
| 1792 | + def test_refcycle_psk_client_callback(self): |
| 1793 | + # See https://github.com/python/cpython/issues/142516. |
| 1794 | + ctx = make_test_context() |
| 1795 | + def psk_client_callback(*args, _=ctx, **kwargs): ... |
| 1796 | + ctx.set_psk_client_callback(psk_client_callback) |
| 1797 | + |
| 1798 | + @support.cpython_only |
| 1799 | + @unittest.skipUnless(ssl.HAS_PSK, 'requires TLS-PSK') |
| 1800 | + def test_refcycle_psk_server_callback(self): |
| 1801 | + # See https://github.com/python/cpython/issues/142516. |
| 1802 | + ctx = make_test_context(server_side=True) |
| 1803 | + def psk_server_callback(*args, _=ctx, **kwargs): ... |
| 1804 | + ctx.set_psk_server_callback(psk_server_callback) |
| 1805 | + |
1733 | 1806 |
|
1734 | 1807 | class SSLErrorTests(unittest.TestCase): |
1735 | 1808 |
|
@@ -5163,10 +5236,6 @@ def test_internal_chain_server(self): |
5163 | 5236 | self.assertEqual(res, b'\x02\n') |
5164 | 5237 |
|
5165 | 5238 |
|
5166 | | -HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename') |
5167 | | -requires_keylog = unittest.skipUnless( |
5168 | | - HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback') |
5169 | | - |
5170 | 5239 | class TestSSLDebug(unittest.TestCase): |
5171 | 5240 |
|
5172 | 5241 | def keylog_lines(self, fname=os_helper.TESTFN): |
|
0 commit comments