diff options
author | Laurens Van Houtven <_@lvh.io> | 2015-04-20 10:19:33 -0700 |
---|---|---|
committer | Laurens Van Houtven <_@lvh.io> | 2015-04-20 10:19:33 -0700 |
commit | 1c855cb5c6e8cf4cb0558dc4aa7a74a621cda3d6 (patch) | |
tree | 48065ae9d00921631da8bd8dd7aff273f2a72ba6 | |
parent | ade80a175d231d3f6347364c5a318571ded4d6bd (diff) | |
parent | 9a2c732e5102bf03845f79d8271f7b66021200f6 (diff) | |
download | pyopenssl-1c855cb5c6e8cf4cb0558dc4aa7a74a621cda3d6.tar.gz |
Merge branch 'master' into test-metadata
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | .travis.yml | 106 | ||||
-rw-r--r-- | ChangeLog | 67 | ||||
-rw-r--r-- | INSTALL | 33 | ||||
-rw-r--r-- | INSTALL.rst | 19 | ||||
-rw-r--r-- | MANIFEST.in | 4 | ||||
-rw-r--r-- | OpenSSL/RATIONALE | 61 | ||||
-rw-r--r-- | OpenSSL/SSL.py | 483 | ||||
-rw-r--r-- | OpenSSL/_util.py | 82 | ||||
-rw-r--r-- | OpenSSL/crypto.py | 176 | ||||
-rw-r--r-- | OpenSSL/rand.py | 20 | ||||
-rw-r--r-- | OpenSSL/test/test_crypto.py | 435 | ||||
-rw-r--r-- | OpenSSL/test/test_rand.py | 48 | ||||
-rw-r--r-- | OpenSSL/test/test_ssl.py | 963 | ||||
-rw-r--r-- | OpenSSL/test/test_util.py | 17 | ||||
-rw-r--r-- | OpenSSL/test/util.py | 81 | ||||
-rw-r--r-- | OpenSSL/version.py | 2 | ||||
-rw-r--r-- | README.rst | 17 | ||||
-rw-r--r-- | doc/api/crypto.rst | 47 | ||||
-rw-r--r-- | doc/api/ssl.rst | 80 | ||||
-rw-r--r-- | doc/conf.py | 6 | ||||
-rwxr-xr-x | setup.py | 43 | ||||
-rw-r--r-- | tox.ini | 12 |
23 files changed, 2439 insertions, 366 deletions
@@ -5,3 +5,6 @@ dist *.pyo __pycache__ .tox +doc/_build/ +.coverage +.eggs diff --git a/.travis.yml b/.travis.yml index f359de1..c244622 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,83 +1,99 @@ +sudo: false language: python -os: - - linux - -python: - - "pypy" - - "2.6" - - "2.7" - - "3.2" - - "3.3" - - "3.4" - matrix: include: + - language: generic + os: osx + env: TOXENV=py27 + - python: "2.6" # these are just to make travis's UI a bit prettier + env: TOXENV=py26 + - python: "2.7" + env: TOXENV=py27 + - python: "3.2" + env: TOXENV=py32 + - python: "3.3" + env: TOXENV=py33 + - python: "3.4" + env: TOXENV=py34 + - python: "pypy" + env: TOXENV=pypy # Also run the tests against cryptography master. - python: "2.6" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py26 - python: "2.7" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py27 - python: "3.2" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py32 - python: "3.3" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py33 - python: "3.4" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py34 - python: "pypy" env: - CRYPTOGRAPHY_GIT_MASTER=true + CRYPTOGRAPHY_GIT_MASTER=true TOXENV=pypy # Also run at least a little bit against an older version of OpenSSL. - python: "2.7" env: - OPENSSL=0.9.8 + OPENSSL=0.9.8 TOXENV=py27 + addons: + apt: + sources: + - lucid + packages: + - libssl-dev/lucid # Let the cryptography master builds fail because they might be triggered by # cryptography changes beyond our control. + # Also allow OS X and 0.9.8 to fail at the moment while we fix these new + # build configurations. allow_failures: - - env: - CRYPTOGRAPHY_GIT_MASTER=true - - env: - OPENSSL=0.9.8 + - language: generic + os: osx + env: TOXENV=py27 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py26 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py27 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py32 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py33 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=py34 + - env: CRYPTOGRAPHY_GIT_MASTER=true TOXENV=pypy + - env: OPENSSL=0.9.8 TOXENV=py27 before_install: - if [ -n "$CRYPTOGRAPHY_GIT_MASTER" ]; then pip install git+https://github.com/pyca/cryptography.git;fi install: - # Install the wheel library explicitly here. It is not really a setup - # dependency. It is not an install dependency. It is only a dependency for - # the script directive below - because we want to exercise wheel building on - # travis. - - pip install wheel - - # Also install some tools for measuring code coverage and sending the results - # to coveralls. - - pip install coveralls coverage + - | + if [[ "$(uname -s)" == 'Darwin' ]]; then + brew update + brew upgrade openssl + curl -O https://bootstrap.pypa.io/get-pip.py + python get-pip.py --user + pip install --user virtualenv + else + pip install virtualenv + fi + python -m virtualenv ~/.venv + ~/.venv/bin/pip install coveralls tox script: - | - if [[ "${OPENSSL}" == "0.9.8" ]]; then - sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ lucid main" - sudo apt-get -y update - sudo apt-get install -y --force-yes libssl-dev/lucid + if [[ "$(uname -s)" == "Darwin" ]]; then + # set our flags to use homebrew openssl + export ARCHFLAGS="-arch x86_64" + export LDFLAGS="-L/usr/local/opt/openssl/lib" + export CFLAGS="-I/usr/local/opt/openssl/include" fi - - | - pip install -e . - - | - coverage run --branch --source=OpenSSL setup.py bdist_wheel test - - | - coverage report -m - - | - python -c "import OpenSSL.SSL; print(OpenSSL.SSL.SSLeay_version(OpenSSL.SSL.SSLEAY_VERSION))" + ~/.venv/bin/tox -after_success: - - coveralls +after_script: + - ~/.venv/bin/coveralls notifications: email: false @@ -1,3 +1,66 @@ +2015-04-15 Paul Kehrer <paul.l.kehrer@gmail.com> + + * OpenSSL/crypto.py, OpenSSL/test/test_crypto.py: Switch to utf8string + mask by default. OpenSSL formerly defaulted to a T61String if there + were UTF-8 characters present. This was changed to default to + UTF8String in the config around 2005, but the actual code didn't + change it until late last year. This will default us to the setting + that actually works. To revert this you can call + crypto._lib.ASN1_STRING_set_default_mask_asc(b"default") + +2015-04-14 Hynek Schlawack <hs@ox.cx> + + * Release 0.15.1 + +2015-04-14 Glyph Lefkowitz <glyph@twistedmatrix.com> + + * OpenSSL/SSL.py, OpenSSL/test/test_ssl.py: Fix a regression + present in 0.15, where when an error occurs and no errno() is set, + a KeyError is raised. This happens, for example, if + Connection.shutdown() is called when the underlying transport has + gone away. + +2011-09-02 Hynek Schlawack <hs@ox.cx> + + * Release 0.15 + +2015-04-12 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * OpenSSL/rand.py, OpenSSL/SSL.py: APIs which previously accepted + filenames only as bytes now accept them as either bytes or + unicode (and respect sys.getfilesystemencoding()). + +2015-03-23 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * OpenSSL/SSL.py: Add Cory Benfield's next-protocol-negotiation + (NPN) bindings. + +2015-03-15 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * OpenSSL/SSL.py: Add ``Connection.recv_into``, mirroring the + builtin ``socket.recv_into``. Based on work from Cory Benfield. + * OpenSSL/test/test_ssl.py: Add tests for ``recv_into``. + +2015-01-30 Stephen Holsapple <sholsapp@gmail.com> + + * OpenSSL/crypto.py: Expose ``X509StoreContext`` for verifying certificates. + * OpenSSL/test/test_crypto.py: Add intermediate certificates for + +2015-01-08 Paul Aurich <paul@darkrain42.org> + + * OpenSSL/SSL.py: ``Connection.shutdown`` now propagates errors from the + underlying socket. + +2014-12-11 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * OpenSSL/SSL.py: Fixed a regression ``Context.check_privatekey`` + causing it to always succeed - even if it should fail. + +2014-08-21 Alex Gaynor <alex.gaynor@gmail.com> + + * OpenSSL/crypto.py: Fixed a regression where calling ``load_pkcs7_data`` + with ``FILETYPE_ASN1`` would fail with a ``NameError``. + 2014-05-05 Jean-Paul Calderone <exarkun@twistedmatrix.com> * OpenSSL/SSL.py: Fix a regression in which the first argument of @@ -55,6 +118,10 @@ * OpenSSL/crypto.py: Add ``get_extensions`` method to ``X509Req``. +2014-02-23 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * Release 0.14 + 2014-01-09 Jean-Paul Calderone <exarkun@twistedmatrix.com> * OpenSSL: Port to the cffi-based OpenSSL bindings provided by diff --git a/INSTALL b/INSTALL deleted file mode 100644 index 3af722f..0000000 --- a/INSTALL +++ /dev/null @@ -1,33 +0,0 @@ -Installation ------------- -pyOpenSSL uses distutils. Use setup.py to install it in the usual way: - - $ python setup.py install --user - -Or use pip: - - $ pip install --user . - -You can, of course, do - - $ python setup.py --help - -or - - $ pip install --help - -to find out more about how to use these tools. - -Documentation -------------- - -The documentation is written in reStructuredText and build using Sphinx. - -To build the text, html, postscript or dvi forms of the documentation, this is -what you do: - - cd doc - # To make the text-only documentation: - make text - # To make the dvi form: - make dvi diff --git a/INSTALL.rst b/INSTALL.rst new file mode 100644 index 0000000..eea013b --- /dev/null +++ b/INSTALL.rst @@ -0,0 +1,19 @@ +Installation +============ + +To install pyOpenSSL:: + + $ pip install pyopenssl + +If you are installing in order to *develop* on pyOpenSSL, move to the root directory of a pyOpenSSL checkout, and run:: + + $ pip install -e . + + +Documentation +============= + +The documentation is written in reStructuredText and built using Sphinx:: + + $ cd doc + $ make html diff --git a/MANIFEST.in b/MANIFEST.in index 8137258..455af6d 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,8 @@ -include LICENSE ChangeLog INSTALL README TODO MANIFEST.in OpenSSL/RATIONALE +include LICENSE ChangeLog TODO MANIFEST.in OpenSSL/RATIONALE *.rst tox.ini memdbg.py runtests.py OpenSSL/test/README +exclude leakcheck recursive-include doc * recursive-include examples * recursive-include rpm * +recursive-exclude leakcheck *.py *.pem global-exclude *.pyc prune doc/_build diff --git a/OpenSSL/RATIONALE b/OpenSSL/RATIONALE deleted file mode 100644 index a0e389c..0000000 --- a/OpenSSL/RATIONALE +++ /dev/null @@ -1,61 +0,0 @@ - RATIONALE - -The reason this module exists at all is that the SSL support in the socket -module in the Python 2.1 distribution (which is what we used, of course I -cannot speak for later versions) is severely limited. - -<FIXME> Update this list whenever needed! The communications module isn't -written yet, so we don't know exactly how this'll work! </FIXME> -This is a list of things we need from an OpenSSL module: - + Context objects (in OpenSSL called SSL_CTX) that can be manipulated from - Python modules. They must support a number of operations: - - Loading certificates from file and memory, both the client - certificate and the certificates used for the verification chain. - - Loading private keys from file and memory. - - Setting the verification mode (basically VERIFY_NONE and - VERIFY_PEER). - - Callbacks mechanism for prompting for pass phrases and verifying - certificates. The callbacks have to work under a multi-threaded - environment (see the comment in ssl/context.c). Of course the - callbacks will have to be written in Python! - + The Connection objects (in OpenSSL called SSL) have to support a few - things: - - Renegotiation, this is really important, especially for connections - that are up and running for a long time, since renegotiation - generates new encryption keys. - - Server-side SSL must work! As far as I know this doesn't work in - the SSL support of the socket module as of Python 2.1. - - Wrapping the methods of the underlying transport object is nice, so - you don't have to keep track of more than one object per connection. - This could of course be done a lot better than the way it works now, - so more transport layers than sockets are possible! - + A well-organized error system that mimics OpenSSL's error system is - desireable. Specifically there has to be a way to find out wether the - operation was successful, or if it failed, why it failed, so some sort - of interface to OpenSSL's error queue mechanism is needed. - + Certificate objects (X509) and certificate name objects (X509_NAME) are - needed, especially for verification purposes. Certificates will - probably also be generated by the server which is another reason for - them to exist. The same thing goes for key objects (EVP_PKEY) - + Since this is an OpenSSL module, there has to be an interface to the - OpenSSL PRNG, so it can be seeded in a good way. - -When asking about SSL on the comp.lang.python newsgroup (or on -python-list@python.org) people usually pointed you to the M2Crypto package. -The M2Crypto.SSL module does implement a lot of OpenSSL's functionality but -unfortunately its error handling system does not seem to be finished, -especially for non-blocking I/O. I think that much of the reason for this -is that M2Crypto is developed using SWIG. This makes it awkward to create -functions that e.g. can return both an integer and NULL since (as far as I -know) you basically write C functions and SWIG makes wrapper functions that -parses the Python argument list and calls your C function, and finally -transforms your return value to a Python object. - -Finally, a good book on the topic of SSL (that I read and learned a lot -from) is "SSL and TLS - Designing and Building Secure Systems" (ISBN -0201615983) by Eric Rescorla. A good mailinglist to subscribe to is the -openssl-users@openssl.org list. - -This comment was written July 2001, discussing Python 2.1. Feel free to -modify it as the SSL support in the socket module changes. - diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py index 7b1cbc1..e67bd13 100644 --- a/OpenSSL/SSL.py +++ b/OpenSSL/SSL.py @@ -1,23 +1,27 @@ from sys import platform from functools import wraps, partial -from itertools import count +from itertools import count, chain from weakref import WeakValueDictionary from errno import errorcode from six import text_type as _text_type +from six import binary_type as _binary_type from six import integer_types as integer_types +from six import int2byte, indexbytes from OpenSSL._util import ( ffi as _ffi, lib as _lib, exception_from_error_queue as _exception_from_error_queue, - native as _native) + native as _native, + text_to_bytes_and_warn as _text_to_bytes_and_warn, + path_string as _path_string, + UNSPECIFIED as _UNSPECIFIED, +) from OpenSSL.crypto import ( FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store) -_unspecified = object() - try: _memoryview = memoryview except NameError: @@ -164,10 +168,41 @@ class SysCallError(Error): pass +class _CallbackExceptionHelper(object): + """ + A base class for wrapper classes that allow for intelligent exception + handling in OpenSSL callbacks. + + :ivar list _problems: Any exceptions that occurred while executing in a + context where they could not be raised in the normal way. Typically + this is because OpenSSL has called into some Python code and requires a + return value. The exceptions are saved to be raised later when it is + possible to do so. + """ + def __init__(self): + self._problems = [] + -class _VerifyHelper(object): + def raise_if_problem(self): + """ + Raise an exception from the OpenSSL error queue or that was previously + captured whe running a callback. + """ + if self._problems: + try: + _raise_current_error() + except Error: + pass + raise self._problems.pop(0) + + +class _VerifyHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as a certificate verification + callback. + """ def __init__(self, callback): - self._problems = [] + _CallbackExceptionHelper.__init__(self) @wraps(callback) def wrapper(ok, store_ctx): @@ -196,14 +231,142 @@ class _VerifyHelper(object): "int (*)(int, X509_STORE_CTX *)", wrapper) - def raise_if_problem(self): - if self._problems: +class _NpnAdvertiseHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an NPN advertisement callback. + """ + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, arg): try: - _raise_current_error() - except Error: - pass - raise self._problems.pop(0) + conn = Connection._reverse_mapping[ssl] + protos = callback(conn) + + # Join the protocols into a Python bytestring, length-prefixing + # each element. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Save our callback arguments on the connection object. This is + # done to make sure that they don't get freed before OpenSSL + # uses them. Then, return them appropriately in the output + # parameters. + conn._npn_advertise_callback_args = [ + _ffi.new("unsigned int *", len(protostr)), + _ffi.new("unsigned char[]", protostr), + ] + outlen[0] = conn._npn_advertise_callback_args[0][0] + out[0] = conn._npn_advertise_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + self.callback = _ffi.callback( + "int (*)(SSL *, const unsigned char **, unsigned int *, void *)", + wrapper + ) + + +class _NpnSelectHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an NPN selection callback. + """ + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, in_, inlen, arg): + try: + conn = Connection._reverse_mapping[ssl] + + # The string passed to us is actually made up of multiple + # length-prefixed bytestrings. We need to split that into a + # list. + instr = _ffi.buffer(in_, inlen)[:] + protolist = [] + while instr: + l = indexbytes(instr, 0) + proto = instr[1:l+1] + protolist.append(proto) + instr = instr[l+1:] + + # Call the callback + outstr = callback(conn, protolist) + + # Save our callback arguments on the connection object. This is + # done to make sure that they don't get freed before OpenSSL + # uses them. Then, return them appropriately in the output + # parameters. + conn._npn_select_callback_args = [ + _ffi.new("unsigned char *", len(outstr)), + _ffi.new("unsigned char[]", outstr), + ] + outlen[0] = conn._npn_select_callback_args[0][0] + out[0] = conn._npn_select_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + + self.callback = _ffi.callback( + "int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)", + wrapper + ) + + +class _ALPNSelectHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an ALPN selection callback. + """ + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, in_, inlen, arg): + try: + conn = Connection._reverse_mapping[ssl] + + # The string passed to us is made up of multiple + # length-prefixed bytestrings. We need to split that into a + # list. + instr = _ffi.buffer(in_, inlen)[:] + protolist = [] + while instr: + encoded_len = indexbytes(instr, 0) + proto = instr[1:encoded_len + 1] + protolist.append(proto) + instr = instr[encoded_len + 1:] + + # Call the callback + outstr = callback(conn, protolist) + + if not isinstance(outstr, _binary_type): + raise TypeError("ALPN callback must return a bytestring.") + + # Save our callback arguments on the connection object to make + # sure that they don't get freed before OpenSSL can use them. + # Then, return them in the appropriate output parameters. + conn._alpn_select_callback_args = [ + _ffi.new("unsigned char *", len(outstr)), + _ffi.new("unsigned char[]", outstr), + ] + outlen[0] = conn._alpn_select_callback_args[0][0] + out[0] = conn._alpn_select_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + + self.callback = _ffi.callback( + "int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)", + wrapper + ) def _asFileDescriptor(obj): @@ -235,6 +398,37 @@ def SSLeay_version(type): return _ffi.string(_lib.SSLeay_version(type)) +def _requires_npn(func): + """ + Wraps any function that requires NPN support in OpenSSL, ensuring that + NotImplementedError is raised if NPN is not present. + """ + @wraps(func) + def wrapper(*args, **kwargs): + if not _lib.Cryptography_HAS_NEXTPROTONEG: + raise NotImplementedError("NPN not available.") + + return func(*args, **kwargs) + + return wrapper + + + +def _requires_alpn(func): + """ + Wraps any function that requires ALPN support in OpenSSL, ensuring that + NotImplementedError is raised if ALPN support is not present. + """ + @wraps(func) + def wrapper(*args, **kwargs): + if not _lib.Cryptography_HAS_ALPN: + raise NotImplementedError("ALPN not available.") + + return func(*args, **kwargs) + + return wrapper + + class Session(object): pass @@ -293,6 +487,12 @@ class Context(object): self._info_callback = None self._tlsext_servername_callback = None self._app_data = None + self._npn_advertise_helper = None + self._npn_advertise_callback = None + self._npn_select_helper = None + self._npn_select_callback = None + self._alpn_select_helper = None + self._alpn_select_callback = None # SSL_CTX_set_app_data(self->ctx, self); # SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE | @@ -306,19 +506,22 @@ class Context(object): Let SSL know where we can find trusted certificates for the certificate chain - :param cafile: In which file we can find the certificates + :param cafile: In which file we can find the certificates (``bytes`` or + ``unicode``). :param capath: In which directory we can find the certificates + (``bytes`` or ``unicode``). + :return: None """ if cafile is None: cafile = _ffi.NULL - elif not isinstance(cafile, bytes): - raise TypeError("cafile must be None or a byte string") + else: + cafile = _path_string(cafile) if capath is None: capath = _ffi.NULL - elif not isinstance(capath, bytes): - raise TypeError("capath must be None or a byte string") + else: + capath = _path_string(capath) load_result = _lib.SSL_CTX_load_verify_locations(self._context, cafile, capath) if not load_result: @@ -368,15 +571,12 @@ class Context(object): """ Load a certificate chain from a file - :param certfile: The name of the certificate chain file + :param certfile: The name of the certificate chain file (``bytes`` or + ``unicode``). + :return: None """ - if isinstance(certfile, _text_type): - # Perhaps sys.getfilesystemencoding() could be better? - certfile = certfile.encode("utf-8") - - if not isinstance(certfile, bytes): - raise TypeError("certfile must be bytes or unicode") + certfile = _path_string(certfile) result = _lib.SSL_CTX_use_certificate_chain_file(self._context, certfile) if not result: @@ -387,15 +587,13 @@ class Context(object): """ Load a certificate from a file - :param certfile: The name of the certificate file + :param certfile: The name of the certificate file (``bytes`` or + ``unicode``). :param filetype: (optional) The encoding of the file, default is PEM + :return: None """ - if isinstance(certfile, _text_type): - # Perhaps sys.getfilesystemencoding() could be better? - certfile = certfile.encode("utf-8") - if not isinstance(certfile, bytes): - raise TypeError("certfile must be bytes or unicode") + certfile = _path_string(certfile) if not isinstance(filetype, integer_types): raise TypeError("filetype must be an integer") @@ -445,22 +643,18 @@ class Context(object): raise exception - def use_privatekey_file(self, keyfile, filetype=_unspecified): + def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): """ Load a private key from a file - :param keyfile: The name of the key file + :param keyfile: The name of the key file (``bytes`` or ``unicode``) :param filetype: (optional) The encoding of the file, default is PEM + :return: None """ - if isinstance(keyfile, _text_type): - # Perhaps sys.getfilesystemencoding() could be better? - keyfile = keyfile.encode("utf-8") + keyfile = _path_string(keyfile) - if not isinstance(keyfile, bytes): - raise TypeError("keyfile must be a byte string") - - if filetype is _unspecified: + if filetype is _UNSPECIFIED: filetype = FILETYPE_PEM elif not isinstance(filetype, integer_types): raise TypeError("filetype must be an integer") @@ -492,6 +686,9 @@ class Context(object): :return: None (raises an exception if something's wrong) """ + if not _lib.SSL_CTX_check_private_key(self._context): + _raise_current_error() + def load_client_ca(self, cafile): """ @@ -591,11 +788,12 @@ class Context(object): """ Load parameters for Ephemeral Diffie-Hellman - :param dhfile: The file to load EDH parameters from + :param dhfile: The file to load EDH parameters from (``bytes`` or + ``unicode``). + :return: None """ - if not isinstance(dhfile, bytes): - raise TypeError("dhfile must be a byte string") + dhfile = _path_string(dhfile) bio = _lib.BIO_new_file(dhfile, b"r") if bio == _ffi.NULL: @@ -809,6 +1007,79 @@ class Context(object): _lib.SSL_CTX_set_tlsext_servername_callback( self._context, self._tlsext_servername_callback) + + @_requires_npn + def set_npn_advertise_callback(self, callback): + """ + Specify a callback function that will be called when offering `Next + Protocol Negotiation + <https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server. + + :param callback: The callback function. It will be invoked with one + argument, the Connection instance. It should return a list of + bytestrings representing the advertised protocols, like + ``[b'http/1.1', b'spdy/2']``. + """ + self._npn_advertise_helper = _NpnAdvertiseHelper(callback) + self._npn_advertise_callback = self._npn_advertise_helper.callback + _lib.SSL_CTX_set_next_protos_advertised_cb( + self._context, self._npn_advertise_callback, _ffi.NULL) + + + @_requires_npn + def set_npn_select_callback(self, callback): + """ + Specify a callback function that will be called when a server offers + Next Protocol Negotiation options. + + :param callback: The callback function. It will be invoked with two + arguments: the Connection, and a list of offered protocols as + bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return + one of those bytestrings, the chosen protocol. + """ + self._npn_select_helper = _NpnSelectHelper(callback) + self._npn_select_callback = self._npn_select_helper.callback + _lib.SSL_CTX_set_next_proto_select_cb( + self._context, self._npn_select_callback, _ffi.NULL) + + @_requires_alpn + def set_alpn_protos(self, protos): + """ + Specify the clients ALPN protocol list. + + These protocols are offered to the server during protocol negotiation. + + :param protos: A list of the protocols to be offered to the server. + This list should be a Python list of bytestrings representing the + protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. + """ + # Take the list of protocols and join them together, prefixing them + # with their lengths. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Build a C string from the list. We don't need to save this off + # because OpenSSL immediately copies the data out. + input_str = _ffi.new("unsigned char[]", protostr) + input_str_len = _ffi.cast("unsigned", len(protostr)) + _lib.SSL_CTX_set_alpn_protos(self._context, input_str, input_str_len) + + @_requires_alpn + def set_alpn_select_callback(self, callback): + """ + Set the callback to handle ALPN protocol choice. + + :param callback: The callback function. It will be invoked with two + arguments: the Connection, and a list of offered protocols as + bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It should return + one of those bytestrings, the chosen protocol. + """ + self._alpn_select_helper = _ALPNSelectHelper(callback) + self._alpn_select_callback = self._alpn_select_helper.callback + _lib.SSL_CTX_set_alpn_select_cb( + self._context, self._alpn_select_callback, _ffi.NULL) + ContextType = Context @@ -833,6 +1104,19 @@ class Connection(object): self._ssl = _ffi.gc(ssl, _lib.SSL_free) self._context = context + # References to strings used for Next Protocol Negotiation. OpenSSL's + # header files suggest that these might get copied at some point, but + # doesn't specify when, so we store them here to make sure they don't + # get freed before OpenSSL uses them. + self._npn_advertise_callback_args = None + self._npn_select_callback_args = None + + # References to strings used for Application Layer Protocol + # Negotiation. These strings get copied at some point but it's well + # after the callback returns, so we have to hang them somewhere to + # avoid them getting freed. + self._alpn_select_callback_args = None + self._reverse_mapping[self._ssl] = self if socket is None: @@ -867,6 +1151,12 @@ class Connection(object): def _raise_ssl_error(self, ssl, result): if self._context._verify_helper is not None: self._context._verify_helper.raise_if_problem() + if self._context._npn_advertise_helper is not None: + self._context._npn_advertise_helper.raise_if_problem() + if self._context._npn_select_helper is not None: + self._context._npn_select_helper.raise_if_problem() + if self._context._alpn_select_helper is not None: + self._context._alpn_select_helper.raise_if_problem() error = _lib.SSL_get_error(ssl, result) if error == _lib.SSL_ERROR_WANT_READ: @@ -885,7 +1175,7 @@ class Connection(object): errno = _ffi.getwinerror()[0] else: errno = _ffi.errno - raise SysCallError(errno, errorcode[errno]) + raise SysCallError(errno, errorcode.get(errno)) else: raise SysCallError(-1, "Unexpected EOF") else: @@ -967,6 +1257,9 @@ class Connection(object): API, the value is ignored :return: The number of bytes written """ + # Backward compatibility + buf = _text_to_bytes_and_warn("buf", buf) + if isinstance(buf, _memoryview): buf = buf.tobytes() if isinstance(buf, _buffer): @@ -991,6 +1284,8 @@ class Connection(object): API, the value is ignored :return: The number of bytes written """ + buf = _text_to_bytes_and_warn("buf", buf) + if isinstance(buf, _memoryview): buf = buf.tobytes() if isinstance(buf, _buffer): @@ -1027,6 +1322,45 @@ class Connection(object): read = recv + def recv_into(self, buffer, nbytes=None, flags=None): + """ + Receive data on the connection and store the data into a buffer rather + than creating a new string. + + :param buffer: The buffer to copy into. + :param nbytes: (optional) The maximum number of bytes to read into the + buffer. If not present, defaults to the size of the buffer. If + larger than the size of the buffer, is reduced to the size of the + buffer. + :param flags: (optional) Included for compatibility with the socket + API, the value is ignored. + :return: The number of bytes read into the buffer. + """ + if nbytes is None: + nbytes = len(buffer) + else: + nbytes = min(nbytes, len(buffer)) + + # We need to create a temporary buffer. This is annoying, it would be + # better if we could pass memoryviews straight into the SSL_read call, + # but right now we can't. Revisit this if CFFI gets that ability. + buf = _ffi.new("char[]", nbytes) + result = _lib.SSL_read(self._ssl, buf, nbytes) + self._raise_ssl_error(self._ssl, result) + + # This strange line is all to avoid a memory copy. The buffer protocol + # should allow us to assign a CFFI buffer to the LHS of this line, but + # on CPython 3.3+ that segfaults. As a workaround, we can temporarily + # wrap it in a memoryview, except on Python 2.6 which doesn't have a + # memoryview type. + try: + buffer[:result] = memoryview(_ffi.buffer(buf, result)) + except NameError: + buffer[:result] = _ffi.buffer(buf, result) + + return result + + def _handle_bio_errors(self, bio, result): if _lib.BIO_should_retry(bio): if _lib.BIO_should_read(bio): @@ -1076,12 +1410,11 @@ class Connection(object): :param buf: The string to put into the memory BIO. :return: The number of bytes written """ + buf = _text_to_bytes_and_warn("buf", buf) + if self._into_ssl is None: raise TypeError("Connection sock was not None") - if not isinstance(buf, bytes): - raise TypeError("buf must be a byte string") - result = _lib.BIO_write(self._into_ssl, buf, len(buf)) if result <= 0: self._handle_bio_errors(self._into_ssl, result) @@ -1183,8 +1516,7 @@ class Connection(object): """ result = _lib.SSL_shutdown(self._ssl) if result < 0: - # TODO: This is untested. - _raise_current_error() + self._raise_ssl_error(self._ssl, result) elif result > 0: return True else: @@ -1551,6 +1883,61 @@ class Connection(object): return version.decode("utf-8") + @_requires_npn + def get_next_proto_negotiated(self): + """ + Get the protocol that was negotiated by NPN. + """ + data = _ffi.new("unsigned char **") + data_len = _ffi.new("unsigned int *") + + _lib.SSL_get0_next_proto_negotiated(self._ssl, data, data_len) + + return _ffi.buffer(data[0], data_len[0])[:] + + + @_requires_alpn + def set_alpn_protos(self, protos): + """ + Specify the client's ALPN protocol list. + + These protocols are offered to the server during protocol negotiation. + + :param protos: A list of the protocols to be offered to the server. + This list should be a Python list of bytestrings representing the + protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. + """ + # Take the list of protocols and join them together, prefixing them + # with their lengths. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Build a C string from the list. We don't need to save this off + # because OpenSSL immediately copies the data out. + input_str = _ffi.new("unsigned char[]", protostr) + input_str_len = _ffi.cast("unsigned", len(protostr)) + _lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len) + + + def get_alpn_proto_negotiated(self): + """ + Get the protocol that was negotiated by ALPN. + """ + if not _lib.Cryptography_HAS_ALPN: + raise NotImplementedError("ALPN not available") + + data = _ffi.new("unsigned char **") + data_len = _ffi.new("unsigned int *") + + _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) + + if not data_len: + return b'' + + return _ffi.buffer(data[0], data_len[0])[:] + + ConnectionType = Connection diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py index baeecc6..0cc34d8 100644 --- a/OpenSSL/_util.py +++ b/OpenSSL/_util.py @@ -1,3 +1,6 @@ +from warnings import warn +import sys + from six import PY3, binary_type, text_type from cryptography.hazmat.bindings.openssl.binding import Binding @@ -5,11 +8,34 @@ binding = Binding() ffi = binding.ffi lib = binding.lib -def exception_from_error_queue(exceptionType): - def text(charp): - return native(ffi.string(charp)) + + +def text(charp): + """ + Get a native string type representing of the given CFFI ``char*`` object. + + :param charp: A C-style string represented using CFFI. + + :return: :class:`str` + """ + if not charp: + return "" + return native(ffi.string(charp)) + + + +def exception_from_error_queue(exception_type): + """ + Convert an OpenSSL library failure into a Python exception. + + When a call to the native OpenSSL library fails, this is usually signalled + by the return value, and an error code is stored in an error queue + associated with the current thread. The err library provides functions to + obtain these error codes and textual error messages. + """ errors = [] + while True: error = lib.ERR_get_error() if error == 0: @@ -19,7 +45,7 @@ def exception_from_error_queue(exceptionType): text(lib.ERR_func_error_string(error)), text(lib.ERR_reason_error_string(error)))) - raise exceptionType(errors) + raise exception_type(errors) @@ -45,9 +71,57 @@ def native(s): +def path_string(s): + """ + Convert a Python string to a :py:class:`bytes` string identifying the same + path and which can be passed into an OpenSSL API accepting a filename. + + :param s: An instance of :py:class:`bytes` or :py:class:`unicode`. + + :return: An instance of :py:class:`bytes`. + """ + if isinstance(s, binary_type): + return s + elif isinstance(s, text_type): + return s.encode(sys.getfilesystemencoding()) + else: + raise TypeError("Path must be represented as bytes or unicode string") + + if PY3: def byte_string(s): return s.encode("charmap") else: def byte_string(s): return s + + +# A marker object to observe whether some optional arguments are passed any +# value or not. +UNSPECIFIED = object() + +_TEXT_WARNING = ( + text_type.__name__ + " for {0} is no longer accepted, use bytes" +) + +def text_to_bytes_and_warn(label, obj): + """ + If ``obj`` is text, emit a warning that it should be bytes instead and try + to convert it to bytes automatically. + + :param str label: The name of the parameter from which ``obj`` was taken + (so a developer can easily find the source of the problem and correct + it). + + :return: If ``obj`` is the text string type, a ``bytes`` object giving the + UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is + returned. + """ + if isinstance(obj, text_type): + warn( + _TEXT_WARNING.format(label), + category=DeprecationWarning, + stacklevel=3 + ) + return obj.encode('utf-8') + return obj diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py index 54569ea..c7bdabc 100644 --- a/OpenSSL/crypto.py +++ b/OpenSSL/crypto.py @@ -2,6 +2,7 @@ from time import time from base64 import b16encode from functools import partial from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ +from warnings import warn as _warn from six import ( integer_types as _integer_types, @@ -13,7 +14,10 @@ from OpenSSL._util import ( lib as _lib, exception_from_error_queue as _exception_from_error_queue, byte_string as _byte_string, - native as _native) + native as _native, + UNSPECIFIED as _UNSPECIFIED, + text_to_bytes_and_warn as _text_to_bytes_and_warn, +) FILETYPE_PEM = _lib.SSL_FILETYPE_PEM FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 @@ -25,6 +29,7 @@ TYPE_RSA = _lib.EVP_PKEY_RSA TYPE_DSA = _lib.EVP_PKEY_DSA + class Error(Exception): """ An error occurred in an `OpenSSL.crypto` API. @@ -33,6 +38,8 @@ class Error(Exception): _raise_current_error = partial(_exception_from_error_queue, Error) + + def _untested_error(where): """ An OpenSSL API failed somehow. Additionally, the failure which was @@ -1356,6 +1363,125 @@ class X509Store(object): X509StoreType = X509Store +class X509StoreContextError(Exception): + """ + An error occurred while verifying a certificate using + `OpenSSL.X509StoreContext.verify_certificate`. + + :ivar certificate: The certificate which caused verificate failure. + :type cert: :class:`X509` + + """ + def __init__(self, message, certificate): + super(X509StoreContextError, self).__init__(message) + self.certificate = certificate + + +class X509StoreContext(object): + """ + An X.509 store context. + + An :py:class:`X509StoreContext` is used to define some of the criteria for + certificate verification. The information encapsulated in this object + includes, but is not limited to, a set of trusted certificates, + verification parameters, and revoked certificates. + + Of these, only the set of trusted certificates is currently exposed. + + :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this + instance. It is dynamically allocated and automatically garbage + collected. + + :ivar _store: See the ``store`` ``__init__`` parameter. + + :ivar _cert: See the ``certificate`` ``__init__`` parameter. + """ + + def __init__(self, store, certificate): + """ + :param X509Store store: The certificates which will be trusted for the + purposes of any verifications. + + :param X509 certificate: The certificate to be verified. + """ + store_ctx = _lib.X509_STORE_CTX_new() + self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) + self._store = store + self._cert = certificate + # Make the store context available for use after instantiating this + # class by initializing it now. Per testing, subsequent calls to + # :py:meth:`_init` have no adverse affect. + self._init() + + + def _init(self): + """ + Set up the store context for a subsequent verification operation. + """ + ret = _lib.X509_STORE_CTX_init(self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL) + if ret <= 0: + _raise_current_error() + + + def _cleanup(self): + """ + Internally cleans up the store context. + + The store context can then be reused with a new call to + :py:meth:`_init`. + """ + _lib.X509_STORE_CTX_cleanup(self._store_ctx) + + + def _exception_from_context(self): + """ + Convert an OpenSSL native context error failure into a Python + exception. + + When a call to native OpenSSL X509_verify_cert fails, additonal information + about the failure can be obtained from the store context. + """ + errors = [ + _lib.X509_STORE_CTX_get_error(self._store_ctx), + _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), + _native(_ffi.string(_lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx)))), + ] + # A context error should always be associated with a certificate, so we + # expect this call to never return :class:`None`. + _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) + _cert = _lib.X509_dup(_x509) + pycert = X509.__new__(X509) + pycert._x509 = _ffi.gc(_cert, _lib.X509_free) + return X509StoreContextError(errors, pycert) + + + def set_store(self, store): + """ + Set the context's trust store. + + :param X509Store store: The certificates which will be trusted for the + purposes of any *future* verifications. + """ + self._store = store + + + def verify_certificate(self): + """ + Verify a certificate in a context. + + :param store_ctx: The :py:class:`X509StoreContext` to verify. + :raises: Error + """ + # Always re-initialize the store context in case + # :py:meth:`verify_certificate` is called multiple times. + self._init() + ret = _lib.X509_verify_cert(self._store_ctx) + self._cleanup() + if ret <= 0: + raise self._exception_from_context() + + def load_certificate(type, buffer): """ @@ -1707,7 +1833,8 @@ class CRL(object): _raise_current_error() - def export(self, cert, key, type=FILETYPE_PEM, days=100): + def export(self, cert, key, type=FILETYPE_PEM, days=100, + digest=_UNSPECIFIED): """ export a CRL as a string @@ -1717,12 +1844,15 @@ class CRL(object): :param key: Used to sign CRL. :type key: :class:`PKey` - :param type: The export format, either :py:data:`FILETYPE_PEM`, :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`. + :param type: The export format, either :py:data:`FILETYPE_PEM`, + :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`. + + :param int days: The number of days until the next update of this CRL. - :param days: The number of days until the next update of this CRL. - :type days: :py:data:`int` + :param bytes digest: The name of the message digest to use (eg + ``b"sha1"``). - :return: :py:data:`str` + :return: :py:data:`bytes` """ if not isinstance(cert, X509): raise TypeError("cert must be an X509 instance") @@ -1731,6 +1861,19 @@ class CRL(object): if not isinstance(type, int): raise TypeError("type must be an integer") + if digest is _UNSPECIFIED: + _warn( + "The default message digest (md5) is deprecated. " + "Pass the name of a message digest explicitly.", + category=DeprecationWarning, + stacklevel=2, + ) + digest = b"md5" + + digest_obj = _lib.EVP_get_digestbyname(digest) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + bio = _lib.BIO_new(_lib.BIO_s_mem()) if bio == _ffi.NULL: # TODO: This is untested. @@ -1750,7 +1893,7 @@ class CRL(object): _lib.X509_CRL_set_issuer_name(self._crl, _lib.X509_get_subject_name(cert._x509)) - sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, _lib.EVP_md5()) + sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) if not sign_result: _raise_current_error() @@ -1897,7 +2040,7 @@ class PKCS12(object): def set_ca_certificates(self, cacerts): """ - Replace or set the CA certificates withing the PKCS12 object. + Replace or set the CA certificates within the PKCS12 object. :param cacerts: The new CA certificates. :type cacerts: :py:data:`None` or an iterable of :py:class:`X509` @@ -1952,6 +2095,8 @@ class PKCS12(object): :return: The string containing the PKCS12 """ + passphrase = _text_to_bytes_and_warn("passphrase", passphrase) + if self._cacerts is None: cacerts = _ffi.NULL else: @@ -2249,6 +2394,8 @@ def sign(pkey, data, digest): :param digest: message digest to use :return: signature """ + data = _text_to_bytes_and_warn("data", data) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -2283,6 +2430,8 @@ def verify(cert, signature, data, digest): :param digest: message digest to use :return: None if the signature is correct, raise exception otherwise """ + data = _text_to_bytes_and_warn("data", data) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -2304,7 +2453,6 @@ def verify(cert, signature, data, digest): _raise_current_error() - def load_crl(type, buffer): """ Load a certificate revocation list from a buffer @@ -2351,7 +2499,7 @@ def load_pkcs7_data(type, buffer): if type == FILETYPE_PEM: pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) elif type == FILETYPE_ASN1: - pass + pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL) else: # TODO: This is untested. _raise_current_error() @@ -2374,6 +2522,8 @@ def load_pkcs12(buffer, passphrase=None): :param passphrase: (Optional) The password to decrypt the PKCS12 lump :returns: The PKCS12 object """ + passphrase = _text_to_bytes_and_warn("passphrase", passphrase) + if isinstance(buffer, _text_type): buffer = buffer.encode("ascii") @@ -2487,3 +2637,9 @@ _lib.OpenSSL_add_all_algorithms() # This is similar but exercised mainly by exception_from_error_queue. It calls # both ERR_load_crypto_strings() and ERR_load_SSL_strings(). _lib.SSL_load_error_strings() + + + +# Set the default string mask to match OpenSSL upstream (since 2005) and +# RFC5280 recommendations. +_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') diff --git a/OpenSSL/rand.py b/OpenSSL/rand.py index e754378..3adf693 100644 --- a/OpenSSL/rand.py +++ b/OpenSSL/rand.py @@ -11,7 +11,8 @@ from six import integer_types as _integer_types from OpenSSL._util import ( ffi as _ffi, lib as _lib, - exception_from_error_queue as _exception_from_error_queue) + exception_from_error_queue as _exception_from_error_queue, + path_string as _path_string) class Error(Exception): @@ -131,13 +132,13 @@ def load_file(filename, maxbytes=_unspecified): """ Seed the PRNG with data from a file - :param filename: The file to read data from - :param maxbytes: (optional) The number of bytes to read, default is - to read the entire file + :param filename: The file to read data from (``bytes`` or ``unicode``). + :param maxbytes: (optional) The number of bytes to read, default is to read + the entire file + :return: The number of bytes read """ - if not isinstance(filename, _builtin_bytes): - raise TypeError("filename must be a string") + filename = _path_string(filename) if maxbytes is _unspecified: maxbytes = -1 @@ -152,12 +153,11 @@ def write_file(filename): """ Save PRNG state to a file - :param filename: The file to write data to + :param filename: The file to write data to (``bytes`` or ``unicode``). + :return: The number of bytes written """ - if not isinstance(filename, _builtin_bytes): - raise TypeError("filename must be a string") - + filename = _path_string(filename) return _lib.RAND_write_file(filename) diff --git a/OpenSSL/test/test_crypto.py b/OpenSSL/test/test_crypto.py index bbe5d05..b817451 100644 --- a/OpenSSL/test/test_crypto.py +++ b/OpenSSL/test/test_crypto.py @@ -6,16 +6,22 @@ Unit tests for :py:mod:`OpenSSL.crypto`. """ from unittest import main +from warnings import catch_warnings, simplefilter -import os, re +import base64 +import os +import re from subprocess import PIPE, Popen from datetime import datetime, timedelta -from six import u, b, binary_type +from six import u, b, binary_type, PY3 +from warnings import simplefilter +from warnings import catch_warnings from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey, PKeyType from OpenSSL.crypto import X509, X509Type, X509Name, X509NameType -from OpenSSL.crypto import X509Store, X509StoreType, X509Req, X509ReqType +from OpenSSL.crypto import X509Store, X509StoreType, X509StoreContext, X509StoreContextError +from OpenSSL.crypto import X509Req, X509ReqType from OpenSSL.crypto import X509Extension, X509ExtensionType from OpenSSL.crypto import load_certificate, load_privatekey from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT @@ -27,7 +33,9 @@ from OpenSSL.crypto import CRL, Revoked, load_crl from OpenSSL.crypto import NetscapeSPKI, NetscapeSPKIType from OpenSSL.crypto import ( sign, verify, get_elliptic_curve, get_elliptic_curves) -from OpenSSL.test.util import EqualityTestsMixin, TestCase +from OpenSSL.test.util import ( + EqualityTestsMixin, TestCase, WARNING_TYPE_EXPECTED +) from OpenSSL._util import native, lib def normalize_certificate_pem(pem): @@ -81,6 +89,40 @@ cbvAhow217X9V0dVerEOKxnNYspXRrh36h7k4mQA+sDq -----END RSA PRIVATE KEY----- """) +intermediate_cert_pem = b("""-----BEGIN CERTIFICATE----- +MIICVzCCAcCgAwIBAgIRAMPzhm6//0Y/g2pmnHR2C4cwDQYJKoZIhvcNAQENBQAw +WDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAklMMRAwDgYDVQQHEwdDaGljYWdvMRAw +DgYDVQQKEwdUZXN0aW5nMRgwFgYDVQQDEw9UZXN0aW5nIFJvb3QgQ0EwHhcNMTQw +ODI4MDIwNDA4WhcNMjQwODI1MDIwNDA4WjBmMRUwEwYDVQQDEwxpbnRlcm1lZGlh +dGUxDDAKBgNVBAoTA29yZzERMA8GA1UECxMIb3JnLXVuaXQxCzAJBgNVBAYTAlVT +MQswCQYDVQQIEwJDQTESMBAGA1UEBxMJU2FuIERpZWdvMIGfMA0GCSqGSIb3DQEB +AQUAA4GNADCBiQKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmK +FGIbljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT +21H2qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwID +AQABoxMwETAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBDQUAA4GBAPIWSkLX +QRMApOjjyC+tMxumT5e2pMqChHmxobQK4NMdrf2VCx+cRT6EmY8sK3/Xl/X8UBQ+ +9n5zXb1ZwhW/sTWgUvmOceJ4/XVs9FkdWOOn1J0XBch9ZIiFe/s5ASIgG7fUdcUF +9mAWS6FK2ca3xIh5kIupCXOFa0dPvlw/YUFT +-----END CERTIFICATE----- +""") + +intermediate_key_pem = b("""-----BEGIN RSA PRIVATE KEY----- +MIICWwIBAAKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmKFGIb +ljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT21H2 +qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwIDAQAB +AoGAfSZVV80pSeOKHTYfbGdNY/jHdU9eFUa/33YWriXU+77EhpIItJjkRRgivIfo +rhFJpBSGmDLblaqepm8emsXMeH4+2QzOYIf0QGGP6E6scjTt1PLqdqKfVJ1a2REN +147cujNcmFJb/5VQHHMpaPTgttEjlzuww4+BCDPsVRABWrkCQQD3loH36nLoQTtf ++kQq0T6Bs9/UWkTAGo0ND81ALj0F8Ie1oeZg6RNT96RxZ3aVuFTESTv6/TbjWywO +wdzlmV1vAkEA38rTJ6PTwaJlw5OttdDzAXGPB9tDmzh9oSi7cHwQQXizYd8MBYx4 +sjHUKD3dCQnb1dxJFhd3BT5HsnkRMbVZXQJAbXduH17ZTzcIOXc9jHDXYiFVZV5D +52vV0WCbLzVCZc3jMrtSUKa8lPN5EWrdU3UchWybyG0MR5mX8S5lrF4SoQJAIyUD +DBKaSqpqONCUUx1BTFS9FYrFjzbL4+c1qHCTTPTblt8kUCrDOZjBrKAqeiTmNSum +/qUot9YUBF8m6BuGsQJATHHmdFy/fG1VLkyBp49CAa8tN3Z5r/CgTznI4DfMTf4C +NbRHn2UmYlwQBa+L5lg9phewNe8aEwpPyPLoV85U8Q== +-----END RSA PRIVATE KEY----- +""") + server_cert_pem = b("""-----BEGIN CERTIFICATE----- MIICKDCCAZGgAwIBAgIJAJn/HpR21r/8MA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH @@ -114,6 +156,40 @@ r50+LF74iLXFwqysVCebPKMOpDWp/qQ1BbJQIPs7/A== -----END RSA PRIVATE KEY----- """)) +intermediate_server_cert_pem = b("""-----BEGIN CERTIFICATE----- +MIICWDCCAcGgAwIBAgIRAPQFY9jfskSihdiNSNdt6GswDQYJKoZIhvcNAQENBQAw +ZjEVMBMGA1UEAxMMaW50ZXJtZWRpYXRlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT +CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh +biBEaWVnbzAeFw0xNDA4MjgwMjEwNDhaFw0yNDA4MjUwMjEwNDhaMG4xHTAbBgNV +BAMTFGludGVybWVkaWF0ZS1zZXJ2aWNlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT +CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh +biBEaWVnbzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAqpJZygd+w1faLOr1 +iOAmbBhx5SZWcTCZ/ZjHQTJM7GuPT624QkqsixFghRKdDROwpwnAP7gMRukLqiy4 ++kRuGT5OfyGggL95i2xqA+zehjj08lSTlvGHpePJgCyTavIy5+Ljsj4DKnKyuhxm +biXTRrH83NDgixVkObTEmh/OVK0CAwEAATANBgkqhkiG9w0BAQ0FAAOBgQBa0Npw +UkzjaYEo1OUE1sTI6Mm4riTIHMak4/nswKh9hYup//WVOlr/RBSBtZ7Q/BwbjobN +3bfAtV7eSAqBsfxYXyof7G1ALANQERkq3+oyLP1iVt08W1WOUlIMPhdCF/QuCwy6 +x9MJLhUCGLJPM+O2rAPWVD9wCmvq10ALsiH3yA== +-----END CERTIFICATE----- +""") + +intermediate_server_key_pem = b("""-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCqklnKB37DV9os6vWI4CZsGHHlJlZxMJn9mMdBMkzsa49PrbhC +SqyLEWCFEp0NE7CnCcA/uAxG6QuqLLj6RG4ZPk5/IaCAv3mLbGoD7N6GOPTyVJOW +8Yel48mALJNq8jLn4uOyPgMqcrK6HGZuJdNGsfzc0OCLFWQ5tMSaH85UrQIDAQAB +AoGAIQ594j5zna3/9WaPsTgnmhlesVctt4AAx/n827DA4ayyuHFlXUuVhtoWR5Pk +5ezj9mtYW8DyeCegABnsu2vZni/CdvU6uiS1Hv6qM1GyYDm9KWgovIP9rQCDSGaz +d57IWVGxx7ODFkm3gN5nxnSBOFVHytuW1J7FBRnEsehRroECQQDXHFOv82JuXDcz +z3+4c74IEURdOHcbycxlppmK9kFqm5lsUdydnnGW+mvwDk0APOB7Wg7vyFyr393e +dpmBDCzNAkEAyv6tVbTKUYhSjW+QhabJo896/EqQEYUmtMXxk4cQnKeR/Ao84Rkf +EqD5IykMUfUI0jJU4DGX+gWZ10a7kNbHYQJAVFCuHNFxS4Cpwo0aqtnzKoZaHY/8 +X9ABZfafSHCtw3Op92M+7ikkrOELXdS9KdKyyqbKJAKNEHF3LbOfB44WIQJAA2N4 +9UNNVUsXRbElEnYUS529CdUczo4QdVgQjkvk5RiPAUwSdBd9Q0xYnFOlFwEmIowg +ipWJWe0aAlP18ZcEQQJBAL+5lekZ/GUdQoZ4HAsN5a9syrzavJ9VvU1KOOPorPZK +nMRZbbQgP+aSB7yl6K0gaLaZ8XaK0pjxNBh6ASqg9f4= +-----END RSA PRIVATE KEY----- +""") + client_cert_pem = b("""-----BEGIN CERTIFICATE----- MIICJjCCAY+gAwIBAgIJAKxpFI5lODkjMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH @@ -248,6 +324,27 @@ Ho4EzbYCOaEAMQA= -----END PKCS7----- """) +pkcs7DataASN1 = base64.b64decode(b""" +MIIDNwYJKoZIhvcNAQcCoIIDKDCCAyQCAQExADALBgkqhkiG9w0BBwGgggMKMIID +BjCCAm+gAwIBAgIBATANBgkqhkiG9w0BAQQFADB7MQswCQYDVQQGEwJTRzERMA8G +A1UEChMITTJDcnlwdG8xFDASBgNVBAsTC00yQ3J5cHRvIENBMSQwIgYDVQQDExtN +MkNyeXB0byBDZXJ0aWZpY2F0ZSBNYXN0ZXIxHTAbBgkqhkiG9w0BCQEWDm5ncHNA +cG9zdDEuY29tMB4XDTAwMDkxMDA5NTEzMFoXDTAyMDkxMDA5NTEzMFowUzELMAkG +A1UEBhMCU0cxETAPBgNVBAoTCE0yQ3J5cHRvMRIwEAYDVQQDEwlsb2NhbGhvc3Qx +HTAbBgkqhkiG9w0BCQEWDm5ncHNAcG9zdDEuY29tMFwwDQYJKoZIhvcNAQEBBQAD +SwAwSAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh5kwI +zOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAaOCAQQwggEAMAkGA1UdEwQCMAAw +LAYJYIZIAYb4QgENBB8WHU9wZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRlMB0G +A1UdDgQWBBTPhIKSvnsmYsBVNWjj0m3M2z0qVTCBpQYDVR0jBIGdMIGagBT7hyNp +65w6kxXlxb8pUU/+7Sg4AaF/pH0wezELMAkGA1UEBhMCU0cxETAPBgNVBAoTCE0y +Q3J5cHRvMRQwEgYDVQQLEwtNMkNyeXB0byBDQTEkMCIGA1UEAxMbTTJDcnlwdG8g +Q2VydGlmaWNhdGUgTWFzdGVyMR0wGwYJKoZIhvcNAQkBFg5uZ3BzQHBvc3QxLmNv +bYIBADANBgkqhkiG9w0BAQQFAAOBgQA7/CqT6PoHycTdhEStWNZde7M/2Yc6BoJu +VwnW8YxGO8Sn6UJ4FeffZNcYZddSDKosw8LtPOeWoK3JINjAk5jiPQ2cww++7QGG +/g5NDjxFZNDJP1dGiLAxPW6JXwov4v0FmdzfLOZ01jDcgQQZqEpYlgpuI5JEWUQ9 +Ho4EzbYCOaEAMQA= +""") + crlData = b("""\ -----BEGIN X509 CRL----- MIIBWzCBxTANBgkqhkiG9w0BAQQFADBYMQswCQYDVQQGEwJVUzELMAkGA1UECBMC @@ -513,7 +610,7 @@ class X509ExtTests(TestCase): def test_issuer(self): """ - If an extension requires a issuer, the :py:data:`issuer` parameter to + If an extension requires an issuer, the :py:data:`issuer` parameter to :py:class:`X509Extension` provides its value. """ ext2 = X509Extension( @@ -906,7 +1003,7 @@ class X509NameTests(TestCase): self.assertEqual( a.der(), b('0\x1b1\x0b0\t\x06\x03U\x04\x06\x13\x02US' - '1\x0c0\n\x06\x03U\x04\x03\x13\x03foo')) + '1\x0c0\n\x06\x03U\x04\x03\x0c\x03foo')) def test_get_components(self): @@ -1189,7 +1286,7 @@ class X509ReqTests(TestCase, _PKeyInteractionTestsMixin): def test_verify_success(self): """ :py:obj:`X509Req.verify` returns :py:obj:`True` if called with a - :py:obj:`OpenSSL.crypto.PKey` which represents the public part ofthe key + :py:obj:`OpenSSL.crypto.PKey` which represents the public part of the key which signed the request. """ request = X509Req() @@ -1465,19 +1562,29 @@ WpOdIpB8KksUTCzV591Nr1wd cert.gmtime_adj_notAfter(2) self.assertFalse(cert.has_expired()) + def test_root_has_not_expired(self): + """ + :py:obj:`X509Type.has_expired` returns :py:obj:`False` if the certificate's not-after + time is in the future. + """ + cert = load_certificate(FILETYPE_PEM, root_cert_pem) + self.assertFalse(cert.has_expired()) + def test_digest(self): """ :py:obj:`X509.digest` returns a string giving ":"-separated hex-encoded words of the digest of the certificate. """ - cert = X509() + cert = load_certificate(FILETYPE_PEM, root_cert_pem) self.assertEqual( # This is MD5 instead of GOOD_DIGEST because the digest algorithm # actually matters to the assertion (ie, another arbitrary, good # digest will not product the same digest). + # Digest verified with the command: + # openssl x509 -in root_cert.pem -noout -fingerprint -md5 cert.digest("MD5"), - b("A8:EB:07:F8:53:25:0A:F2:56:05:C5:A5:C4:C4:C7:15")) + b("19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75")) def _extcert(self, pkey, extensions): @@ -1490,6 +1597,7 @@ WpOdIpB8KksUTCzV591Nr1wd cert.set_notAfter(when) cert.add_extensions(extensions) + cert.sign(pkey, 'sha1') return load_certificate( FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert)) @@ -1969,6 +2077,31 @@ class PKCS12Tests(TestCase): self.verify_pkcs12_container(p12) + def test_load_pkcs12_text_passphrase(self): + """ + A PKCS12 string generated using the openssl command line can be loaded + with :py:obj:`load_pkcs12` and its components extracted and examined. + Using text as passphrase instead of bytes. DeprecationWarning expected. + """ + pem = client_key_pem + client_cert_pem + passwd = b"whatever" + p12_str = _runopenssl(pem, b"pkcs12", b"-export", b"-clcerts", + b"-passout", b"pass:" + passwd) + with catch_warnings(record=True) as w: + simplefilter("always") + p12 = load_pkcs12(p12_str, passphrase=b"whatever".decode("ascii")) + + self.assertEqual( + "{0} for passphrase is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + + self.verify_pkcs12_container(p12) + + def test_load_pkcs12_no_passphrase(self): """ A PKCS12 string generated using openssl command line can be loaded with @@ -2170,6 +2303,26 @@ class PKCS12Tests(TestCase): dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"") + def test_export_without_bytes(self): + """ + Test :py:obj:`PKCS12.export` with text not bytes as passphrase + """ + p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem) + + with catch_warnings(record=True) as w: + simplefilter("always") + dumped_p12 = p12.export(passphrase=b"randomtext".decode("ascii")) + self.assertEqual( + "{0} for passphrase is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + self.check_recovery( + dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"randomtext") + + def test_key_cert_mismatch(self): """ :py:obj:`PKCS12.export` raises an exception when a key and certificate @@ -2560,7 +2713,7 @@ class FunctionTests(TestCase): dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, cb) - def test_load_pkcs7_data(self): + def test_load_pkcs7_data_pem(self): """ :py:obj:`load_pkcs7_data` accepts a PKCS#7 string and returns an instance of :py:obj:`PKCS7Type`. @@ -2569,6 +2722,15 @@ class FunctionTests(TestCase): self.assertTrue(isinstance(pkcs7, PKCS7Type)) + def test_load_pkcs7_data_asn1(self): + """ + :py:obj:`load_pkcs7_data` accepts a bytes containing ASN1 data + representing PKCS#7 and returns an instance of :py:obj`PKCS7Type`. + """ + pkcs7 = load_pkcs7_data(FILETYPE_ASN1, pkcs7DataASN1) + self.assertTrue(isinstance(pkcs7, PKCS7Type)) + + def test_load_pkcs7_data_invalid(self): """ If the data passed to :py:obj:`load_pkcs7_data` is invalid, @@ -2893,11 +3055,9 @@ class CRLTests(TestCase): self.assertRaises(TypeError, CRL, None) - def test_export(self): + def _get_crl(self): """ - Use python to create a simple CRL with a revocation, and export - the CRL in formats of PEM, DER and text. Those outputs are verified - with the openssl program. + Get a new ``CRL`` with a revocation. """ crl = CRL() revoked = Revoked() @@ -2906,26 +3066,110 @@ class CRLTests(TestCase): revoked.set_serial(b('3ab')) revoked.set_reason(b('sUpErSeDEd')) crl.add_revoked(revoked) + return crl + + def test_export_pem(self): + """ + If not passed a format, ``CRL.export`` returns a "PEM" format string + representing a serial number, a revoked reason, and certificate issuer + information. + """ + crl = self._get_crl() # PEM format dumped_crl = crl.export(self.cert, self.pkey, days=20) text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") + + # These magic values are based on the way the CRL above was constructed + # and with what certificate it was exported. text.index(b('Serial Number: 03AB')) text.index(b('Superseded')) - text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')) + text.index( + b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA') + ) + + + def test_export_der(self): + """ + If passed ``FILETYPE_ASN1`` for the format, ``CRL.export`` returns a + "DER" format string representing a serial number, a revoked reason, and + certificate issuer information. + """ + crl = self._get_crl() # DER format dumped_crl = crl.export(self.cert, self.pkey, FILETYPE_ASN1) - text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER") + text = _runopenssl( + dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER" + ) text.index(b('Serial Number: 03AB')) text.index(b('Superseded')) - text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')) + text.index( + b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA') + ) + + + def test_export_text(self): + """ + If passed ``FILETYPE_TEXT`` for the format, ``CRL.export`` returns a + text format string like the one produced by the openssl command line + tool. + """ + crl = self._get_crl() + + dumped_crl = crl.export(self.cert, self.pkey, FILETYPE_ASN1) + text = _runopenssl( + dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER" + ) # text format dumped_text = crl.export(self.cert, self.pkey, type=FILETYPE_TEXT) self.assertEqual(text, dumped_text) + def test_export_custom_digest(self): + """ + If passed the name of a digest function, ``CRL.export`` uses a + signature algorithm based on that digest function. + """ + crl = self._get_crl() + dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1") + text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") + text.index(b('Signature Algorithm: sha1')) + + + def test_export_md5_digest(self): + """ + If passed md5 as the digest function, ``CRL.export`` uses md5 and does + not emit a deprecation warning. + """ + crl = self._get_crl() + with catch_warnings(record=True) as catcher: + simplefilter("always") + self.assertEqual(0, len(catcher)) + dumped_crl = crl.export(self.cert, self.pkey, digest=b"md5") + text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") + text.index(b('Signature Algorithm: md5')) + + + def test_export_default_digest(self): + """ + If not passed the name of a digest function, ``CRL.export`` uses a + signature algorithm based on MD5 and emits a deprecation warning. + """ + crl = self._get_crl() + with catch_warnings(record=True) as catcher: + simplefilter("always") + dumped_crl = crl.export(self.cert, self.pkey) + self.assertEqual( + "The default message digest (md5) is deprecated. " + "Pass the name of a message digest explicitly.", + str(catcher[0].message), + ) + text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") + text.index(b('Signature Algorithm: md5')) + + def test_export_invalid(self): """ If :py:obj:`CRL.export` is used with an uninitialized :py:obj:`X509` @@ -2956,7 +3200,7 @@ class CRLTests(TestCase): crl = CRL() self.assertRaises(TypeError, crl.export) self.assertRaises(TypeError, crl.export, self.cert) - self.assertRaises(TypeError, crl.export, self.cert, self.pkey, FILETYPE_PEM, 10, "foo") + self.assertRaises(TypeError, crl.export, self.cert, self.pkey, FILETYPE_PEM, 10, "md5", "foo") self.assertRaises(TypeError, crl.export, None, self.pkey, FILETYPE_PEM, 10) self.assertRaises(TypeError, crl.export, self.cert, None, FILETYPE_PEM, 10) @@ -2974,6 +3218,19 @@ class CRLTests(TestCase): self.assertRaises(ValueError, crl.export, self.cert, self.pkey, 100, 10) + def test_export_unknown_digest(self): + """ + Calling :py:obj:`OpenSSL.CRL.export` with a unsupported digest results + in a :py:obj:`ValueError` being raised. + """ + crl = CRL() + self.assertRaises( + ValueError, + crl.export, + self.cert, self.pkey, FILETYPE_PEM, 10, b"strange-digest" + ) + + def test_get_revoked(self): """ Use python to create a simple CRL with two revocations. @@ -3074,6 +3331,107 @@ class CRLTests(TestCase): +class X509StoreContextTests(TestCase): + """ + Tests for :py:obj:`OpenSSL.crypto.X509StoreContext`. + """ + root_cert = load_certificate(FILETYPE_PEM, root_cert_pem) + intermediate_cert = load_certificate(FILETYPE_PEM, intermediate_cert_pem) + intermediate_server_cert = load_certificate(FILETYPE_PEM, intermediate_server_cert_pem) + + def test_valid(self): + """ + :py:obj:`verify_certificate` returns ``None`` when called with a certificate + and valid chain. + """ + store = X509Store() + store.add_cert(self.root_cert) + store.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + self.assertEqual(store_ctx.verify_certificate(), None) + + + def test_reuse(self): + """ + :py:obj:`verify_certificate` can be called multiple times with the same + ``X509StoreContext`` instance to produce the same result. + """ + store = X509Store() + store.add_cert(self.root_cert) + store.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + self.assertEqual(store_ctx.verify_certificate(), None) + self.assertEqual(store_ctx.verify_certificate(), None) + + + def test_trusted_self_signed(self): + """ + :py:obj:`verify_certificate` returns ``None`` when called with a self-signed + certificate and itself in the chain. + """ + store = X509Store() + store.add_cert(self.root_cert) + store_ctx = X509StoreContext(store, self.root_cert) + self.assertEqual(store_ctx.verify_certificate(), None) + + + def test_untrusted_self_signed(self): + """ + :py:obj:`verify_certificate` raises error when a self-signed certificate is + verified without itself in the chain. + """ + store = X509Store() + store_ctx = X509StoreContext(store, self.root_cert) + e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate) + self.assertEqual(e.args[0][2], 'self signed certificate') + self.assertEqual(e.certificate.get_subject().CN, 'Testing Root CA') + + + def test_invalid_chain_no_root(self): + """ + :py:obj:`verify_certificate` raises error when a root certificate is missing + from the chain. + """ + store = X509Store() + store.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate) + self.assertEqual(e.args[0][2], 'unable to get issuer certificate') + self.assertEqual(e.certificate.get_subject().CN, 'intermediate') + + + def test_invalid_chain_no_intermediate(self): + """ + :py:obj:`verify_certificate` raises error when an intermediate certificate is + missing from the chain. + """ + store = X509Store() + store.add_cert(self.root_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate) + self.assertEqual(e.args[0][2], 'unable to get local issuer certificate') + self.assertEqual(e.certificate.get_subject().CN, 'intermediate-service') + + + def test_modification_pre_verify(self): + """ + :py:obj:`verify_certificate` can use a store context modified after + instantiation. + """ + store_bad = X509Store() + store_bad.add_cert(self.intermediate_cert) + store_good = X509Store() + store_good.add_cert(self.root_cert) + store_good.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store_bad, self.intermediate_server_cert) + e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate) + self.assertEqual(e.args[0][2], 'unable to get issuer certificate') + self.assertEqual(e.certificate.get_subject().CN, 'intermediate') + store_ctx.set_store(store_good) + self.assertEqual(store_ctx.verify_certificate(), None) + + + class SignVerifyTests(TestCase): """ Tests for :py:obj:`OpenSSL.crypto.sign` and :py:obj:`OpenSSL.crypto.verify`. @@ -3119,6 +3477,47 @@ class SignVerifyTests(TestCase): ValueError, verify, good_cert, sig, content, "strange-digest") + def test_sign_verify_with_text(self): + """ + :py:obj:`sign` generates a cryptographic signature which :py:obj:`verify` can check. + Deprecation warnings raised because using text instead of bytes as content + """ + content = ( + b"It was a bright cold day in April, and the clocks were striking " + b"thirteen. Winston Smith, his chin nuzzled into his breast in an " + b"effort to escape the vile wind, slipped quickly through the " + b"glass doors of Victory Mansions, though not quickly enough to " + b"prevent a swirl of gritty dust from entering along with him." + ).decode("ascii") + + priv_key = load_privatekey(FILETYPE_PEM, root_key_pem) + cert = load_certificate(FILETYPE_PEM, root_cert_pem) + for digest in ['md5', 'sha1']: + with catch_warnings(record=True) as w: + simplefilter("always") + sig = sign(priv_key, content, digest) + + self.assertEqual( + "{0} for data is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + + with catch_warnings(record=True) as w: + simplefilter("always") + verify(cert, sig, content, digest) + + self.assertEqual( + "{0} for data is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + + def test_sign_nulls(self): """ :py:obj:`sign` produces a signature for a string with embedded nulls. diff --git a/OpenSSL/test/test_rand.py b/OpenSSL/test/test_rand.py index c52cb6b..3d5c290 100644 --- a/OpenSSL/test/test_rand.py +++ b/OpenSSL/test/test_rand.py @@ -10,7 +10,7 @@ import os import stat import sys -from OpenSSL.test.util import TestCase, b +from OpenSSL.test.util import NON_ASCII, TestCase, b from OpenSSL import rand @@ -176,27 +176,47 @@ class RandTests(TestCase): self.assertRaises(TypeError, rand.write_file, None) self.assertRaises(TypeError, rand.write_file, "foo", None) - - def test_files(self): + def _read_write_test(self, path): """ - Test reading and writing of files via rand functions. + Verify that ``rand.write_file`` and ``rand.load_file`` can be used. """ - # Write random bytes to a file - tmpfile = self.mktemp() - # Make sure it exists (so cleanup definitely succeeds) - fObj = open(tmpfile, 'w') - fObj.close() + # Create the file so cleanup is more straightforward + with open(path, "w"): + pass + try: - rand.write_file(tmpfile) + # Write random bytes to a file + rand.write_file(path) + # Verify length of written file - size = os.stat(tmpfile)[stat.ST_SIZE] + size = os.stat(path)[stat.ST_SIZE] self.assertEqual(1024, size) + # Read random bytes from file - rand.load_file(tmpfile) - rand.load_file(tmpfile, 4) # specify a length + rand.load_file(path) + rand.load_file(path, 4) # specify a length finally: # Cleanup - os.unlink(tmpfile) + os.unlink(path) + + + def test_bytes_paths(self): + """ + Random data can be saved and loaded to files with paths specified as + bytes. + """ + path = self.mktemp() + path += NON_ASCII.encode(sys.getfilesystemencoding()) + self._read_write_test(path) + + + def test_unicode_paths(self): + """ + Random data can be saved and loaded to files with paths specified as + unicode. + """ + path = self.mktemp().decode('utf-8') + NON_ASCII + self._read_write_test(path) if __name__ == '__main__': diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py index 6409b8e..1f231c9 100644 --- a/OpenSSL/test/test_ssl.py +++ b/OpenSSL/test/test_ssl.py @@ -7,12 +7,13 @@ Unit tests for :py:obj:`OpenSSL.SSL`. from gc import collect, get_referrers from errno import ECONNREFUSED, EINPROGRESS, EWOULDBLOCK, EPIPE, ESHUTDOWN -from sys import platform, version_info +from sys import platform, getfilesystemencoding from socket import SHUT_RDWR, error, socket from os import makedirs from os.path import join from unittest import main from weakref import ref +from warnings import catch_warnings, simplefilter from six import PY3, text_type, u @@ -22,7 +23,6 @@ from OpenSSL.crypto import dump_privatekey, load_privatekey from OpenSSL.crypto import dump_certificate, load_certificate from OpenSSL.crypto import get_elliptic_curves -from OpenSSL.SSL import _lib from OpenSSL.SSL import OPENSSL_VERSION_NUMBER, SSLEAY_VERSION, SSLEAY_CFLAGS from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN @@ -43,10 +43,11 @@ from OpenSSL.SSL import ( from OpenSSL.SSL import ( Context, ContextType, Session, Connection, ConnectionType, SSLeay_version) -from OpenSSL.test.util import TestCase, b -from OpenSSL.test.test_crypto import ( - cleartextCertificatePEM, cleartextPrivateKeyPEM) +from OpenSSL._util import lib as _lib + +from OpenSSL.test.util import WARNING_TYPE_EXPECTED, NON_ASCII, TestCase, b from OpenSSL.test.test_crypto import ( + cleartextCertificatePEM, cleartextPrivateKeyPEM, client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, root_cert_pem) @@ -95,6 +96,23 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC """ +def join_bytes_or_unicode(prefix, suffix): + """ + Join two path components of either ``bytes`` or ``unicode``. + + The return type is the same as the type of ``prefix``. + """ + # If the types are the same, nothing special is necessary. + if type(prefix) == type(suffix): + return join(prefix, suffix) + + # Otherwise, coerce suffix to the type of prefix. + if isinstance(prefix, text_type): + return join(prefix, suffix.decode(getfilesystemencoding())) + else: + return join(prefix, suffix.encode(getfilesystemencoding())) + + def verify_cb(conn, cert, errnum, depth, ok): return ok @@ -395,23 +413,52 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(Error, ctx.use_privatekey_file, self.mktemp()) + def _use_privatekey_file_test(self, pemfile, filetype): + """ + Verify that calling ``Context.use_privatekey_file`` with the given + arguments does not raise an exception. + """ + key = PKey() + key.generate_key(TYPE_RSA, 128) + + with open(pemfile, "wt") as pem: + pem.write( + dump_privatekey(FILETYPE_PEM, key).decode("ascii") + ) + + ctx = Context(TLSv1_METHOD) + ctx.use_privatekey_file(pemfile, filetype) + + + def test_use_privatekey_file_bytes(self): + """ + A private key can be specified from a file by passing a ``bytes`` + instance giving the file name to ``Context.use_privatekey_file``. + """ + self._use_privatekey_file_test( + self.mktemp() + NON_ASCII.encode(getfilesystemencoding()), + FILETYPE_PEM, + ) + + + def test_use_privatekey_file_unicode(self): + """ + A private key can be specified from a file by passing a ``unicode`` + instance giving the file name to ``Context.use_privatekey_file``. + """ + self._use_privatekey_file_test( + self.mktemp().decode(getfilesystemencoding()) + NON_ASCII, + FILETYPE_PEM, + ) + + if not PY3: def test_use_privatekey_file_long(self): """ On Python 2 :py:obj:`Context.use_privatekey_file` accepts a filetype of type :py:obj:`long` as well as :py:obj:`int`. """ - pemfile = self.mktemp() - - key = PKey() - key.generate_key(TYPE_RSA, 128) - - with open(pemfile, "wt") as pem: - pem.write( - dump_privatekey(FILETYPE_PEM, key).decode("ascii")) - - ctx = Context(TLSv1_METHOD) - ctx.use_privatekey_file(pemfile, long(FILETYPE_PEM)) + self._use_privatekey_file_test(self.mktemp(), long(FILETYPE_PEM)) def test_use_certificate_wrong_args(self): @@ -476,21 +523,40 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(Error, ctx.use_certificate_file, self.mktemp()) - def test_use_certificate_file(self): + def _use_certificate_file_test(self, certificate_file): """ - :py:obj:`Context.use_certificate` sets the certificate which will be - used to identify connections created using the context. + Verify that calling ``Context.use_certificate_file`` with the given + filename doesn't raise an exception. """ # TODO # Hard to assert anything. But we could set a privatekey then ask # OpenSSL if the cert and key agree using check_privatekey. Then as # long as check_privatekey works right we're good... - pem_filename = self.mktemp() - with open(pem_filename, "wb") as pem_file: + with open(certificate_file, "wb") as pem_file: pem_file.write(cleartextCertificatePEM) ctx = Context(TLSv1_METHOD) - ctx.use_certificate_file(pem_filename) + ctx.use_certificate_file(certificate_file) + + + def test_use_certificate_file_bytes(self): + """ + :py:obj:`Context.use_certificate_file` sets the certificate (given as a + ``bytes`` filename) which will be used to identify connections created + using the context. + """ + filename = self.mktemp() + NON_ASCII.encode(getfilesystemencoding()) + self._use_certificate_file_test(filename) + + + def test_use_certificate_file_unicode(self): + """ + :py:obj:`Context.use_certificate_file` sets the certificate (given as a + ``bytes`` filename) which will be used to identify connections created + using the context. + """ + filename = self.mktemp().decode(getfilesystemencoding()) + NON_ASCII + self._use_certificate_file_test(filename) if not PY3: @@ -507,6 +573,43 @@ class ContextTests(TestCase, _LoopbackMixin): ctx.use_certificate_file(pem_filename, long(FILETYPE_PEM)) + def test_check_privatekey_valid(self): + """ + :py:obj:`Context.check_privatekey` returns :py:obj:`None` if the + :py:obj:`Context` instance has been configured to use a matched key and + certificate pair. + """ + key = load_privatekey(FILETYPE_PEM, client_key_pem) + cert = load_certificate(FILETYPE_PEM, client_cert_pem) + context = Context(TLSv1_METHOD) + context.use_privatekey(key) + context.use_certificate(cert) + self.assertIs(None, context.check_privatekey()) + + + def test_check_privatekey_invalid(self): + """ + :py:obj:`Context.check_privatekey` raises :py:obj:`Error` if the + :py:obj:`Context` instance has been configured to use a key and + certificate pair which don't relate to each other. + """ + key = load_privatekey(FILETYPE_PEM, client_key_pem) + cert = load_certificate(FILETYPE_PEM, server_cert_pem) + context = Context(TLSv1_METHOD) + context.use_privatekey(key) + context.use_certificate(cert) + self.assertRaises(Error, context.check_privatekey) + + + def test_check_privatekey_wrong_args(self): + """ + :py:obj:`Context.check_privatekey` raises :py:obj:`TypeError` if called + with other than no arguments. + """ + context = Context(TLSv1_METHOD) + self.assertRaises(TypeError, context.check_privatekey, object()) + + def test_set_app_data_wrong_args(self): """ :py:obj:`Context.set_app_data` raises :py:obj:`TypeError` if called with other than @@ -867,12 +970,13 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertEqual(cert.get_subject().CN, 'Testing Root CA') - def test_load_verify_file(self): + def _load_verify_cafile(self, cafile): """ - :py:obj:`Context.load_verify_locations` accepts a file name and uses the - certificates within for verification purposes. + Verify that if path to a file containing a certificate is passed to + ``Context.load_verify_locations`` for the ``cafile`` parameter, that + certificate is used as a trust root for the purposes of verifying + connections created using that ``Context``. """ - cafile = self.mktemp() fObj = open(cafile, 'w') fObj.write(cleartextCertificatePEM.decode('ascii')) fObj.close() @@ -880,6 +984,27 @@ class ContextTests(TestCase, _LoopbackMixin): self._load_verify_locations_test(cafile) + def test_load_verify_bytes_cafile(self): + """ + :py:obj:`Context.load_verify_locations` accepts a file name as a + ``bytes`` instance and uses the certificates within for verification + purposes. + """ + cafile = self.mktemp() + NON_ASCII.encode(getfilesystemencoding()) + self._load_verify_cafile(cafile) + + + def test_load_verify_unicode_cafile(self): + """ + :py:obj:`Context.load_verify_locations` accepts a file name as a + ``unicode`` instance and uses the certificates within for verification + purposes. + """ + self._load_verify_cafile( + self.mktemp().decode(getfilesystemencoding()) + NON_ASCII + ) + + def test_load_verify_invalid_file(self): """ :py:obj:`Context.load_verify_locations` raises :py:obj:`Error` when passed a @@ -890,25 +1015,47 @@ class ContextTests(TestCase, _LoopbackMixin): Error, clientContext.load_verify_locations, self.mktemp()) - def test_load_verify_directory(self): + def _load_verify_directory_locations_capath(self, capath): """ - :py:obj:`Context.load_verify_locations` accepts a directory name and uses - the certificates within for verification purposes. + Verify that if path to a directory containing certificate files is + passed to ``Context.load_verify_locations`` for the ``capath`` + parameter, those certificates are used as trust roots for the purposes + of verifying connections created using that ``Context``. """ - capath = self.mktemp() makedirs(capath) # Hash values computed manually with c_rehash to avoid depending on # c_rehash in the test suite. One is from OpenSSL 0.9.8, the other # from OpenSSL 1.0.0. for name in [b'c7adac82.0', b'c3705638.0']: - cafile = join(capath, name) - fObj = open(cafile, 'w') - fObj.write(cleartextCertificatePEM.decode('ascii')) - fObj.close() + cafile = join_bytes_or_unicode(capath, name) + with open(cafile, 'w') as fObj: + fObj.write(cleartextCertificatePEM.decode('ascii')) self._load_verify_locations_test(None, capath) + def test_load_verify_directory_bytes_capath(self): + """ + :py:obj:`Context.load_verify_locations` accepts a directory name as a + ``bytes`` instance and uses the certificates within for verification + purposes. + """ + self._load_verify_directory_locations_capath( + self.mktemp() + NON_ASCII.encode(getfilesystemencoding()) + ) + + + def test_load_verify_directory_unicode_capath(self): + """ + :py:obj:`Context.load_verify_locations` accepts a directory name as a + ``unicode`` instance and uses the certificates within for verification + purposes. + """ + self._load_verify_directory_locations_capath( + self.mktemp().decode(getfilesystemencoding()) + NON_ASCII + ) + + def test_load_verify_locations_wrong_args(self): """ :py:obj:`Context.load_verify_locations` raises :py:obj:`TypeError` if called with @@ -938,8 +1085,8 @@ class ContextTests(TestCase, _LoopbackMixin): # in a unit test is bad, but it's the only way I can think of to # really test this. -exarkun - # Arg, verisign.com doesn't speak TLSv1 - context = Context(SSLv3_METHOD) + # Arg, verisign.com doesn't speak anything newer than TLS 1.0 + context = Context(TLSv1_METHOD) context.set_default_verify_paths() context.set_verify( VERIFY_PEER, @@ -1052,11 +1199,11 @@ class ContextTests(TestCase, _LoopbackMixin): def test_add_extra_chain_cert(self): """ - :py:obj:`Context.add_extra_chain_cert` accepts an :py:obj:`X509` instance to add to - the certificate chain. + :py:obj:`Context.add_extra_chain_cert` accepts an :py:obj:`X509` + instance to add to the certificate chain. - See :py:obj:`_create_certificate_chain` for the details of the certificate - chain tested. + See :py:obj:`_create_certificate_chain` for the details of the + certificate chain tested. The chain is tested by starting a server with scert and connecting to it with a client which trusts cacert and requires verification to @@ -1067,15 +1214,17 @@ class ContextTests(TestCase, _LoopbackMixin): # Dump the CA certificate to a file because that's the only way to load # it as a trusted CA in the client context. - for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]: - fObj = open(name, 'w') - fObj.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii')) - fObj.close() - - for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]: - fObj = open(name, 'w') - fObj.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii')) - fObj.close() + for cert, name in [(cacert, 'ca.pem'), + (icert, 'i.pem'), + (scert, 's.pem')]: + with open(join(self.tmpdir, name), 'w') as f: + f.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii')) + + for key, name in [(cakey, 'ca.key'), + (ikey, 'i.key'), + (skey, 's.key')]: + with open(join(self.tmpdir, name), 'w') as f: + f.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii')) # Create the server context serverContext = Context(TLSv1_METHOD) @@ -1088,49 +1237,73 @@ class ContextTests(TestCase, _LoopbackMixin): clientContext = Context(TLSv1_METHOD) clientContext.set_verify( VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) - clientContext.load_verify_locations(b"ca.pem") + clientContext.load_verify_locations(join(self.tmpdir, "ca.pem")) # Try it out. self._handshake_test(serverContext, clientContext) - def test_use_certificate_chain_file(self): + def _use_certificate_chain_file_test(self, certdir): """ - :py:obj:`Context.use_certificate_chain_file` reads a certificate chain from - the specified file. + Verify that :py:obj:`Context.use_certificate_chain_file` reads a + certificate chain from a specified file. - The chain is tested by starting a server with scert and connecting - to it with a client which trusts cacert and requires verification to + The chain is tested by starting a server with scert and connecting to + it with a client which trusts cacert and requires verification to succeed. """ chain = _create_certificate_chain() [(cakey, cacert), (ikey, icert), (skey, scert)] = chain + makedirs(certdir) + + chainFile = join_bytes_or_unicode(certdir, "chain.pem") + caFile = join_bytes_or_unicode(certdir, "ca.pem") + # Write out the chain file. - chainFile = self.mktemp() - fObj = open(chainFile, 'wb') - # Most specific to least general. - fObj.write(dump_certificate(FILETYPE_PEM, scert)) - fObj.write(dump_certificate(FILETYPE_PEM, icert)) - fObj.write(dump_certificate(FILETYPE_PEM, cacert)) - fObj.close() + with open(chainFile, 'wb') as fObj: + # Most specific to least general. + fObj.write(dump_certificate(FILETYPE_PEM, scert)) + fObj.write(dump_certificate(FILETYPE_PEM, icert)) + fObj.write(dump_certificate(FILETYPE_PEM, cacert)) + + with open(caFile, 'w') as fObj: + fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii')) serverContext = Context(TLSv1_METHOD) serverContext.use_certificate_chain_file(chainFile) serverContext.use_privatekey(skey) - fObj = open('ca.pem', 'w') - fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii')) - fObj.close() - clientContext = Context(TLSv1_METHOD) clientContext.set_verify( VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) - clientContext.load_verify_locations(b"ca.pem") + clientContext.load_verify_locations(caFile) self._handshake_test(serverContext, clientContext) + def test_use_certificate_chain_file_bytes(self): + """ + ``Context.use_certificate_chain_file`` accepts the name of a file (as + an instance of ``bytes``) to specify additional certificates to use to + construct and verify a trust chain. + """ + self._use_certificate_chain_file_test( + self.mktemp() + NON_ASCII.encode(getfilesystemencoding()) + ) + + + def test_use_certificate_chain_file_unicode(self): + """ + ``Context.use_certificate_chain_file`` accepts the name of a file (as + an instance of ``unicode``) to specify additional certificates to use + to construct and verify a trust chain. + """ + self._use_certificate_chain_file_test( + self.mktemp().decode(getfilesystemencoding()) + NON_ASCII + ) + + def test_use_certificate_chain_file_wrong_args(self): """ :py:obj:`Context.use_certificate_chain_file` raises :py:obj:`TypeError` @@ -1205,20 +1378,39 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(Error, context.load_tmp_dh, b"hello") - def test_load_tmp_dh(self): + def _load_tmp_dh_test(self, dhfilename): """ - :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the - specified file. + Verify that calling ``Context.load_tmp_dh`` with the given filename + does not raise an exception. """ context = Context(TLSv1_METHOD) - dhfilename = self.mktemp() - dhfile = open(dhfilename, "w") - dhfile.write(dhparam) - dhfile.close() + with open(dhfilename, "w") as dhfile: + dhfile.write(dhparam) + context.load_tmp_dh(dhfilename) # XXX What should I assert here? -exarkun + def test_load_tmp_dh_bytes(self): + """ + :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the + specified file (given as ``bytes``). + """ + self._load_tmp_dh_test( + self.mktemp() + NON_ASCII.encode(getfilesystemencoding()), + ) + + + def test_load_tmp_dh_unicode(self): + """ + :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the + specified file (given as ``unicode``). + """ + self._load_tmp_dh_test( + self.mktemp().decode(getfilesystemencoding()) + NON_ASCII, + ) + + def test_set_tmp_ecdh(self): """ :py:obj:`Context.set_tmp_ecdh` sets the elliptic curve for @@ -1434,6 +1626,396 @@ class ServerNameCallbackTests(TestCase, _LoopbackMixin): self.assertEqual([(server, b("foo1.example.com"))], args) +class NextProtoNegotiationTests(TestCase, _LoopbackMixin): + """ + Test for Next Protocol Negotiation in PyOpenSSL. + """ + if _lib.Cryptography_HAS_NEXTPROTONEG: + def test_npn_success(self): + """ + Tests that clients and servers that agree on the negotiated next + protocol can correct establish a connection, and that the agreed + protocol is reported by the connections. + """ + advertise_args = [] + select_args = [] + def advertise(conn): + advertise_args.append((conn,)) + return [b'http/1.1', b'spdy/2'] + def select(conn, options): + select_args.append((conn, options)) + return b'spdy/2' + + server_context = Context(TLSv1_METHOD) + server_context.set_npn_advertise_callback(advertise) + + client_context = Context(TLSv1_METHOD) + client_context.set_npn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + self._interactInMemory(server, client) + + self.assertEqual([(server,)], advertise_args) + self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args) + + self.assertEqual(server.get_next_proto_negotiated(), b'spdy/2') + self.assertEqual(client.get_next_proto_negotiated(), b'spdy/2') + + + def test_npn_client_fail(self): + """ + Tests that when clients and servers cannot agree on what protocol + to use next that the TLS connection does not get established. + """ + advertise_args = [] + select_args = [] + def advertise(conn): + advertise_args.append((conn,)) + return [b'http/1.1', b'spdy/2'] + def select(conn, options): + select_args.append((conn, options)) + return b'' + + server_context = Context(TLSv1_METHOD) + server_context.set_npn_advertise_callback(advertise) + + client_context = Context(TLSv1_METHOD) + client_context.set_npn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + # If the client doesn't return anything, the connection will fail. + self.assertRaises(Error, self._interactInMemory, server, client) + + self.assertEqual([(server,)], advertise_args) + self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args) + + + def test_npn_select_error(self): + """ + Test that we can handle exceptions in the select callback. If + select fails it should be fatal to the connection. + """ + advertise_args = [] + def advertise(conn): + advertise_args.append((conn,)) + return [b'http/1.1', b'spdy/2'] + def select(conn, options): + raise TypeError + + server_context = Context(TLSv1_METHOD) + server_context.set_npn_advertise_callback(advertise) + + client_context = Context(TLSv1_METHOD) + client_context.set_npn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + # If the callback throws an exception it should be raised here. + self.assertRaises( + TypeError, self._interactInMemory, server, client + ) + self.assertEqual([(server,)], advertise_args) + + + def test_npn_advertise_error(self): + """ + Test that we can handle exceptions in the advertise callback. If + advertise fails no NPN is advertised to the client. + """ + select_args = [] + def advertise(conn): + raise TypeError + def select(conn, options): + select_args.append((conn, options)) + return b'' + + server_context = Context(TLSv1_METHOD) + server_context.set_npn_advertise_callback(advertise) + + client_context = Context(TLSv1_METHOD) + client_context.set_npn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + # If the client doesn't return anything, the connection will fail. + self.assertRaises( + TypeError, self._interactInMemory, server, client + ) + self.assertEqual([], select_args) + + else: + # No NPN. + def test_npn_not_implemented(self): + # Test the context methods first. + context = Context(TLSv1_METHOD) + fail_methods = [ + context.set_npn_advertise_callback, + context.set_npn_select_callback, + ] + for method in fail_methods: + self.assertRaises( + NotImplementedError, method, None + ) + + # Now test a connection. + conn = Connection(context) + fail_methods = [ + conn.get_next_proto_negotiated, + ] + for method in fail_methods: + self.assertRaises(NotImplementedError, method) + + + +class ApplicationLayerProtoNegotiationTests(TestCase, _LoopbackMixin): + """ + Tests for ALPN in PyOpenSSL. + """ + # Skip tests on versions that don't support ALPN. + if _lib.Cryptography_HAS_ALPN: + + def test_alpn_success(self): + """ + Clients and servers that agree on the negotiated ALPN protocol can + correct establish a connection, and the agreed protocol is reported + by the connections. + """ + select_args = [] + def select(conn, options): + select_args.append((conn, options)) + return b'spdy/2' + + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + self._interactInMemory(server, client) + + self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args) + + self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2') + self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2') + + + def test_alpn_set_on_connection(self): + """ + The same as test_alpn_success, but setting the ALPN protocols on + the connection rather than the context. + """ + select_args = [] + def select(conn, options): + select_args.append((conn, options)) + return b'spdy/2' + + # Setup the client context but don't set any ALPN protocols. + client_context = Context(TLSv1_METHOD) + + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + # Set the ALPN protocols on the client connection. + client = Connection(client_context, None) + client.set_alpn_protos([b'http/1.1', b'spdy/2']) + client.set_connect_state() + + self._interactInMemory(server, client) + + self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args) + + self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2') + self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2') + + + def test_alpn_server_fail(self): + """ + When clients and servers cannot agree on what protocol to use next + the TLS connection does not get established. + """ + select_args = [] + def select(conn, options): + select_args.append((conn, options)) + return b'' + + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + # If the client doesn't return anything, the connection will fail. + self.assertRaises(Error, self._interactInMemory, server, client) + + self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args) + + + def test_alpn_no_server(self): + """ + When clients and servers cannot agree on what protocol to use next + because the server doesn't offer ALPN, no protocol is negotiated. + """ + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + + server_context = Context(TLSv1_METHOD) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + # Do the dance. + self._interactInMemory(server, client) + + self.assertEqual(client.get_alpn_proto_negotiated(), b'') + + + def test_alpn_callback_exception(self): + """ + We can handle exceptions in the ALPN select callback. + """ + select_args = [] + def select(conn, options): + select_args.append((conn, options)) + raise TypeError() + + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) + + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() + + client = Connection(client_context, None) + client.set_connect_state() + + self.assertRaises( + TypeError, self._interactInMemory, server, client + ) + self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args) + + else: + # No ALPN. + def test_alpn_not_implemented(self): + """ + If ALPN is not in OpenSSL, we should raise NotImplementedError. + """ + # Test the context methods first. + context = Context(TLSv1_METHOD) + self.assertRaises( + NotImplementedError, context.set_alpn_protos, None + ) + self.assertRaises( + NotImplementedError, context.set_alpn_select_callback, None + ) + + # Now test a connection. + conn = Connection(context) + self.assertRaises( + NotImplementedError, context.set_alpn_protos, None + ) + + class SessionTests(TestCase): """ @@ -1555,7 +2137,7 @@ class ConnectionTests(TestCase, _LoopbackMixin): self.assertRaises( TypeError, conn.set_tlsext_host_name, b("with\0null")) - if version_info >= (3,): + if PY3: # On Python 3.x, don't accidentally implicitly convert from text. self.assertRaises( TypeError, @@ -1709,6 +2291,40 @@ class ConnectionTests(TestCase, _LoopbackMixin): self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN) + def test_shutdown_closed(self): + """ + If the underlying socket is closed, :py:obj:`Connection.shutdown` propagates the + write error from the low level write call. + """ + server, client = self._loopback() + server.sock_shutdown(2) + exc = self.assertRaises(SysCallError, server.shutdown) + if platform == "win32": + self.assertEqual(exc.args[0], ESHUTDOWN) + else: + self.assertEqual(exc.args[0], EPIPE) + + + def test_shutdown_truncated(self): + """ + If the underlying connection is truncated, :obj:`Connection.shutdown` + raises an :obj:`Error`. + """ + server_ctx = Context(TLSv1_METHOD) + client_ctx = Context(TLSv1_METHOD) + server_ctx.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_ctx.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) + server = Connection(server_ctx, None) + client = Connection(client_ctx, None) + self._handshakeInMemory(client, server) + self.assertEqual(server.shutdown(), False) + self.assertRaises(WantReadError, server.shutdown) + server.bio_shutdown() + self.assertRaises(Error, server.shutdown) + + def test_set_shutdown(self): """ :py:obj:`Connection.set_shutdown` sets the state of the SSL connection shutdown @@ -2182,6 +2798,26 @@ class ConnectionSendTests(TestCase, _LoopbackMixin): self.assertEquals(count, 2) self.assertEquals(client.recv(2), b('xy')) + + def test_text(self): + """ + When passed a text, :py:obj:`Connection.send` transmits all of it and + returns the number of bytes sent. It also raises a DeprecationWarning. + """ + server, client = self._loopback() + with catch_warnings(record=True) as w: + simplefilter("always") + count = server.send(b"xy".decode("ascii")) + self.assertEqual( + "{0} for buf is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + self.assertEquals(count, 2) + self.assertEquals(client.recv(2), b"xy") + try: memoryview except NameError: @@ -2217,6 +2853,164 @@ class ConnectionSendTests(TestCase, _LoopbackMixin): +def _make_memoryview(size): + """ + Create a new ``memoryview`` wrapped around a ``bytearray`` of the given + size. + """ + return memoryview(bytearray(size)) + + + +class ConnectionRecvIntoTests(TestCase, _LoopbackMixin): + """ + Tests for :py:obj:`Connection.recv_into` + """ + def _no_length_test(self, factory): + """ + Assert that when the given buffer is passed to + ``Connection.recv_into``, whatever bytes are available to be received + that fit into that buffer are written into that buffer. + """ + output_buffer = factory(5) + + server, client = self._loopback() + server.send(b('xy')) + + self.assertEqual(client.recv_into(output_buffer), 2) + self.assertEqual(output_buffer, bytearray(b('xy\x00\x00\x00'))) + + + def test_bytearray_no_length(self): + """ + :py:obj:`Connection.recv_into` can be passed a ``bytearray`` instance + and data in the receive buffer is written to it. + """ + self._no_length_test(bytearray) + + + def _respects_length_test(self, factory): + """ + Assert that when the given buffer is passed to ``Connection.recv_into`` + along with a value for ``nbytes`` that is less than the size of that + buffer, only ``nbytes`` bytes are written into the buffer. + """ + output_buffer = factory(10) + + server, client = self._loopback() + server.send(b('abcdefghij')) + + self.assertEqual(client.recv_into(output_buffer, 5), 5) + self.assertEqual( + output_buffer, bytearray(b('abcde\x00\x00\x00\x00\x00')) + ) + + + def test_bytearray_respects_length(self): + """ + When called with a ``bytearray`` instance, + :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter and + doesn't copy in more than that number of bytes. + """ + self._respects_length_test(bytearray) + + + def _doesnt_overfill_test(self, factory): + """ + Assert that if there are more bytes available to be read from the + receive buffer than would fit into the buffer passed to + :py:obj:`Connection.recv_into`, only as many as fit are written into + it. + """ + output_buffer = factory(5) + + server, client = self._loopback() + server.send(b('abcdefghij')) + + self.assertEqual(client.recv_into(output_buffer), 5) + self.assertEqual(output_buffer, bytearray(b('abcde'))) + rest = client.recv(5) + self.assertEqual(b('fghij'), rest) + + + def test_bytearray_doesnt_overfill(self): + """ + When called with a ``bytearray`` instance, + :py:obj:`Connection.recv_into` respects the size of the array and + doesn't write more bytes into it than will fit. + """ + self._doesnt_overfill_test(bytearray) + + + def _really_doesnt_overfill_test(self, factory): + """ + Assert that if the value given by ``nbytes`` is greater than the actual + size of the output buffer passed to :py:obj:`Connection.recv_into`, the + behavior is as if no value was given for ``nbytes`` at all. + """ + output_buffer = factory(5) + + server, client = self._loopback() + server.send(b('abcdefghij')) + + self.assertEqual(client.recv_into(output_buffer, 50), 5) + self.assertEqual(output_buffer, bytearray(b('abcde'))) + rest = client.recv(5) + self.assertEqual(b('fghij'), rest) + + + def test_bytearray_really_doesnt_overfill(self): + """ + When called with a ``bytearray`` instance and an ``nbytes`` value that + is too large, :py:obj:`Connection.recv_into` respects the size of the + array and not the ``nbytes`` value and doesn't write more bytes into + the buffer than will fit. + """ + self._doesnt_overfill_test(bytearray) + + + try: + memoryview + except NameError: + "cannot test recv_into memoryview without memoryview" + else: + def test_memoryview_no_length(self): + """ + :py:obj:`Connection.recv_into` can be passed a ``memoryview`` + instance and data in the receive buffer is written to it. + """ + self._no_length_test(_make_memoryview) + + + def test_memoryview_respects_length(self): + """ + When called with a ``memoryview`` instance, + :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter + and doesn't copy more than that number of bytes in. + """ + self._respects_length_test(_make_memoryview) + + + def test_memoryview_doesnt_overfill(self): + """ + When called with a ``memoryview`` instance, + :py:obj:`Connection.recv_into` respects the size of the array and + doesn't write more bytes into it than will fit. + """ + self._doesnt_overfill_test(_make_memoryview) + + + def test_memoryview_really_doesnt_overfill(self): + """ + When called with a ``memoryview`` instance and an ``nbytes`` value + that is too large, :py:obj:`Connection.recv_into` respects the size + of the array and not the ``nbytes`` value and doesn't write more + bytes into the buffer than will fit. + """ + self._doesnt_overfill_test(_make_memoryview) + + + class ConnectionSendallTests(TestCase, _LoopbackMixin): """ Tests for :py:obj:`Connection.sendall`. @@ -2244,6 +3038,25 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin): self.assertEquals(client.recv(1), b('x')) + def test_text(self): + """ + :py:obj:`Connection.sendall` transmits all the content in the string + passed to it raising a DeprecationWarning in case of this being a text. + """ + server, client = self._loopback() + with catch_warnings(record=True) as w: + simplefilter("always") + server.sendall(b"x".decode("ascii")) + self.assertEqual( + "{0} for buf is no longer accepted, use bytes".format( + WARNING_TYPE_EXPECTED + ), + str(w[-1].message) + ) + self.assertIs(w[-1].category, DeprecationWarning) + self.assertEquals(client.recv(1), b"x") + + try: memoryview except NameError: diff --git a/OpenSSL/test/test_util.py b/OpenSSL/test/test_util.py new file mode 100644 index 0000000..8d92a3c --- /dev/null +++ b/OpenSSL/test/test_util.py @@ -0,0 +1,17 @@ +from OpenSSL._util import exception_from_error_queue, lib +from OpenSSL.test.util import TestCase + + + +class ErrorTests(TestCase): + """ + Tests for handling of certain OpenSSL error cases. + """ + def test_exception_from_error_queue_nonexistent_reason(self): + """ + :py:func:`exception_from_error_queue` raises ``ValueError`` when it + encounters an OpenSSL error code which does not have a reason string. + """ + lib.ERR_put_error(lib.ERR_LIB_EVP, 0, 1112, b"", 10) + exc = self.assertRaises(ValueError, exception_from_error_queue, ValueError) + self.assertEqual(exc.args[0][0][2], "") diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py index 21bbdc4..78b4a3f 100644 --- a/OpenSSL/test/util.py +++ b/OpenSSL/test/util.py @@ -7,16 +7,20 @@ Helpers for the OpenSSL test suite, largely copied from U{Twisted<http://twistedmatrix.com/>}. """ +import os import shutil +import sys import traceback -import os, os.path -from tempfile import mktemp + +from tempfile import mktemp, mkdtemp from unittest import TestCase -import sys + +from six import PY3 from OpenSSL._util import exception_from_error_queue from OpenSSL.crypto import Error + try: import memdbg except Exception: @@ -25,10 +29,17 @@ except Exception: from OpenSSL._util import ffi, lib, byte_string as b + + +# This is the UTF-8 encoding of the SNOWMAN unicode code point. +NON_ASCII = b("\xe2\x98\x83").decode("utf-8") + + + class TestCase(TestCase): """ - :py:class:`TestCase` adds useful testing functionality beyond what is available - from the standard library :py:class:`unittest.TestCase`. + :py:class:`TestCase` adds useful testing functionality beyond what is + available from the standard library :py:class:`unittest.TestCase`. """ def run(self, result): run = super(TestCase, self).run @@ -150,24 +161,38 @@ class TestCase(TestCase): (None, Exception(stack % (allocs_report,)), None)) + _tmpdir = None + + + @property + def tmpdir(self): + """ + On demand create a temporary directory. + """ + if self._tmpdir is not None: + return self._tmpdir + + self._tmpdir = mkdtemp(dir=".") + return self._tmpdir + + def tearDown(self): """ - Clean up any files or directories created using :py:meth:`TestCase.mktemp`. - Subclasses must invoke this method if they override it or the - cleanup will not occur. + Clean up any files or directories created using + :py:meth:`TestCase.mktemp`. Subclasses must invoke this method if they + override it or the cleanup will not occur. """ - if False and self._temporaryFiles is not None: - for temp in self._temporaryFiles: - if os.path.isdir(temp): - shutil.rmtree(temp) - elif os.path.exists(temp): - os.unlink(temp) + if self._tmpdir is not None: + shutil.rmtree(self._tmpdir) + try: exception_from_error_queue(Error) except Error: e = sys.exc_info()[1] if e.args != ([],): - self.fail("Left over errors in OpenSSL error queue: " + repr(e)) + self.fail( + "Left over errors in OpenSSL error queue: " + repr(e) + ) def assertIsInstance(self, instance, classOrTuple, message=None): @@ -227,7 +252,7 @@ class TestCase(TestCase): failIfIn = assertNotIn - def failUnlessIdentical(self, first, second, msg=None): + def assertIs(self, first, second, msg=None): """ Fail the test if :py:data:`first` is not :py:data:`second`. This is an obect-identity-equality test, not an object equality @@ -239,10 +264,10 @@ class TestCase(TestCase): if first is not second: raise self.failureException(msg or '%r is not %r' % (first, second)) return first - assertIdentical = failUnlessIdentical + assertIdentical = failUnlessIdentical = assertIs - def failIfIdentical(self, first, second, msg=None): + def assertIsNot(self, first, second, msg=None): """ Fail the test if :py:data:`first` is :py:data:`second`. This is an obect-identity-equality test, not an object equality @@ -254,7 +279,7 @@ class TestCase(TestCase): if first is second: raise self.failureException(msg or '%r is %r' % (first, second)) return first - assertNotIdentical = failIfIdentical + assertNotIdentical = failIfIdentical = assertIsNot def failUnlessRaises(self, exception, f, *args, **kwargs): @@ -288,16 +313,13 @@ class TestCase(TestCase): assertRaises = failUnlessRaises - _temporaryFiles = None def mktemp(self): """ - Pathetic substitute for twisted.trial.unittest.TestCase.mktemp. + Return UTF-8-encoded bytes of a path to a tmp file. + + The file will be cleaned up after the test run. """ - if self._temporaryFiles is None: - self._temporaryFiles = [] - temp = b(mktemp(dir=".")) - self._temporaryFiles.append(temp) - return temp + return mktemp(dir=self.tmpdir).encode("utf-8") # Other stuff @@ -447,3 +469,10 @@ class EqualityTestsMixin(object): a = self.anInstance() b = Delegate() self.assertEqual(a != b, [b]) + + +# The type name expected in warnings about using the wrong string type. +if PY3: + WARNING_TYPE_EXPECTED = "str" +else: + WARNING_TYPE_EXPECTED = "unicode" diff --git a/OpenSSL/version.py b/OpenSSL/version.py index 307dba0..eb3b736 100644 --- a/OpenSSL/version.py +++ b/OpenSSL/version.py @@ -6,4 +6,4 @@ pyOpenSSL - A simple wrapper around the OpenSSL library """ -__version__ = '0.14' +__version__ = '0.15.1' @@ -1,14 +1,21 @@ pyOpenSSL - A Python wrapper around the OpenSSL library ------------------------------------------------------------------------------- +------------------------------------------------------- -See the file INSTALL for installation instructions. +.. image:: https://coveralls.io/repos/pyca/pyopenssl/badge.svg + :target: https://coveralls.io/r/pyca/pyopenssl + +.. image:: https://readthedocs.org/projects/pyopenssl/badge/?version=latest + :target: https://pyopenssl.readthedocs.org/ + :alt: Latest Docs + +.. image:: https://travis-ci.org/pyca/pyopenssl.svg?branch=master + :target: https://travis-ci.org/pyca/pyopenssl + +See the file INSTALL.rst for installation instructions. See https://github.com/pyca/pyopenssl for development. See https://pyopenssl.readthedocs.org for documentation. See https://mail.python.org/mailman/listinfo/pyopenssl-users for the discussion mailing list. - -.. image:: https://coveralls.io/repos/pyca/pyopenssl/badge.png - :target: https://coveralls.io/r/pyca/pyopenssl diff --git a/doc/api/crypto.rst b/doc/api/crypto.rst index b360e89..57a60f3 100644 --- a/doc/api/crypto.rst +++ b/doc/api/crypto.rst @@ -42,7 +42,17 @@ .. py:data:: X509StoreType - A Python type object representing the X509Store object type. + See :py:class:`X509Store` + + +.. py:data X509Store + + A class representing the X.509 store. + + +.. py:data:: X509StoreContext + + A class representing the X.509 store context. .. py:data:: PKeyType @@ -257,7 +267,7 @@ X509 objects have the following methods: Return the signature algorithm used in the certificate. If the algorithm is undefined, raise :py:data:`ValueError`. - ..versionadded:: 0.13 + .. versionadded:: 0.13 .. py:method:: X509.get_subject() @@ -526,6 +536,36 @@ The X509Store object has currently just one method: Add the certificate *cert* to the certificate store. +X509StoreContextError objects +----------------------------- + +The X509StoreContextError is an exception raised from +`X509StoreContext.verify_certificate` in circumstances where a certificate +cannot be verified in a provided context. + +The certificate for which the verification error was detected is given by the +``certificate`` attribute of the exception instance as a :class:`X509` +instance. + +Details about the verification error are given in the exception's ``args`` attribute. + + +X509StoreContext objects +------------------------ + +The X509StoreContext object is used for verifying a certificate against a set +of trusted certificates. + + +.. py:method:: X509StoreContext.verify_certificate() + + Verify a certificate in the context of this initialized `X509StoreContext`. + On error, raises `X509StoreContextError`, otherwise does nothing. + + .. versionadded:: 0.15 + + + .. _openssl-pkey: PKey objects @@ -724,10 +764,11 @@ CRL objects have the following methods: Add a Revoked object to the CRL, by value not reference. -.. py:method:: CRL.export(cert, key[, type=FILETYPE_PEM][, days=100]) +.. py:method:: CRL.export(cert, key[, type=FILETYPE_PEM][, days=100][, digest=b'md5']) Use *cert* and *key* to sign the CRL and return the CRL as a string. *days* is the number of days before the next CRL is due. + *digest* is the algorithm that will be used to sign CRL. .. py:method:: CRL.get_revoked() diff --git a/doc/api/ssl.rst b/doc/api/ssl.rst index a75af1f..2929305 100644 --- a/doc/api/ssl.rst +++ b/doc/api/ssl.rst @@ -472,6 +472,53 @@ Context objects have the following methods: .. versionadded:: 0.13 +.. py:method:: Context.set_npn_advertise_callback(callback) + + Specify a callback function that will be called when offering `Next + Protocol Negotiation + <https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server. + + *callback* should be the callback function. It will be invoked with one + argument, the :py:class:`Connection` instance. It should return a list of + bytestrings representing the advertised protocols, like + ``[b'http/1.1', b'spdy/2']``. + + .. versionadded:: 0.15 + + +.. py:method:: Context.set_npn_select_callback(callback): + + Specify a callback function that will be called when a server offers Next + Protocol Negotiation options. + + *callback* should be the callback function. It will be invoked with two + arguments: the :py:class:`Connection`, and a list of offered protocols as + bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return one of + those bytestrings, the chosen protocol. + + .. versionadded:: 0.15 + +.. py:method:: Context.set_alpn_protos(protos) + + Specify the protocols that the client is prepared to speak after the TLS + connection has been negotiated using Application Layer Protocol + Negotiation. + + *protos* should be a list of protocols that the client is offering, each + as a bytestring. For example, ``[b'http/1.1', b'spdy/2']``. + + +.. py:method:: Context.set_alpn_select_callback(callback) + + Specify a callback function that will be called on the server when a client + offers protocols using Application Layer Protocol Negotiation. + + *callback* should be the callback function. It will be invoked with two + arguments: the :py:class:`Connection` and a list of offered protocols as + bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return one of + these bytestrings, the chosen protocol. + + .. _openssl-session: Session objects @@ -614,6 +661,14 @@ Connection objects have the following methods: by *bufsize*. +.. py:method:: Connection.recv_into(buffer[, nbytes[, flags]]) + + Receive data from the Connection and copy it directly into the provided + buffer. The return value is the number of bytes read from the connection. + The maximum amount of data to be received at once is specified by *nbytes*. + *flags* is accepted for compatibility with ``socket.recv_into`` but its + value is ignored. + .. py:method:: Connection.bio_write(bytes) If the Connection was created with a memory BIO, this method can be used to add @@ -806,6 +861,31 @@ Connection objects have the following methods: .. versionadded:: 0.15 +.. py:method:: Connection.get_next_proto_negotiated(): + + Get the protocol that was negotiated by Next Protocol Negotiation. Returns + a bytestring of the protocol name. If no protocol has been negotiated yet, + returns an empty string. + + .. versionadded:: 0.15 + +.. py:method:: Connection.set_alpn_protos(protos) + + Specify the protocols that the client is prepared to speak after the TLS + connection has been negotiated using Application Layer Protocol + Negotiation. + + *protos* should be a list of protocols that the client is offering, each + as a bytestring. For example, ``[b'http/1.1', b'spdy/2']``. + + +.. py:method:: Connection.get_alpn_proto_negotiated() + + Get the protocol that was negotiated by Application Layer Protocol + Negotiation. Returns a bytestring of the protocol name. If no protocol has + been negotiated yet, returns an empty string. + + .. Rubric:: Footnotes .. [#connection-context-socket] Actually, all that is required is an object that diff --git a/doc/conf.py b/doc/conf.py index d9c9a67..b13925f 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -51,9 +51,9 @@ copyright = u'2011, Jean-Paul Calderone' # built documents. # # The short X.Y version. -version = '0.14' +version = '0.15.1' # The full version, including alpha/beta/rc tags. -release = '0.14' +release = version # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -123,7 +123,7 @@ html_theme = 'default' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +# html_static_path = ['_static'] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. @@ -1,17 +1,42 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (C) Jean-Paul Calderone 2008-2014, All rights reserved +# Copyright (C) Jean-Paul Calderone 2008-2015, All rights reserved # """ Installation script for the OpenSSL module """ +import sys + from setuptools import setup +from setuptools.command.test import test as TestCommand + # XXX Deduplicate this -__version__ = '0.14' +__version__ = '0.15.1' + + +class PyTest(TestCommand): + user_options = [("pytest-args=", "a", "Arguments to pass to py.test")] + + def initialize_options(self): + TestCommand.initialize_options(self) + self.pytest_args = None + + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [] + self.test_suite = True + + def run_tests(self): + # import here, cause outside the eggs aren't loaded + import pytest + errno = pytest.main(self.pytest_args or [] + + ["OpenSSL"]) + sys.exit(errno) + setup(name='pyOpenSSL', version=__version__, packages = ['OpenSSL'], @@ -26,7 +51,9 @@ setup(name='pyOpenSSL', version=__version__, 'OpenSSL.test.util', 'OpenSSL.test.test_crypto', 'OpenSSL.test.test_rand', - 'OpenSSL.test.test_ssl'], + 'OpenSSL.test.test_ssl', + 'OpenSSL.test.test_tsafe', + 'OpenSSL.test.test_util',], description = 'Python wrapper module around the OpenSSL library', author = 'Jean-Paul Calderone', author_email = 'exarkun@twistedmatrix.com', @@ -34,7 +61,7 @@ setup(name='pyOpenSSL', version=__version__, maintainer_email = 'exarkun@twistedmatrix.com', url = 'https://github.com/pyca/pyopenssl', license = 'APL2', - install_requires=["cryptography>=0.4", "six>=1.5.2"], + install_requires=["cryptography>=0.7", "six>=1.5.2"], long_description = """\ High-level wrapper around a subset of the OpenSSL library, includes * SSL.Connection objects, wrapping the methods of Python's portable @@ -72,4 +99,10 @@ High-level wrapper around a subset of the OpenSSL library, includes 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: System :: Networking', ], - test_suite="OpenSSL") + test_suite="OpenSSL", + tests_require=[ + "pytest", + ], + cmdclass={ + "test": PyTest, + }) @@ -1,13 +1,18 @@ [tox] -envlist = pypy,py26,py27,py32,py33,meta +envlist = pypy,py26,py27,py32,py33,py34,meta [testenv] +deps = + setuptools>=7.0 # older setuptools pollute CWD with egg files of dependencies + coverage setenv = # Do not allowed the executing environment to pollute the test environment # with extra packages. PYTHONPATH= -# The standard library unittest module can run tests on Python 2.7 and newer -commands = python setup.py test +commands = + python -c "import OpenSSL.SSL; print(OpenSSL.SSL.SSLeay_version(OpenSSL.SSL.SSLEAY_VERSION))" + coverage run --branch --source=OpenSSL setup.py test + coverage report -m [testenv:meta] deps = @@ -23,4 +28,3 @@ commands = # return 0, causing tox to believe it failed. # https://bitbucket.org/regebro/pyroma/issue/18 sh -c <<EOT {envbindir}/pyroma . || (( $? == 1 )) EOT -whitelist_externals = sh
\ No newline at end of file |