summaryrefslogtreecommitdiff
path: root/venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py')
-rw-r--r--venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py343
1 files changed, 343 insertions, 0 deletions
diff --git a/venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py b/venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py
new file mode 100644
index 0000000..4e5c0db
--- /dev/null
+++ b/venv/lib/python3.7/site-packages/pip-10.0.1-py3.7.egg/pip/_vendor/urllib3/contrib/_securetransport/low_level.py
@@ -0,0 +1,343 @@
1"""
2Low-level helpers for the SecureTransport bindings.
3
4These are Python functions that are not directly related to the high-level APIs
5but are necessary to get them to work. They include a whole bunch of low-level
6CoreFoundation messing about and memory management. The concerns in this module
7are almost entirely about trying to avoid memory leaks and providing
8appropriate and useful assistance to the higher-level code.
9"""
10import base64
11import ctypes
12import itertools
13import re
14import os
15import ssl
16import tempfile
17
18from .bindings import Security, CoreFoundation, CFConst
19
20
21# This regular expression is used to grab PEM data out of a PEM bundle.
22_PEM_CERTS_RE = re.compile(
23 b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
24)
25
26
27def _cf_data_from_bytes(bytestring):
28 """
29 Given a bytestring, create a CFData object from it. This CFData object must
30 be CFReleased by the caller.
31 """
32 return CoreFoundation.CFDataCreate(
33 CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
34 )
35
36
37def _cf_dictionary_from_tuples(tuples):
38 """
39 Given a list of Python tuples, create an associated CFDictionary.
40 """
41 dictionary_size = len(tuples)
42
43 # We need to get the dictionary keys and values out in the same order.
44 keys = (t[0] for t in tuples)
45 values = (t[1] for t in tuples)
46 cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
47 cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)
48
49 return CoreFoundation.CFDictionaryCreate(
50 CoreFoundation.kCFAllocatorDefault,
51 cf_keys,
52 cf_values,
53 dictionary_size,
54 CoreFoundation.kCFTypeDictionaryKeyCallBacks,
55 CoreFoundation.kCFTypeDictionaryValueCallBacks,
56 )
57
58
59def _cf_string_to_unicode(value):
60 """
61 Creates a Unicode string from a CFString object. Used entirely for error
62 reporting.
63
64 Yes, it annoys me quite a lot that this function is this complex.
65 """
66 value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
67
68 string = CoreFoundation.CFStringGetCStringPtr(
69 value_as_void_p,
70 CFConst.kCFStringEncodingUTF8
71 )
72 if string is None:
73 buffer = ctypes.create_string_buffer(1024)
74 result = CoreFoundation.CFStringGetCString(
75 value_as_void_p,
76 buffer,
77 1024,
78 CFConst.kCFStringEncodingUTF8
79 )
80 if not result:
81 raise OSError('Error copying C string from CFStringRef')
82 string = buffer.value
83 if string is not None:
84 string = string.decode('utf-8')
85 return string
86
87
88def _assert_no_error(error, exception_class=None):
89 """
90 Checks the return code and throws an exception if there is an error to
91 report
92 """
93 if error == 0:
94 return
95
96 cf_error_string = Security.SecCopyErrorMessageString(error, None)
97 output = _cf_string_to_unicode(cf_error_string)
98 CoreFoundation.CFRelease(cf_error_string)
99
100 if output is None or output == u'':
101 output = u'OSStatus %s' % error
102
103 if exception_class is None:
104 exception_class = ssl.SSLError
105
106 raise exception_class(output)
107
108
109def _cert_array_from_pem(pem_bundle):
110 """
111 Given a bundle of certs in PEM format, turns them into a CFArray of certs
112 that can be used to validate a cert chain.
113 """
114 der_certs = [
115 base64.b64decode(match.group(1))
116 for match in _PEM_CERTS_RE.finditer(pem_bundle)
117 ]
118 if not der_certs:
119 raise ssl.SSLError("No root certificates specified")
120
121 cert_array = CoreFoundation.CFArrayCreateMutable(
122 CoreFoundation.kCFAllocatorDefault,
123 0,
124 ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks)
125 )
126 if not cert_array:
127 raise ssl.SSLError("Unable to allocate memory!")
128
129 try:
130 for der_bytes in der_certs:
131 certdata = _cf_data_from_bytes(der_bytes)
132 if not certdata:
133 raise ssl.SSLError("Unable to allocate memory!")
134 cert = Security.SecCertificateCreateWithData(
135 CoreFoundation.kCFAllocatorDefault, certdata
136 )
137 CoreFoundation.CFRelease(certdata)
138 if not cert:
139 raise ssl.SSLError("Unable to build cert object!")
140
141 CoreFoundation.CFArrayAppendValue(cert_array, cert)
142 CoreFoundation.CFRelease(cert)
143 except Exception:
144 # We need to free the array before the exception bubbles further.
145 # We only want to do that if an error occurs: otherwise, the caller
146 # should free.
147 CoreFoundation.CFRelease(cert_array)
148
149 return cert_array
150
151
152def _is_cert(item):
153 """
154 Returns True if a given CFTypeRef is a certificate.
155 """
156 expected = Security.SecCertificateGetTypeID()
157 return CoreFoundation.CFGetTypeID(item) == expected
158
159
160def _is_identity(item):
161 """
162 Returns True if a given CFTypeRef is an identity.
163 """
164 expected = Security.SecIdentityGetTypeID()
165 return CoreFoundation.CFGetTypeID(item) == expected
166
167
168def _temporary_keychain():
169 """
170 This function creates a temporary Mac keychain that we can use to work with
171 credentials. This keychain uses a one-time password and a temporary file to
172 store the data. We expect to have one keychain per socket. The returned
173 SecKeychainRef must be freed by the caller, including calling
174 SecKeychainDelete.
175
176 Returns a tuple of the SecKeychainRef and the path to the temporary
177 directory that contains it.
178 """
179 # Unfortunately, SecKeychainCreate requires a path to a keychain. This
180 # means we cannot use mkstemp to use a generic temporary file. Instead,
181 # we're going to create a temporary directory and a filename to use there.
182 # This filename will be 8 random bytes expanded into base64. We also need
183 # some random bytes to password-protect the keychain we're creating, so we
184 # ask for 40 random bytes.
185 random_bytes = os.urandom(40)
186 filename = base64.b64encode(random_bytes[:8]).decode('utf-8')
187 password = base64.b64encode(random_bytes[8:]) # Must be valid UTF-8
188 tempdirectory = tempfile.mkdtemp()
189
190 keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')
191
192 # We now want to create the keychain itself.
193 keychain = Security.SecKeychainRef()
194 status = Security.SecKeychainCreate(
195 keychain_path,
196 len(password),
197 password,
198 False,
199 None,
200 ctypes.byref(keychain)
201 )
202 _assert_no_error(status)
203
204 # Having created the keychain, we want to pass it off to the caller.
205 return keychain, tempdirectory
206
207
208def _load_items_from_file(keychain, path):
209 """
210 Given a single file, loads all the trust objects from it into arrays and
211 the keychain.
212 Returns a tuple of lists: the first list is a list of identities, the
213 second a list of certs.
214 """
215 certificates = []
216 identities = []
217 result_array = None
218
219 with open(path, 'rb') as f:
220 raw_filedata = f.read()
221
222 try:
223 filedata = CoreFoundation.CFDataCreate(
224 CoreFoundation.kCFAllocatorDefault,
225 raw_filedata,
226 len(raw_filedata)
227 )
228 result_array = CoreFoundation.CFArrayRef()
229 result = Security.SecItemImport(
230 filedata, # cert data
231 None, # Filename, leaving it out for now
232 None, # What the type of the file is, we don't care
233 None, # what's in the file, we don't care
234 0, # import flags
235 None, # key params, can include passphrase in the future
236 keychain, # The keychain to insert into
237 ctypes.byref(result_array) # Results
238 )
239 _assert_no_error(result)
240
241 # A CFArray is not very useful to us as an intermediary
242 # representation, so we are going to extract the objects we want
243 # and then free the array. We don't need to keep hold of keys: the
244 # keychain already has them!
245 result_count = CoreFoundation.CFArrayGetCount(result_array)
246 for index in range(result_count):
247 item = CoreFoundation.CFArrayGetValueAtIndex(
248 result_array, index
249 )
250 item = ctypes.cast(item, CoreFoundation.CFTypeRef)
251
252 if _is_cert(item):
253 CoreFoundation.CFRetain(item)
254 certificates.append(item)
255 elif _is_identity(item):
256 CoreFoundation.CFRetain(item)
257 identities.append(item)
258 finally:
259 if result_array:
260 CoreFoundation.CFRelease(result_array)
261
262 CoreFoundation.CFRelease(filedata)
263
264 return (identities, certificates)
265
266
267def _load_client_cert_chain(keychain, *paths):
268 """
269 Load certificates and maybe keys from a number of files. Has the end goal
270 of returning a CFArray containing one SecIdentityRef, and then zero or more
271 SecCertificateRef objects, suitable for use as a client certificate trust
272 chain.
273 """
274 # Ok, the strategy.
275 #
276 # This relies on knowing that macOS will not give you a SecIdentityRef
277 # unless you have imported a key into a keychain. This is a somewhat
278 # artificial limitation of macOS (for example, it doesn't necessarily
279 # affect iOS), but there is nothing inside Security.framework that lets you
280 # get a SecIdentityRef without having a key in a keychain.
281 #
282 # So the policy here is we take all the files and iterate them in order.
283 # Each one will use SecItemImport to have one or more objects loaded from
284 # it. We will also point at a keychain that macOS can use to work with the
285 # private key.
286 #
287 # Once we have all the objects, we'll check what we actually have. If we
288 # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
289 # we'll take the first certificate (which we assume to be our leaf) and
290 # ask the keychain to give us a SecIdentityRef with that cert's associated
291 # key.
292 #
293 # We'll then return a CFArray containing the trust chain: one
294 # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
295 # responsibility for freeing this CFArray will be with the caller. This
296 # CFArray must remain alive for the entire connection, so in practice it
297 # will be stored with a single SSLSocket, along with the reference to the
298 # keychain.
299 certificates = []
300 identities = []
301
302 # Filter out bad paths.
303 paths = (path for path in paths if path)
304
305 try:
306 for file_path in paths:
307 new_identities, new_certs = _load_items_from_file(
308 keychain, file_path
309 )
310 identities.extend(new_identities)
311 certificates.extend(new_certs)
312
313 # Ok, we have everything. The question is: do we have an identity? If
314 # not, we want to grab one from the first cert we have.
315 if not identities:
316 new_identity = Security.SecIdentityRef()
317 status = Security.SecIdentityCreateWithCertificate(
318 keychain,
319 certificates[0],
320 ctypes.byref(new_identity)
321 )
322 _assert_no_error(status)
323 identities.append(new_identity)
324
325 # We now want to release the original certificate, as we no longer
326 # need it.
327 CoreFoundation.CFRelease(certificates.pop(0))
328
329 # We now need to build a new CFArray that holds the trust chain.
330 trust_chain = CoreFoundation.CFArrayCreateMutable(
331 CoreFoundation.kCFAllocatorDefault,
332 0,
333 ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
334 )
335 for item in itertools.chain(identities, certificates):
336 # ArrayAppendValue does a CFRetain on the item. That's fine,
337 # because the finally block will release our other refs to them.
338 CoreFoundation.CFArrayAppendValue(trust_chain, item)
339
340 return trust_chain
341 finally:
342 for obj in itertools.chain(identities, certificates):
343 CoreFoundation.CFRelease(obj)