11# SPDX-License-Identifier: Apache-2.0
22
3+ import base64
4+ import hashlib
5+ import hmac
6+ import json
7+ import time
8+
39import pretend
10+ import pytest
411
512from warehouse import request
613
714
class TestNormalizeDomain:
    """Exercise ``request._normalize_domain`` on plain, IDN and malformed input."""

    @pytest.mark.parametrize(
        ("input_domain", "expected"),
        [
            # Case folding
            ("PyPi.ORG", "pypi.org"),
            ("TEST.PyPi.ORG", "test.pypi.org"),
            ("LOCALHOST", "localhost"),
            # One or more trailing dots are dropped
            ("pypi.org.", "pypi.org"),
            ("pypi.org...", "pypi.org"),
            ("localhost.", "localhost"),
            # Surrounding whitespace is dropped
            (" pypi.org ", "pypi.org"),
            ("\t pypi.org\n ", "pypi.org"),
            (" localhost ", "localhost"),
            # Several of the above at once
            (" TEST.PyPi.ORG. ", "test.pypi.org"),
            (" LOCALHOST. ", "localhost"),
            (" 127.0.0.1 ", "127.0.0.1"),
        ],
    )
    def test_domain_normalization(self, input_domain, expected):
        """A raw domain string is reduced to its expected canonical form."""
        normalized = request._normalize_domain(input_domain)
        assert normalized == expected

    def test_handles_idn_domains(self):
        """A non-ASCII look-alike domain comes back in punycode form."""
        # Contains Cyrillic characters, so it only *looks* like "pypi.org".
        lookalike = "рyрі.org"
        normalized = request._normalize_domain(lookalike)

        # It must not collapse onto the genuine ASCII domain ...
        assert normalized != "pypi.org"
        # ... and should carry the punycode prefix instead.
        assert normalized.startswith("xn--")

    def test_handles_invalid_idn_domains(self):
        """Domains that cannot be IDN-encoded are returned without raising."""
        # Both inputs are expected back unchanged: the first contains an
        # unpaired surrogate, the second an empty label (not allowed in IDN).
        unencodable_domains = (
            "test\udcff .org",
            "test..org",
        )
        for domain in unencodable_domains:
            assert request._normalize_domain(domain) == domain
62+
63+
864class TestCreateNonce :
965 def test_generates_unique_nonces (self ):
1066 """Test that each request gets a unique nonce."""
@@ -37,39 +93,161 @@ def test_nonce_is_url_safe(self):
3793 assert re .match (r"^[A-Za-z0-9_-]+$" , nonce )
3894
3995
class TestCreateIntegrityToken:
    """Exercise ``request._create_integrity_token``."""

    def test_creates_valid_token(self):
        """The token is a base64 string wrapping JSON with ts/entropy/nonce."""
        stub_request = pretend.stub(nonce="test-nonce-123")

        token = request._create_integrity_token(stub_request)

        assert isinstance(token, str)

        # base64 -> UTF-8 text -> JSON object
        payload = json.loads(base64.b64decode(token).decode("utf-8"))

        for required_field in ("ts", "entropy", "nonce"):
            assert required_field in payload

        # The embedded timestamp should be within 5 seconds of "now".
        now = int(time.time())
        assert abs(payload["ts"] - now) < 5

        # The entropy field is itself base64 and decodes to 16 bytes.
        assert len(base64.b64decode(payload["entropy"])) == 16

        # The request's nonce is carried through unchanged.
        assert payload["nonce"] == "test-nonce-123"

    def test_different_requests_get_different_tokens(self):
        """Tokens differ between requests, even when the nonce is the same."""
        first_token = request._create_integrity_token(pretend.stub(nonce="nonce-1"))
        second_token = request._create_integrity_token(pretend.stub(nonce="nonce-2"))

        assert first_token != second_token

        # Same nonce as the first request: the entropy should still make
        # the resulting token different.
        repeat_token = request._create_integrity_token(pretend.stub(nonce="nonce-1"))

        assert first_token != repeat_token
