summaryrefslogtreecommitdiff
path: root/pkgs/development/python-modules/py-vapid/cryptography.patch
blob: d38088931b4479004bf2b5146b9c8da55e9e1939 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
--- a/py_vapid/__init__.py
+++ b/py_vapid/__init__.py
@@ -25,6 +25,7 @@
 
 class VapidException(Exception):
     """An exception wrapper for Vapid."""
+
     pass
 
 
@@ -34,6 +35,7 @@ class Vapid01(object):
     https://tools.ietf.org/html/draft-ietf-webpush-vapid-01
 
     """
+
     _private_key = None
     _public_key = None
     _schema = "WebPush"
@@ -65,14 +67,14 @@ def from_raw(cls, private_raw):
         key = ec.derive_private_key(
             int(binascii.hexlify(b64urldecode(private_raw)), 16),
             curve=ec.SECP256R1(),
-            backend=default_backend())
+            backend=default_backend(),
+        )
         return cls(key)
 
     @classmethod
     def from_raw_public(cls, public_raw):
         key = ec.EllipticCurvePublicKey.from_encoded_point(
-            curve=ec.SECP256R1(),
-            data=b64urldecode(public_raw)
+            curve=ec.SECP256R1(), data=b64urldecode(public_raw)
         )
         ss = cls()
         ss._public_key = key
@@ -87,8 +89,7 @@ def from_pem(cls, private_key):
 
         """
         # not sure why, but load_pem_private_key fails to deserialize
-        return cls.from_der(
-            b''.join(private_key.splitlines()[1:-1]))
+        return cls.from_der(b"".join(private_key.splitlines()[1:-1]))
 
     @classmethod
     def from_der(cls, private_key):
@@ -98,9 +99,9 @@ def from_der(cls, private_key):
         :type private_key: bytes
 
         """
-        key = serialization.load_der_private_key(b64urldecode(private_key),
-                                                 password=None,
-                                                 backend=default_backend())
+        key = serialization.load_der_private_key(
+            b64urldecode(private_key), password=None, backend=default_backend()
+        )
         return cls(key)
 
     @classmethod
@@ -118,13 +119,13 @@ def from_file(cls, private_key_file=None):
             vapid.generate_keys()
             vapid.save_key(private_key_file)
             return vapid
-        with open(private_key_file, 'r') as file:
+        with open(private_key_file, "r") as file:
             private_key = file.read()
         try:
             if "-----BEGIN" in private_key:
-                vapid = cls.from_pem(private_key.encode('utf8'))
+                vapid = cls.from_pem(private_key.encode("utf8"))
             else:
-                vapid = cls.from_der(private_key.encode('utf8'))
+                vapid = cls.from_der(private_key.encode("utf8"))
             return vapid
         except Exception as exc:
             logging.error("Could not open private key file: %s", repr(exc))
@@ -156,11 +157,10 @@ def verify(cls, key, auth):
         type key: str
 
         """
-        tokens = auth.rsplit(' ', 1)[1].rsplit('.', 1)
+        tokens = auth.rsplit(" ", 1)[1].rsplit(".", 1)
         kp = cls().from_raw_public(key.encode())
         return kp.verify_token(
-            validation_token=tokens[0].encode(),
-            verification_token=tokens[1]
+            validation_token=tokens[0].encode(), verification_token=tokens[1]
         )
 
     @property
@@ -197,20 +197,19 @@ def public_key(self):
 
     def generate_keys(self):
         """Generate a valid ECDSA Key Pair."""
-        self.private_key = ec.generate_private_key(ec.SECP256R1,
-                                                   default_backend())
+        self.private_key = ec.generate_private_key(curve=ec.SECP256R1(), backend=default_backend())
 
     def private_pem(self):
         return self.private_key.private_bytes(
             encoding=serialization.Encoding.PEM,
             format=serialization.PrivateFormat.PKCS8,
-            encryption_algorithm=serialization.NoEncryption()
+            encryption_algorithm=serialization.NoEncryption(),
         )
 
     def public_pem(self):
         return self.public_key.public_bytes(
             encoding=serialization.Encoding.PEM,
-            format=serialization.PublicFormat.SubjectPublicKeyInfo
+            format=serialization.PublicFormat.SubjectPublicKeyInfo,
         )
 
     def save_key(self, key_file):
