Skip to content

Commit 2fa8873

Browse files
NO-SNOW: Add more tests for async crl
1 parent 08b90a2 commit 2fa8873

File tree

2 files changed

+181
-2
lines changed

2 files changed

+181
-2
lines changed

src/snowflake/connector/aio/_session_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from ..ssl_wrap_socket import (
1717
FEATURE_CRL_CONFIG,
1818
FEATURE_OCSP_RESPONSE_CACHE_FILE_NAME,
19-
get_current_session_manager,
2019
load_trusted_certificates,
2120
resolve_cafile,
2221
)
@@ -117,10 +116,11 @@ def validate_crl(self, protocol: ResponseHandler, req: ClientRequest):
117116
cafile_for_ctx = resolve_cafile({"ca_certs": certifi.where()})
118117
crl_validator = CRLValidator.from_config(
119118
FEATURE_CRL_CONFIG,
120-
get_current_session_manager(),
119+
self._session_manager,
121120
trusted_certificates=load_trusted_certificates(cafile_for_ctx),
122121
)
123122
sll_object = protocol.transport.get_extra_info("ssl_object")
123+
# TODO(asyncio): SNOW-2681061 Add sync support for validate_connection
124124
if not crl_validator.validate_connection(sll_object):
125125
raise OperationalError(
126126
msg=(
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#!/usr/bin/env python
2+
"""
3+
CRL (Certificate Revocation List) Validation Integration Tests - Async version
4+
5+
These tests verify that CRL validation works correctly with real Snowflake connections
6+
in different modes: DISABLED, ADVISORY, and ENABLED.
7+
"""
8+
from __future__ import annotations
9+
10+
import tempfile
11+
12+
import pytest
13+
14+
15+
@pytest.mark.skipolddriver
16+
async def test_crl_validation_enabled_mode(conn_cnx):
17+
"""Test that connection works with CRL validation in ENABLED mode."""
18+
# ENABLED mode should work for normal Snowflake connections since they typically
19+
# have valid certificates with proper CRL distribution points
20+
async with conn_cnx(
21+
cert_revocation_check_mode="ENABLED",
22+
allow_certificates_without_crl_url=True, # Allow certs without CRL URLs
23+
crl_connection_timeout_ms=5000, # 5 second timeout
24+
crl_read_timeout_ms=5000, # 5 second timeout
25+
disable_ocsp_checks=True,
26+
) as cnx:
27+
assert cnx, "Connection should succeed with CRL validation in ENABLED mode"
28+
29+
# Verify we can execute a simple query
30+
cur = cnx.cursor()
31+
await cur.execute("SELECT 1")
32+
result = await cur.fetchone()
33+
assert result[0] == 1, "Query should execute successfully"
34+
await cur.close()
35+
36+
# Verify CRL settings were applied
37+
assert cnx.cert_revocation_check_mode == "ENABLED"
38+
assert cnx.allow_certificates_without_crl_url is True
39+
40+
41+
@pytest.mark.skipolddriver
42+
async def test_crl_validation_advisory_mode(conn_cnx):
43+
"""Test that connection works with CRL validation in ADVISORY mode."""
44+
# ADVISORY mode should be more lenient and allow connections even if CRL checks fail
45+
async with conn_cnx(
46+
cert_revocation_check_mode="ADVISORY",
47+
allow_certificates_without_crl_url=False, # Don't allow certs without CRL URLs
48+
crl_connection_timeout_ms=3000, # 3 second timeout
49+
crl_read_timeout_ms=3000, # 3 second timeout
50+
enable_crl_cache=True, # Enable caching
51+
crl_cache_validity_hours=1, # Cache for 1 hour
52+
) as cnx:
53+
assert cnx, "Connection should succeed with CRL validation in ADVISORY mode"
54+
55+
# Verify we can execute a simple query
56+
cur = cnx.cursor()
57+
await cur.execute("SELECT CURRENT_VERSION()")
58+
result = await cur.fetchone()
59+
assert result[0], "Query should return a version string"
60+
await cur.close()
61+
62+
# Verify CRL settings were applied
63+
assert cnx.cert_revocation_check_mode == "ADVISORY"
64+
assert cnx.allow_certificates_without_crl_url is False
65+
assert cnx.enable_crl_cache is True
66+
assert cnx.crl_connection_timeout_ms == 3000
67+
assert cnx.crl_read_timeout_ms == 3000
68+
assert cnx.crl_cache_validity_hours == 1
69+
assert cnx.crl_cache_dir is None
70+
71+
72+
@pytest.mark.skipolddriver
73+
async def test_crl_validation_disabled_mode(conn_cnx):
74+
"""Test that connection works with CRL validation in DISABLED mode (default)."""
75+
# DISABLED mode should work without any CRL checks
76+
async with conn_cnx(
77+
cert_revocation_check_mode="DISABLED",
78+
) as cnx:
79+
assert cnx, "Connection should succeed with CRL validation in DISABLED mode"
80+
81+
# Verify we can execute a simple query
82+
cur = cnx.cursor()
83+
await cur.execute("SELECT 'CRL_DISABLED' as test_value")
84+
result = await cur.fetchone()
85+
assert result[0] == "CRL_DISABLED", "Query should execute successfully"
86+
await cur.close()
87+
88+
# Verify CRL settings were applied
89+
assert cnx.cert_revocation_check_mode == "DISABLED"
90+
91+
92+
@pytest.mark.skipolddriver
93+
@pytest.mark.parametrize(
94+
"crl_mode,allow_without_crl,should_succeed",
95+
[
96+
("DISABLED", True, True), # DISABLED mode always succeeds
97+
("DISABLED", False, True), # DISABLED mode always succeeds
98+
("ADVISORY", True, True), # ADVISORY mode is lenient
99+
("ADVISORY", False, True), # ADVISORY mode is lenient
100+
("ENABLED", True, True), # ENABLED with allow_without_crl should succeed
101+
("ENABLED", False, True), # ENABLED might succeed if certs have valid CRL URLs
102+
],
103+
)
104+
async def test_crl_validation_modes_parametrized(
105+
conn_cnx, crl_mode, allow_without_crl, should_succeed
106+
):
107+
"""Parametrized test for different CRL validation modes and settings."""
108+
try:
109+
async with conn_cnx(
110+
cert_revocation_check_mode=crl_mode,
111+
allow_certificates_without_crl_url=allow_without_crl,
112+
crl_connection_timeout_ms=5000,
113+
crl_read_timeout_ms=5000,
114+
) as cnx:
115+
if should_succeed:
116+
assert (
117+
cnx
118+
), f"Connection should succeed with mode={crl_mode}, allow_without_crl={allow_without_crl}"
119+
120+
# Test basic functionality
121+
cur = cnx.cursor()
122+
await cur.execute("SELECT 1")
123+
result = await cur.fetchone()
124+
assert result[0] == 1, "Basic query should work"
125+
await cur.close()
126+
127+
# Verify settings
128+
assert cnx.cert_revocation_check_mode == crl_mode
129+
assert cnx.allow_certificates_without_crl_url == allow_without_crl
130+
else:
131+
pytest.fail(
132+
f"Connection should have failed with mode={crl_mode}, allow_without_crl={allow_without_crl}"
133+
)
134+
135+
except Exception as e:
136+
if should_succeed:
137+
pytest.fail(
138+
f"Connection unexpectedly failed with mode={crl_mode}, allow_without_crl={allow_without_crl}: {e}"
139+
)
140+
else:
141+
# Expected failure - verify it's a connection-related error
142+
assert (
143+
"revoked" in str(e).lower() or "crl" in str(e).lower()
144+
), f"Expected CRL-related error, got: {e}"
145+
146+
147+
@pytest.mark.skipolddriver
148+
async def test_crl_cache_configuration(conn_cnx):
149+
"""Test CRL cache configuration options."""
150+
with tempfile.TemporaryDirectory() as temp_dir:
151+
async with conn_cnx(
152+
cert_revocation_check_mode="ADVISORY", # Use advisory to avoid strict failures
153+
enable_crl_cache=True,
154+
enable_crl_file_cache=True,
155+
crl_cache_dir=temp_dir,
156+
crl_cache_validity_hours=2,
157+
crl_cache_removal_delay_days=1,
158+
crl_cache_cleanup_interval_hours=1,
159+
crl_cache_start_cleanup=False, # Don't start background cleanup in tests
160+
) as cnx:
161+
assert cnx, "Connection should succeed with CRL cache configuration"
162+
163+
# Verify cache settings were applied
164+
assert cnx.enable_crl_cache is True
165+
assert cnx.enable_crl_file_cache is True
166+
assert cnx.crl_cache_dir == temp_dir
167+
assert cnx.crl_cache_validity_hours == 2
168+
assert cnx.crl_cache_removal_delay_days == 1
169+
assert cnx.crl_cache_cleanup_interval_hours == 1
170+
assert cnx.crl_cache_start_cleanup is False
171+
172+
# Test basic functionality
173+
cur = cnx.cursor()
174+
await cur.execute("SELECT 'cache_test' as result")
175+
result = await cur.fetchone()
176+
assert (
177+
result[0] == "cache_test"
178+
), "Query should work with cache configuration"
179+
await cur.close()

0 commit comments

Comments
 (0)