141+
142+
40143class TestCreateHashedDomains :
def test_hashes_domains_with_enhanced_security(self):
    """Check the output shape: per-domain hashes followed by a checksum.

    For two allowed domains the result is expected to be three
    pipe-separated lowercase-hex fields: two 64-character domain hashes
    and one trailing 16-character checksum.
    """
    # Hand-built integrity token: base64 of compact, key-sorted JSON.
    payload = {
        "ts": int(time.time()),
        "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
        "nonce": "test-nonce-123",
    }
    serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    integrity_token = base64.b64encode(serialized.encode("utf-8")).decode("ascii")

    settings = {"warehouse.allowed_domains": ["pypi.org", "test.pypi.org"]}
    stub_request = pretend.stub(
        nonce="test-nonce-123",
        integrity_token=integrity_token,
        registry=pretend.stub(settings=settings),
    )

    hashed = request._create_hashed_domains(stub_request)

    # Fields are pipe-separated: 2 domain hashes + 1 checksum.
    assert "|" in hashed
    fields = hashed.split("|")
    assert len(fields) == 3

    hex_alphabet = set("0123456789abcdef")

    # Each domain hash is 64 lowercase hex chars (sha256 hex digest length).
    for domain_hash in fields[:2]:
        assert len(domain_hash) == 64
        assert set(domain_hash) <= hex_alphabet

    # The checksum is 16 lowercase hex chars.
    checksum = fields[2]
    assert len(checksum) == 16
    assert set(checksum) <= hex_alphabet

    # Different domains must not hash to the same value.
    assert fields[0] != fields[1]
186+
def test_domain_normalization_applied(self):
    """Domains differing only in letter case produce the same output."""
    payload = {
        "ts": int(time.time()),
        "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
        "nonce": "test-nonce-123",
    }
    serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    integrity_token = base64.b64encode(serialized.encode("utf-8")).decode("ascii")

    def hashed_for(domain):
        # Identical nonce and token each time; only the configured domain varies.
        stub_request = pretend.stub(
            nonce="test-nonce-123",
            integrity_token=integrity_token,
            registry=pretend.stub(settings={"warehouse.allowed_domains": [domain]}),
        )
        return request._create_hashed_domains(stub_request)

    mixed_case_result = hashed_for("PyPi.ORG")
    lower_case_result = hashed_for("pypi.org")

    # Case differences must not leak into the hashed output.
    assert mixed_case_result == lower_case_result
64218
65219 def test_different_nonce_produces_different_hashes (self ):
66220 """Test that different nonces produce different hashes for same domain."""
221+ token_data1 = {
222+ "ts" : int (time .time ()),
223+ "entropy" : base64 .b64encode (b"entropy-1" ).decode ("ascii" ),
224+ "nonce" : "nonce-1" ,
225+ }
226+ integrity_token1 = base64 .b64encode (
227+ json .dumps (token_data1 , sort_keys = True , separators = ("," , ":" )).encode (
228+ "utf-8"
229+ )
230+ ).decode ("ascii" )
231+
232+ token_data2 = {
233+ "ts" : int (time .time ()),
234+ "entropy" : base64 .b64encode (b"entropy-2" ).decode ("ascii" ),
235+ "nonce" : "nonce-2" ,
236+ }
237+ integrity_token2 = base64 .b64encode (
238+ json .dumps (token_data2 , sort_keys = True , separators = ("," , ":" )).encode (
239+ "utf-8"
240+ )
241+ ).decode ("ascii" )
242+
67243 req1 = pretend .stub (
68244 nonce = "nonce-1" ,
245+ integrity_token = integrity_token1 ,
69246 registry = pretend .stub (settings = {"warehouse.allowed_domains" : ["pypi.org" ]}),
70247 )
71248 req2 = pretend .stub (
72249 nonce = "nonce-2" ,
250+ integrity_token = integrity_token2 ,
73251 registry = pretend .stub (settings = {"warehouse.allowed_domains" : ["pypi.org" ]}),
74252 )
75253
@@ -78,10 +256,66 @@ def test_different_nonce_produces_different_hashes(self):
78256
79257 assert hashed1 != hashed2
80258
def test_timestamp_affects_hash(self):
    """Two tokens that differ only in ``ts`` yield different outputs."""
    # Same entropy and nonce in both payloads; the second is 100 seconds later.
    payloads = [
        {
            "ts": int(time.time()),
            "entropy": base64.b64encode(b"test-entropy").decode("ascii"),
            "nonce": "test-nonce",
        },
        {
            "ts": int(time.time()) + 100,
            "entropy": base64.b64encode(b"test-entropy").decode("ascii"),
            "nonce": "test-nonce",
        },
    ]

    # Encode each payload as base64 of compact, key-sorted JSON.
    tokens = [
        base64.b64encode(
            json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
        ).decode("ascii")
        for payload in payloads
    ]

    stub_requests = [
        pretend.stub(
            nonce="test-nonce",
            integrity_token=token,
            registry=pretend.stub(settings={"warehouse.allowed_domains": ["pypi.org"]}),
        )
        for token in tokens
    ]

    earlier, later = (
        request._create_hashed_domains(stub_request) for stub_request in stub_requests
    )

    # Only the timestamp changed, so it alone must account for the difference.
    assert earlier != later