@@ -245,14 +244,14 @@ def verify_token(self, validation_token, verification_token):
         :rtype: boolean
 
         """
-        hsig = b64urldecode(verification_token.encode('utf8'))
+        hsig = b64urldecode(verification_token.encode("utf8"))
         r = int(binascii.hexlify(hsig[:32]), 16)
         s = int(binascii.hexlify(hsig[32:]), 16)
         try:
             self.public_key.verify(
                 ecutils.encode_dss_signature(r, s),
                 validation_token,
-                signature_algorithm=ec.ECDSA(hashes.SHA256())
+                signature_algorithm=ec.ECDSA(hashes.SHA256()),
             )
             return True
         except InvalidSignature:
@@ -260,23 +259,25 @@ def verify_token(self, validation_token, verification_token):
 
     def _base_sign(self, claims):
         cclaims = copy.deepcopy(claims)
-        if not cclaims.get('exp'):
-            cclaims['exp'] = int(time.time()) + 86400
-        if not self.conf.get('no-strict', False):
-            valid = _check_sub(cclaims.get('sub', ''))
+        if not cclaims.get("exp"):
+            cclaims["exp"] = int(time.time()) + 86400
+        if not self.conf.get("no-strict", False):
+            valid = _check_sub(cclaims.get("sub", ""))
         else:
-            valid = cclaims.get('sub') is not None
+            valid = cclaims.get("sub") is not None
         if not valid:
             raise VapidException(
                 "Missing 'sub' from claims. "
-                "'sub' is your admin email as a mailto: link.")
-        if not re.match(r"^https?://[^/:]+(:\d+)?$",
-                        cclaims.get("aud", ""),
-                        re.IGNORECASE):
+                "'sub' is your admin email as a mailto: link."
+            )
+        if not re.match(
+            r"^https?://[^/:]+(:\d+)?$", cclaims.get("aud", ""), re.IGNORECASE
+        ):
             raise VapidException(
                 "Missing 'aud' from claims. "
                 "'aud' is the scheme, host and optional port for this "
-                "transaction e.g. https://example.com:8080")
+                "transaction e.g. https://example.com:8080"
+            )
         return cclaims
 
     def sign(self, claims, crypto_key=None):
@@ -292,19 +293,22 @@ def sign(self, claims, crypto_key=None):
 
         """
         sig = sign(self._base_sign(claims), self.private_key)
-        pkey = 'p256ecdsa='
+        pkey = "p256ecdsa="
         pkey += b64urlencode(
             self.public_key.public_bytes(
                 serialization.Encoding.X962,
-                serialization.PublicFormat.UncompressedPoint
-            ))
+                serialization.PublicFormat.UncompressedPoint,
+            )
+        )
         if crypto_key:
-            crypto_key = crypto_key + ';' + pkey
+            crypto_key = crypto_key + ";" + pkey
         else:
             crypto_key = pkey
 
-        return {"Authorization": "{} {}".format(self._schema, sig.strip('=')),
-                "Crypto-Key": crypto_key}
+        return {
+            "Authorization": "{} {}".format(self._schema, sig.strip("=")),
+            "Crypto-Key": crypto_key,
+        }
 
 
 class Vapid02(Vapid01):
@@ -313,6 +317,7 @@ class Vapid02(Vapid01):
     https://tools.ietf.org/html/rfc8292
 
     """
+
     _schema = "vapid"
 
     def sign(self, claims, crypto_key=None):
@@ -329,14 +334,11 @@ def sign(self, claims, crypto_key=None):
         """
         sig = sign(self._base_sign(claims), self.private_key)
         pkey = self.public_key.public_bytes(
-                serialization.Encoding.X962,
-                serialization.PublicFormat.UncompressedPoint
-            )
-        return{
+            serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint
+        )
+        return {
             "Authorization": "{schema} t={t},k={k}".format(
-                schema=self._schema,
-                t=sig,
-                k=b64urlencode(pkey)
+                schema=self._schema, t=sig, k=b64urlencode(pkey)
             )
         }
 
@@ -349,27 +351,23 @@ def verify(cls, auth):
         :rtype: bool
 
         """
-        pref_tok = auth.rsplit(' ', 1)
-        assert pref_tok[0].lower() == cls._schema, (
-                "Incorrect schema specified")
+        pref_tok = auth.rsplit(" ", 1)
+        assert pref_tok[0].lower() == cls._schema, "Incorrect schema specified"
         parts = {}
-        for tok in pref_tok[1].split(','):
-            kv = tok.split('=', 1)
+        for tok in pref_tok[1].split(","):
+            kv = tok.split("=", 1)
             parts[kv[0]] = kv[1]
-        assert 'k' in parts.keys(), (
-                "Auth missing public key 'k' value")
-        assert 't' in parts.keys(), (
-                "Auth missing token set 't' value")
-        kp = cls().from_raw_public(parts['k'].encode())
-        tokens = parts['t'].rsplit('.', 1)
+        assert "k" in parts.keys(), "Auth missing public key 'k' value"
+        assert "t" in parts.keys(), "Auth missing token set 't' value"
+        kp = cls().from_raw_public(parts["k"].encode())
+        tokens = parts["t"].rsplit(".", 1)
         return kp.verify_token(
-            validation_token=tokens[0].encode(),
-            verification_token=tokens[1]
+            validation_token=tokens[0].encode(), verification_token=tokens[1]
         )
 
 
 def _check_sub(sub):
-    """ Check to see if the `sub` is a properly formatted `mailto:`
+    """Check to see if the `sub` is a properly formatted `mailto:`
 
     a `mailto:` should be a SMTP mail address. Mind you, since I run
     YouFailAtEmail.com, you have every right to yell about how terrible
@@ -382,9 +380,7 @@ def _check_sub(sub):
     :rtype: bool
 
     """
-    pattern = (
-        r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$" # noqa
-        )
+    pattern = r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$"  # noqa
     return re.match(pattern, sub, re.IGNORECASE) is not None