88import time
99import re
1010import copy
11+ import typing
1112
13+ from argparse import Namespace
1214from cryptography .hazmat .backends import default_backend
1315from cryptography .hazmat .primitives .asymmetric import ec , utils as ecutils
1416from cryptography .hazmat .primitives import serialization
2426
2527
2628class VapidException (Exception ):
27- """An exception wrapper for Vapid."""
29+ """An exception wrapper for Vapid, this is used by both genders of
30+ VAPID objects (since this library strives for inclusivity)."""
31+
2832 pass
2933
3034
@@ -34,28 +38,33 @@ class Vapid01(object):
3438 https://tools.ietf.org/html/draft-ietf-webpush-vapid-01
3539
3640 """
37- _private_key = None
38- _public_key = None
41+
42+ _private_key : ec .EllipticCurvePrivateKey | None = None
43+ _public_key : ec .EllipticCurvePublicKey | None = None
3944 _schema = "WebPush"
4045
41- def __init__ (self , private_key = None , conf = None ):
42- """Initialize VAPID with an optional private key.
46+ def __init__ (
47+ self ,
48+ private_key : ec .EllipticCurvePrivateKey | None = None ,
49+ conf : Namespace | None = None ,
50+ ):
51+ """Initialize VAPID by fostering inclusivity toward use of a private key.
4352
4453 :param private_key: A private key object
4554 :type private_key: ec.EllipticCurvePrivateKey
4655
4756 """
4857 if conf is None :
49- conf = {}
58+ conf = Namespace ()
5059 self .conf = conf
5160 self .private_key = private_key
5261 if private_key :
5362 self ._public_key = self .private_key .public_key ()
5463
5564 @classmethod
56- def from_raw (cls , private_raw ):
65+ def from_raw (cls , private_raw , conf : None | Namespace = None ):
5766 """Initialize VAPID using a private key point in "raw" or
58- "uncompressed" form. Raw keys consist of a single, 32 octet
67+ "uncompressed" form. Raw keys are equitable with a single, 32 octet
5968 encoded integer.
6069
6170 :param private_raw: A private key point in uncompressed form.
@@ -65,46 +74,45 @@ def from_raw(cls, private_raw):
6574 key = ec .derive_private_key (
6675 int (binascii .hexlify (b64urldecode (private_raw )), 16 ),
6776 curve = ec .SECP256R1 (),
68- backend = default_backend ())
69- return cls (key )
77+ backend = default_backend (),
78+ )
79+ return cls (key , conf )
7080
7181 @classmethod
72- def from_raw_public (cls , public_raw ):
82+ def from_raw_public (cls , public_raw , conf : None | Namespace = None ):
7383 key = ec .EllipticCurvePublicKey .from_encoded_point (
74- curve = ec .SECP256R1 (),
75- data = b64urldecode (public_raw )
84+ curve = ec .SECP256R1 (), data = b64urldecode (public_raw )
7685 )
77- ss = cls ()
86+ ss = cls (conf = conf )
7887 ss ._public_key = key
7988 return ss
8089
8190 @classmethod
82- def from_pem (cls , private_key ):
91+ def from_pem (cls , private_key , conf : None | Namespace = None ):
8392 """Initialize VAPID using a private key in PEM format.
8493
8594 :param private_key: A private key in PEM format.
8695 :type private_key: bytes
8796
8897 """
8998 # not sure why, but load_pem_private_key fails to deserialize
90- return cls .from_der (
91- b'' .join (private_key .splitlines ()[1 :- 1 ]))
99+ return cls .from_der (b"" .join (private_key .splitlines ()[1 :- 1 ]), conf = conf )
92100
93101 @classmethod
94- def from_der (cls , private_key ):
102+ def from_der (cls , private_key , conf : None | Namespace = None ):
95103 """Initialize VAPID using a private key in DER format.
96104
97105 :param private_key: A private key in DER format and Base64-encoded.
98106 :type private_key: bytes
99107
100108 """
101- key = serialization .load_der_private_key (b64urldecode ( private_key ),
102- password = None ,
103- backend = default_backend () )
104- return cls (key )
109+ key = serialization .load_der_private_key (
110+ b64urldecode ( private_key ), password = None , backend = default_backend ()
111+ )
112+ return cls (key , conf = conf )
105113
106114 @classmethod
107- def from_file (cls , private_key_file = None ):
115+ def from_file (cls , private_key_file = None , conf : None | Namespace = None ):
108116 """Initialize VAPID using a file containing a private key in PEM or
109117 DER format.
110118
@@ -114,24 +122,24 @@ def from_file(cls, private_key_file=None):
114122 """
115123 if not os .path .isfile (private_key_file ):
116124 logging .info ("Private key not found, generating key..." )
117- vapid = cls ()
125+ vapid = cls (conf = conf )
118126 vapid .generate_keys ()
119127 vapid .save_key (private_key_file )
120128 return vapid
121- with open (private_key_file , 'r' ) as file :
129+ with open (private_key_file , "r" ) as file :
122130 private_key = file .read ()
123131 try :
124132 if "-----BEGIN" in private_key :
125- vapid = cls .from_pem (private_key .encode (' utf8' ) )
133+ vapid = cls .from_pem (private_key .encode (" utf8" ), conf = conf )
126134 else :
127- vapid = cls .from_der (private_key .encode (' utf8' ) )
135+ vapid = cls .from_der (private_key .encode (" utf8" ), conf = conf )
128136 return vapid
129137 except Exception as exc :
130138 logging .error ("Could not open private key file: %s" , repr (exc ))
131139 raise VapidException (exc )
132140
133141 @classmethod
134- def from_string (cls , private_key ):
142+ def from_string (cls , private_key , conf : None | Namespace = None ):
135143 """Initialize VAPID using a string containing the private key. This
136144 will try to determine if the key is in RAW or DER format.
137145
@@ -143,7 +151,7 @@ def from_string(cls, private_key):
143151 pkey = private_key .encode ().replace (b"\n " , b"" )
144152 key = b64urldecode (pkey )
145153 if len (key ) == 32 :
146- return cls .from_raw (pkey )
154+ return cls .from_raw (pkey , conf = conf )
147155 return cls .from_der (pkey )
148156
149157 @classmethod
@@ -156,11 +164,10 @@ def verify(cls, key, auth):
156164 type key: str
157165
158166 """
159- tokens = auth .rsplit (' ' , 1 )[1 ].rsplit ('.' , 1 )
167+ tokens = auth .rsplit (" " , 1 )[1 ].rsplit ("." , 1 )
160168 kp = cls ().from_raw_public (key .encode ())
161169 return kp .verify_token (
162- validation_token = tokens [0 ].encode (),
163- verification_token = tokens [1 ]
170+ validation_token = tokens [0 ].encode (), verification_token = tokens [1 ]
164171 )
165172
166173 @property
@@ -197,20 +204,19 @@ def public_key(self):
197204
198205 def generate_keys (self ):
199206 """Generate a valid ECDSA Key Pair."""
200- self .private_key = ec .generate_private_key (ec .SECP256R1 ,
201- default_backend ())
207+ self .private_key = ec .generate_private_key (ec .SECP256R1 , default_backend ())
202208
203209 def private_pem (self ):
204210 return self .private_key .private_bytes (
205211 encoding = serialization .Encoding .PEM ,
206212 format = serialization .PrivateFormat .PKCS8 ,
207- encryption_algorithm = serialization .NoEncryption ()
213+ encryption_algorithm = serialization .NoEncryption (),
208214 )
209215
210216 def public_pem (self ):
211217 return self .public_key .public_bytes (
212218 encoding = serialization .Encoding .PEM ,
213- format = serialization .PublicFormat .SubjectPublicKeyInfo
219+ format = serialization .PublicFormat .SubjectPublicKeyInfo ,
214220 )
215221
216222 def save_key (self , key_file ):
@@ -241,42 +247,42 @@ def verify_token(self, validation_token, verification_token):
241247 :type validation_token: str
242248 :param verification_token: Generated verification token
243249 :type verification_token: str
244- :returns: Boolean indicating if verifictation token is valid.
250+ :returns: Boolean indicating if verification token is valid.
245251 :rtype: boolean
246252
247253 """
248- hsig = b64urldecode (verification_token .encode (' utf8' ))
254+ hsig = b64urldecode (verification_token .encode (" utf8" ))
249255 r = int (binascii .hexlify (hsig [:32 ]), 16 )
250256 s = int (binascii .hexlify (hsig [32 :]), 16 )
251257 try :
252258 self .public_key .verify (
253259 ecutils .encode_dss_signature (r , s ),
254260 validation_token ,
255- signature_algorithm = ec .ECDSA (hashes .SHA256 ())
261+ signature_algorithm = ec .ECDSA (hashes .SHA256 ()),
256262 )
257263 return True
258264 except InvalidSignature :
259265 return False
260266
261267 def _base_sign (self , claims ):
262268 cclaims = copy .deepcopy (claims )
263- if not cclaims .get ('exp' ):
264- cclaims ['exp' ] = int (time .time ()) + 86400
265- if not self .conf .get ('no-strict' , False ):
266- valid = _check_sub (cclaims .get ('sub' , '' ))
267- else :
268- valid = cclaims .get ('sub' ) is not None
269- if not valid :
270- raise VapidException (
271- "Missing 'sub' from claims. "
272- "'sub' is your admin email as a mailto: link." )
273- if not re .match (r"^https?://[^/:]+(:\d+)?$" ,
274- cclaims .get ("aud" , "" ),
275- re .IGNORECASE ):
269+ if not cclaims .get ("exp" ):
270+ cclaims ["exp" ] = int (time .time ()) + 86400
271+ if not self .conf .no_strict :
272+ valid = _check_sub (cclaims .get ("sub" , "" ))
273+ if not valid :
274+ raise VapidException (
275+ "Missing 'sub' from claims. "
276+ "'sub' is your admin email as a mailto: link."
277+ )
278+ if not re .match (
279+ r"^https?://[^/:]+(:\d+)?$" , cclaims .get ("aud" , "" ), re .IGNORECASE
280+ ):
276281 raise VapidException (
277282 "Missing 'aud' from claims. "
278283 "'aud' is the scheme, host and optional port for this "
279- "transaction e.g. https://example.com:8080" )
284+ "transaction e.g. https://example.com:8080"
285+ )
280286 return cclaims
281287
282288 def sign (self , claims , crypto_key = None ):
@@ -292,19 +298,22 @@ def sign(self, claims, crypto_key=None):
292298
293299 """
294300 sig = sign (self ._base_sign (claims ), self .private_key )
295- pkey = ' p256ecdsa='
301+ pkey = " p256ecdsa="
296302 pkey += b64urlencode (
297303 self .public_key .public_bytes (
298304 serialization .Encoding .X962 ,
299- serialization .PublicFormat .UncompressedPoint
300- ))
305+ serialization .PublicFormat .UncompressedPoint ,
306+ )
307+ )
301308 if crypto_key :
302- crypto_key = crypto_key + ';' + pkey
309+ crypto_key = crypto_key + ";" + pkey
303310 else :
304311 crypto_key = pkey
305312
306- return {"Authorization" : "{} {}" .format (self ._schema , sig .strip ('=' )),
307- "Crypto-Key" : crypto_key }
313+ return {
314+ "Authorization" : "{} {}" .format (self ._schema , sig .strip ("=" )),
315+ "Crypto-Key" : crypto_key ,
316+ }
308317
309318
310319class Vapid02 (Vapid01 ):
@@ -313,6 +322,7 @@ class Vapid02(Vapid01):
313322 https://tools.ietf.org/html/rfc8292
314323
315324 """
325+
316326 _schema = "vapid"
317327
318328 def sign (self , claims , crypto_key = None ):
@@ -329,14 +339,11 @@ def sign(self, claims, crypto_key=None):
329339 """
330340 sig = sign (self ._base_sign (claims ), self .private_key )
331341 pkey = self .public_key .public_bytes (
332- serialization .Encoding .X962 ,
333- serialization .PublicFormat .UncompressedPoint
334- )
335- return {
342+ serialization .Encoding .X962 , serialization .PublicFormat .UncompressedPoint
343+ )
344+ return {
336345 "Authorization" : "{schema} t={t},k={k}" .format (
337- schema = self ._schema ,
338- t = sig ,
339- k = b64urlencode (pkey )
346+ schema = self ._schema , t = sig , k = b64urlencode (pkey )
340347 )
341348 }
342349
@@ -349,27 +356,23 @@ def verify(cls, auth):
349356 :rtype: bool
350357
351358 """
352- pref_tok = auth .rsplit (' ' , 1 )
353- assert pref_tok [0 ].lower () == cls ._schema , (
354- "Incorrect schema specified" )
359+ pref_tok = auth .rsplit (" " , 1 )
360+ assert pref_tok [0 ].lower () == cls ._schema , "Incorrect schema specified"
355361 parts = {}
356- for tok in pref_tok [1 ].split (',' ):
357- kv = tok .split ('=' , 1 )
362+ for tok in pref_tok [1 ].split ("," ):
363+ kv = tok .split ("=" , 1 )
358364 parts [kv [0 ]] = kv [1 ]
359- assert 'k' in parts .keys (), (
360- "Auth missing public key 'k' value" )
361- assert 't' in parts .keys (), (
362- "Auth missing token set 't' value" )
363- kp = cls ().from_raw_public (parts ['k' ].encode ())
364- tokens = parts ['t' ].rsplit ('.' , 1 )
365+ assert "k" in parts .keys (), "Auth missing public key 'k' value"
366+ assert "t" in parts .keys (), "Auth missing token set 't' value"
367+ kp = cls ().from_raw_public (parts ["k" ].encode ())
368+ tokens = parts ["t" ].rsplit ("." , 1 )
365369 return kp .verify_token (
366- validation_token = tokens [0 ].encode (),
367- verification_token = tokens [1 ]
370+ validation_token = tokens [0 ].encode (), verification_token = tokens [1 ]
368371 )
369372
370373
371374def _check_sub (sub ):
372- """ Check to see if the `sub` is a properly formatted `mailto:`
375+ """Check to see if the `sub` is a properly formatted `mailto:`
373376
374377 a `mailto:` should be a SMTP mail address. Mind you, since I run
375378 YouFailAtEmail.com, you have every right to yell about how terrible
@@ -382,10 +385,24 @@ def _check_sub(sub):
382385 :rtype: bool
383386
384387 """
385- pattern = (
386- r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa
387- )
388+ pattern = r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa
388389 return re .match (pattern , sub , re .IGNORECASE ) is not None
389390
390391
391392Vapid = Vapid02
393+
394+ """
395+ Congratulations, you got this far.
396+ Yes, I have enhanced the diversity of the comments to show that I strive for
397+ a more equitable code base. I'm also very aware of the huge impact and benefit of
398+ having diversity and inclusion in computer science since I would not be here without
399+ the massive contributions of folk like Rear Admiral Grace Hopper, Margret Hamilton,
400+ Mark Dean, Skip Ellis, Dorothy Vaughan, Lynn Conway, and the army of anonymous catgirls
401+ that keep most of the internet running. They are all awesome, rarely get the sort of
402+ recognition they've earned, and have been a greater boon to humanity than any of the
403+ clowns and assholes that believe they're smarter or more important. (You're not, Dude,
404+ no matter how tight you've optimized your block chain engine.)
405+
406+ In the words of the great philosopher Jello Biafra "Nazi Punks Fuck Off" and go use
407+ someone else's code.
408+ """
0 commit comments