302+
81303 def test_empty_domains_returns_empty_string (self ):
82304 """Test that empty domain list returns empty string."""
305+ token_data = {
306+ "ts" : int (time .time ()),
307+ "entropy" : base64 .b64encode (b"test-entropy" ).decode ("ascii" ),
308+ "nonce" : "test-nonce" ,
309+ }
310+ integrity_token = base64 .b64encode (
311+ json .dumps (token_data , sort_keys = True , separators = ("," , ":" )).encode (
312+ "utf-8"
313+ )
314+ ).decode ("ascii" )
315+
83316 req = pretend .stub (
84317 nonce = "test-nonce" ,
318+ integrity_token = integrity_token ,
85319 registry = pretend .stub (settings = {"warehouse.allowed_domains" : []}),
86320 )
87321
@@ -90,7 +324,66 @@ def test_empty_domains_returns_empty_string(self):
90324
def test_no_domains_setting_returns_empty_string(self):
    """With no ``warehouse.allowed_domains`` setting the result is ``""``."""
    payload = {
        "ts": int(time.time()),
        "entropy": base64.b64encode(b"test-entropy").decode("ascii"),
        "nonce": "test-nonce",
    }
    serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    integrity_token = base64.b64encode(serialized.encode("utf-8")).decode("ascii")

    # Empty settings mapping: the allowed-domains key is absent entirely.
    registry = pretend.stub(settings={})
    stub_request = pretend.stub(
        nonce="test-nonce",
        integrity_token=integrity_token,
        registry=registry,
    )

    assert request._create_hashed_domains(stub_request) == ""
346+
def test_checksum_validates_integrity(self):
    """A checksum recomputed over a tampered hash differs from the real one."""
    payload = {
        "ts": int(time.time()),
        "entropy": base64.b64encode(b"test-entropy-123").decode("ascii"),
        "nonce": "test-nonce-123",
    }
    serialized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    integrity_token = base64.b64encode(serialized.encode("utf-8")).decode("ascii")

    settings = {"warehouse.allowed_domains": ["pypi.org", "test.pypi.org"]}
    stub_request = pretend.stub(
        nonce="test-nonce-123",
        integrity_token=integrity_token,
        registry=pretend.stub(settings=settings),
    )

    fields = request._create_hashed_domains(stub_request).split("|")

    # The last field is the checksum; everything before it is a domain hash.
    checksum = fields[-1]
    assert len(checksum) == 16

    # Tamper with the first domain hash, then recompute a checksum over the
    # tampered list.
    tampered_hashes = fields[:-1]
    tampered_hashes[0] = "0" * 64

    # NOTE(review): presumably mirrors the implementation's HMAC key
    # (nonce bytes followed by the raw entropy bytes) - confirm. The test
    # never checks that `checksum` equals this HMAC over the *untampered*
    # hashes, so a wrong key here would go unnoticed.
    hmac_key = b"test-nonce-123" + b"test-entropy-123"
    tampered_message = "|".join(tampered_hashes).encode("utf-8")
    tampered_digest = hmac.new(hmac_key, tampered_message, hashlib.sha256).hexdigest()
    recomputed_checksum = tampered_digest[:16]

    # The tampered checksum must not match the one that was emitted.
    assert recomputed_checksum != checksum
0 commit comments