summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@twistedmatrix.com>2015-04-13 20:31:07 -0400
committerJean-Paul Calderone <exarkun@twistedmatrix.com>2015-04-13 20:31:07 -0400
commit17eca487a1e8a6b15cefdf6d61022cddbebde428 (patch)
treefce1fd9090722c3fec64e47cebfe450a7fe83d29
parent8dc37a1ba6276d6a690ed2b21fdb665d6d97ca74 (diff)
parent4ca37f747d9dcc21d6f85655895e1e997937a670 (diff)
downloadpyopenssl-17eca487a1e8a6b15cefdf6d61022cddbebde428.tar.gz
merge master, resolve simple conflicts
-rw-r--r--.travis.yml76
-rw-r--r--ChangeLog30
-rw-r--r--OpenSSL/RATIONALE61
-rw-r--r--OpenSSL/SSL.py280
-rw-r--r--OpenSSL/_util.py49
-rw-r--r--OpenSSL/crypto.py123
-rw-r--r--OpenSSL/rand.py20
-rw-r--r--OpenSSL/test/test_crypto.py172
-rw-r--r--OpenSSL/test/test_rand.py48
-rw-r--r--OpenSSL/test/test_ssl.py580
-rw-r--r--OpenSSL/test/test_util.py17
-rw-r--r--OpenSSL/test/util.py5
-rw-r--r--doc/api/crypto.rst44
-rw-r--r--doc/api/ssl.rst44
14 files changed, 1331 insertions, 218 deletions
diff --git a/.travis.yml b/.travis.yml
index f359de1..eda1242 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,18 +1,23 @@
+sudo: false
language: python
-os:
- - linux
-
-python:
- - "pypy"
- - "2.6"
- - "2.7"
- - "3.2"
- - "3.3"
- - "3.4"
-
matrix:
include:
+ - language: generic
+ os: osx
+ env: TOXENV=py27
+ - python: "2.6" # these are just to make travis's UI a bit prettier
+ env: TOXENV=py26
+ - python: "2.7"
+ env: TOXENV=py27
+ - python: "3.2"
+ env: TOXENV=py32
+ - python: "3.3"
+ env: TOXENV=py33
+ - python: "3.4"
+ env: TOXENV=py34
+ - python: "pypy"
+ env: TOXENV=pypy
# Also run the tests against cryptography master.
- python: "2.6"
env:
@@ -37,10 +42,21 @@ matrix:
- python: "2.7"
env:
OPENSSL=0.9.8
+ addons:
+ apt:
+ sources:
+ - lucid
+ packages:
+ - libssl-dev/lucid
# Let the cryptography master builds fail because they might be triggered by
# cryptography changes beyond our control.
+ # Also allow OS X and 0.9.8 to fail at the moment while we fix these new
+ # build configurations.
allow_failures:
+ - language: generic
+ os: osx
+ env: TOXENV=py27
- env:
CRYPTOGRAPHY_GIT_MASTER=true
- env:
@@ -50,34 +66,40 @@ before_install:
- if [ -n "$CRYPTOGRAPHY_GIT_MASTER" ]; then pip install git+https://github.com/pyca/cryptography.git;fi
install:
- # Install the wheel library explicitly here. It is not really a setup
- # dependency. It is not an install dependency. It is only a dependency for
- # the script directive below - because we want to exercise wheel building on
- # travis.
- - pip install wheel
-
- # Also install some tools for measuring code coverage and sending the results
- # to coveralls.
- - pip install coveralls coverage
+ - |
+ if [[ "$(uname -s)" == 'Darwin' ]]; then
+ brew update
+ brew upgrade openssl
+ curl -O https://bootstrap.pypa.io/get-pip.py
+ python get-pip.py --user
+ pip install --user virtualenv
+ else
+ pip install virtualenv
+ fi
+ python -m virtualenv ~/.venv
+ source ~/.venv/bin/activate
+ pip install coveralls coverage wheel
script:
- |
- if [[ "${OPENSSL}" == "0.9.8" ]]; then
- sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ lucid main"
- sudo apt-get -y update
- sudo apt-get install -y --force-yes libssl-dev/lucid
+ if [[ "$(uname -s)" == "Darwin" ]]; then
+ # set our flags to use homebrew openssl
+ export ARCHFLAGS="-arch x86_64"
+ export LDFLAGS="-L/usr/local/opt/openssl/lib"
+ export CFLAGS="-I/usr/local/opt/openssl/include"
fi
- - |
+ source ~/.venv/bin/activate
pip install -e .
- |
+ source ~/.venv/bin/activate
coverage run --branch --source=OpenSSL setup.py bdist_wheel test
- |
+ source ~/.venv/bin/activate
coverage report -m
- - |
python -c "import OpenSSL.SSL; print(OpenSSL.SSL.SSLeay_version(OpenSSL.SSL.SSLEAY_VERSION))"
after_success:
- - coveralls
+ - source ~/.venv/bin/activate && coveralls
notifications:
email: false
diff --git a/ChangeLog b/ChangeLog
index 482bae4..95c891f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,17 +1,39 @@
-2014-12-11 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+2015-04-12 Jean-Paul Calderone <exarkun@twistedmatrix.com>
- * OpenSSL/SSL.py: Fixed a regression ``Context.check_privatekey``
- causing it to always succeed - even if it should fail.
+ * OpenSSL/rand.py, OpenSSL/SSL.py: APIs which previously accepted
+ filenames only as bytes now accept them as either bytes or
+ unicode (and respect sys.getfilesystemencoding()).
+
+2015-03-23 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+
+ * OpenSSL/SSL.py: Add Cory Benfield's next-protocol-negotiation
+ (NPN) bindings.
+
+2015-03-15 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+
+ * OpenSSL/SSL.py: Add ``Connection.recv_into``, mirroring the
+ builtin ``socket.recv_into``. Based on work from Cory Benfield.
+ * OpenSSL/test/test_ssl.py: Add tests for ``recv_into``.
+
+2015-01-30 Stephen Holsapple <sholsapp@gmail.com>
+
+ * OpenSSL/crypto.py: Expose ``X509StoreContext`` for verifying certificates.
+ * OpenSSL/test/test_crypto.py: Add intermediate certificates for
2015-01-08 Paul Aurich <paul@darkrain42.org>
* OpenSSL/SSL.py: ``Connection.shutdown`` now propagates errors from the
underlying socket.
+2014-12-11 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+
+ * OpenSSL/SSL.py: Fixed a regression ``Context.check_privatekey``
+ causing it to always succeed - even if it should fail.
+
2014-08-21 Alex Gaynor <alex.gaynor@gmail.com>
* OpenSSL/crypto.py: Fixed a regression where calling ``load_pkcs7_data``
- with ``FILETYPE_ASN1`` would fail with a ``NameError.
+ with ``FILETYPE_ASN1`` would fail with a ``NameError``.
2014-05-05 Jean-Paul Calderone <exarkun@twistedmatrix.com>
diff --git a/OpenSSL/RATIONALE b/OpenSSL/RATIONALE
deleted file mode 100644
index 074422c..0000000
--- a/OpenSSL/RATIONALE
+++ /dev/null
@@ -1,61 +0,0 @@
- RATIONALE
-
-The reason this module exists at all is that the SSL support in the socket
-module in the Python 2.1 distribution (which is what we used, of course I
-cannot speak for later versions) is severely limited.
-
-<FIXME> Update this list whenever needed! The communications module isn't
-written yet, so we don't know exactly how this'll work! </FIXME>
-This is a list of things we need from an OpenSSL module:
- + Context objects (in OpenSSL called SSL_CTX) that can be manipulated from
- Python modules. They must support a number of operations:
- - Loading certificates from file and memory, both the client
- certificate and the certificates used for the verification chain.
- - Loading private keys from file and memory.
- - Setting the verification mode (basically VERIFY_NONE and
- VERIFY_PEER).
- - Callbacks mechanism for prompting for pass phrases and verifying
- certificates. The callbacks have to work under a multi-threaded
- environment (see the comment in ssl/context.c). Of course the
- callbacks will have to be written in Python!
- + The Connection objects (in OpenSSL called SSL) have to support a few
- things:
- - Renegotiation, this is really important, especially for connections
- that are up and running for a long time, since renegotiation
- generates new encryption keys.
- - Server-side SSL must work! As far as I know this doesn't work in
- the SSL support of the socket module as of Python 2.1.
- - Wrapping the methods of the underlying transport object is nice, so
- you don't have to keep track of more than one object per connection.
- This could of course be done a lot better than the way it works now,
- so more transport layers than sockets are possible!
- + A well-organized error system that mimics OpenSSL's error system is
- desirable. Specifically there has to be a way to find out wether the
- operation was successful, or if it failed, why it failed, so some sort
- of interface to OpenSSL's error queue mechanism is needed.
- + Certificate objects (X509) and certificate name objects (X509_NAME) are
- needed, especially for verification purposes. Certificates will
- probably also be generated by the server which is another reason for
- them to exist. The same thing goes for key objects (EVP_PKEY)
- + Since this is an OpenSSL module, there has to be an interface to the
- OpenSSL PRNG, so it can be seeded in a good way.
-
-When asking about SSL on the comp.lang.python newsgroup (or on
-python-list@python.org) people usually pointed you to the M2Crypto package.
-The M2Crypto.SSL module does implement a lot of OpenSSL's functionality but
-unfortunately its error handling system does not seem to be finished,
-especially for non-blocking I/O. I think that much of the reason for this
-is that M2Crypto is developed using SWIG. This makes it awkward to create
-functions that e.g. can return both an integer and NULL since (as far as I
-know) you basically write C functions and SWIG makes wrapper functions that
-parses the Python argument list and calls your C function, and finally
-transforms your return value to a Python object.
-
-Finally, a good book on the topic of SSL (that I read and learned a lot
-from) is "SSL and TLS - Designing and Building Secure Systems" (ISBN
-0201615983) by Eric Rescorla. A good mailinglist to subscribe to is the
-openssl-users@openssl.org list.
-
-This comment was written July 2001, discussing Python 2.1. Feel free to
-modify it as the SSL support in the socket module changes.
-
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index 0e645ef..36ef14a 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -1,18 +1,21 @@
from sys import platform
from functools import wraps, partial
-from itertools import count
+from itertools import count, chain
from weakref import WeakValueDictionary
from errno import errorcode
from six import text_type as _text_type
from six import integer_types as integer_types
+from six import int2byte, indexbytes
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
exception_from_error_queue as _exception_from_error_queue,
native as _native,
- warn_text as _warn_text)
+ warn_text as _warn_text,
+ path_string as _path_string,
+)
from OpenSSL.crypto import (
FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)
@@ -165,10 +168,41 @@ class SysCallError(Error):
pass
+class _CallbackExceptionHelper(object):
+ """
+ A base class for wrapper classes that allow for intelligent exception
+ handling in OpenSSL callbacks.
+
+ :ivar list _problems: Any exceptions that occurred while executing in a
+ context where they could not be raised in the normal way. Typically
+ this is because OpenSSL has called into some Python code and requires a
+ return value. The exceptions are saved to be raised later when it is
+ possible to do so.
+ """
+ def __init__(self):
+ self._problems = []
-class _VerifyHelper(object):
+
+ def raise_if_problem(self):
+ """
+ Raise an exception from the OpenSSL error queue or that was previously
+ captured whe running a callback.
+ """
+ if self._problems:
+ try:
+ _raise_current_error()
+ except Error:
+ pass
+ raise self._problems.pop(0)
+
+
+class _VerifyHelper(_CallbackExceptionHelper):
+ """
+ Wrap a callback such that it can be used as a certificate verification
+ callback.
+ """
def __init__(self, callback):
- self._problems = []
+ _CallbackExceptionHelper.__init__(self)
@wraps(callback)
def wrapper(ok, store_ctx):
@@ -197,14 +231,92 @@ class _VerifyHelper(object):
"int (*)(int, X509_STORE_CTX *)", wrapper)
- def raise_if_problem(self):
- if self._problems:
+class _NpnAdvertiseHelper(_CallbackExceptionHelper):
+ """
+ Wrap a callback such that it can be used as an NPN advertisement callback.
+ """
+ def __init__(self, callback):
+ _CallbackExceptionHelper.__init__(self)
+
+ @wraps(callback)
+ def wrapper(ssl, out, outlen, arg):
try:
- _raise_current_error()
- except Error:
- pass
- raise self._problems.pop(0)
+ conn = Connection._reverse_mapping[ssl]
+ protos = callback(conn)
+
+ # Join the protocols into a Python bytestring, length-prefixing
+ # each element.
+ protostr = b''.join(
+ chain.from_iterable((int2byte(len(p)), p) for p in protos)
+ )
+
+ # Save our callback arguments on the connection object. This is
+ # done to make sure that they don't get freed before OpenSSL
+ # uses them. Then, return them appropriately in the output
+ # parameters.
+ conn._npn_advertise_callback_args = [
+ _ffi.new("unsigned int *", len(protostr)),
+ _ffi.new("unsigned char[]", protostr),
+ ]
+ outlen[0] = conn._npn_advertise_callback_args[0][0]
+ out[0] = conn._npn_advertise_callback_args[1]
+ return 0
+ except Exception as e:
+ self._problems.append(e)
+ return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
+
+ self.callback = _ffi.callback(
+ "int (*)(SSL *, const unsigned char **, unsigned int *, void *)",
+ wrapper
+ )
+
+
+class _NpnSelectHelper(_CallbackExceptionHelper):
+ """
+ Wrap a callback such that it can be used as an NPN selection callback.
+ """
+ def __init__(self, callback):
+ _CallbackExceptionHelper.__init__(self)
+
+ @wraps(callback)
+ def wrapper(ssl, out, outlen, in_, inlen, arg):
+ try:
+ conn = Connection._reverse_mapping[ssl]
+
+ # The string passed to us is actually made up of multiple
+ # length-prefixed bytestrings. We need to split that into a
+ # list.
+ instr = _ffi.buffer(in_, inlen)[:]
+ protolist = []
+ while instr:
+ l = indexbytes(instr, 0)
+ proto = instr[1:l+1]
+ protolist.append(proto)
+ instr = instr[l+1:]
+
+ # Call the callback
+ outstr = callback(conn, protolist)
+
+ # Save our callback arguments on the connection object. This is
+ # done to make sure that they don't get freed before OpenSSL
+ # uses them. Then, return them appropriately in the output
+ # parameters.
+ conn._npn_select_callback_args = [
+ _ffi.new("unsigned char *", len(outstr)),
+ _ffi.new("unsigned char[]", outstr),
+ ]
+ outlen[0] = conn._npn_select_callback_args[0][0]
+ out[0] = conn._npn_select_callback_args[1]
+ return 0
+ except Exception as e:
+ self._problems.append(e)
+ return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
+ self.callback = _ffi.callback(
+ "int (*)(SSL *, unsigned char **, unsigned char *, "
+ "const unsigned char *, unsigned int, void *)",
+ wrapper
+ )
def _asFileDescriptor(obj):
@@ -294,6 +406,10 @@ class Context(object):
self._info_callback = None
self._tlsext_servername_callback = None
self._app_data = None
+ self._npn_advertise_helper = None
+ self._npn_advertise_callback = None
+ self._npn_select_helper = None
+ self._npn_select_callback = None
# SSL_CTX_set_app_data(self->ctx, self);
# SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |
@@ -307,19 +423,22 @@ class Context(object):
Let SSL know where we can find trusted certificates for the certificate
chain
- :param cafile: In which file we can find the certificates
+ :param cafile: In which file we can find the certificates (``bytes`` or
+ ``unicode``).
:param capath: In which directory we can find the certificates
+ (``bytes`` or ``unicode``).
+
:return: None
"""
if cafile is None:
cafile = _ffi.NULL
- elif not isinstance(cafile, bytes):
- raise TypeError("cafile must be None or a byte string")
+ else:
+ cafile = _path_string(cafile)
if capath is None:
capath = _ffi.NULL
- elif not isinstance(capath, bytes):
- raise TypeError("capath must be None or a byte string")
+ else:
+ capath = _path_string(capath)
load_result = _lib.SSL_CTX_load_verify_locations(self._context, cafile, capath)
if not load_result:
@@ -369,15 +488,12 @@ class Context(object):
"""
Load a certificate chain from a file
- :param certfile: The name of the certificate chain file
+ :param certfile: The name of the certificate chain file (``bytes`` or
+ ``unicode``).
+
:return: None
"""
- if isinstance(certfile, _text_type):
- # Perhaps sys.getfilesystemencoding() could be better?
- certfile = certfile.encode("utf-8")
-
- if not isinstance(certfile, bytes):
- raise TypeError("certfile must be bytes or unicode")
+ certfile = _path_string(certfile)
result = _lib.SSL_CTX_use_certificate_chain_file(self._context, certfile)
if not result:
@@ -388,15 +504,13 @@ class Context(object):
"""
Load a certificate from a file
- :param certfile: The name of the certificate file
+ :param certfile: The name of the certificate file (``bytes`` or
+ ``unicode``).
:param filetype: (optional) The encoding of the file, default is PEM
+
:return: None
"""
- if isinstance(certfile, _text_type):
- # Perhaps sys.getfilesystemencoding() could be better?
- certfile = certfile.encode("utf-8")
- if not isinstance(certfile, bytes):
- raise TypeError("certfile must be bytes or unicode")
+ certfile = _path_string(certfile)
if not isinstance(filetype, integer_types):
raise TypeError("filetype must be an integer")
@@ -450,16 +564,12 @@ class Context(object):
"""
Load a private key from a file
- :param keyfile: The name of the key file
+ :param keyfile: The name of the key file (``bytes`` or ``unicode``)
:param filetype: (optional) The encoding of the file, default is PEM
+
:return: None
"""
- if isinstance(keyfile, _text_type):
- # Perhaps sys.getfilesystemencoding() could be better?
- keyfile = keyfile.encode("utf-8")
-
- if not isinstance(keyfile, bytes):
- raise TypeError("keyfile must be a byte string")
+ keyfile = _path_string(keyfile)
if filetype is _unspecified:
filetype = FILETYPE_PEM
@@ -595,11 +705,12 @@ class Context(object):
"""
Load parameters for Ephemeral Diffie-Hellman
- :param dhfile: The file to load EDH parameters from
+ :param dhfile: The file to load EDH parameters from (``bytes`` or
+ ``unicode``).
+
:return: None
"""
- if not isinstance(dhfile, bytes):
- raise TypeError("dhfile must be a byte string")
+ dhfile = _path_string(dhfile)
bio = _lib.BIO_new_file(dhfile, b"r")
if bio == _ffi.NULL:
@@ -813,6 +924,39 @@ class Context(object):
_lib.SSL_CTX_set_tlsext_servername_callback(
self._context, self._tlsext_servername_callback)
+
+ def set_npn_advertise_callback(self, callback):
+ """
+ Specify a callback function that will be called when offering `Next
+ Protocol Negotiation
+ <https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server.
+
+ :param callback: The callback function. It will be invoked with one
+ argument, the Connection instance. It should return a list of
+ bytestrings representing the advertised protocols, like
+ ``[b'http/1.1', b'spdy/2']``.
+ """
+ self._npn_advertise_helper = _NpnAdvertiseHelper(callback)
+ self._npn_advertise_callback = self._npn_advertise_helper.callback
+ _lib.SSL_CTX_set_next_protos_advertised_cb(
+ self._context, self._npn_advertise_callback, _ffi.NULL)
+
+
+ def set_npn_select_callback(self, callback):
+ """
+ Specify a callback function that will be called when a server offers
+ Next Protocol Negotiation options.
+
+ :param callback: The callback function. It will be invoked with two
+ arguments: the Connection, and a list of offered protocols as
+ bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return
+ one of those bytestrings, the chosen protocol.
+ """
+ self._npn_select_helper = _NpnSelectHelper(callback)
+ self._npn_select_callback = self._npn_select_helper.callback
+ _lib.SSL_CTX_set_next_proto_select_cb(
+ self._context, self._npn_select_callback, _ffi.NULL)
+
ContextType = Context
@@ -837,6 +981,13 @@ class Connection(object):
self._ssl = _ffi.gc(ssl, _lib.SSL_free)
self._context = context
+ # References to strings used for Next Protocol Negotiation. OpenSSL's
+ # header files suggest that these might get copied at some point, but
+ # doesn't specify when, so we store them here to make sure they don't
+ # get freed before OpenSSL uses them.
+ self._npn_advertise_callback_args = None
+ self._npn_select_callback_args = None
+
self._reverse_mapping[self._ssl] = self
if socket is None:
@@ -871,6 +1022,10 @@ class Connection(object):
def _raise_ssl_error(self, ssl, result):
if self._context._verify_helper is not None:
self._context._verify_helper.raise_if_problem()
+ if self._context._npn_advertise_helper is not None:
+ self._context._npn_advertise_helper.raise_if_problem()
+ if self._context._npn_select_helper is not None:
+ self._context._npn_select_helper.raise_if_problem()
error = _lib.SSL_get_error(ssl, result)
if error == _lib.SSL_ERROR_WANT_READ:
@@ -1036,6 +1191,45 @@ class Connection(object):
read = recv
+ def recv_into(self, buffer, nbytes=None, flags=None):
+ """
+ Receive data on the connection and store the data into a buffer rather
+ than creating a new string.
+
+ :param buffer: The buffer to copy into.
+ :param nbytes: (optional) The maximum number of bytes to read into the
+ buffer. If not present, defaults to the size of the buffer. If
+ larger than the size of the buffer, is reduced to the size of the
+ buffer.
+ :param flags: (optional) Included for compatibility with the socket
+ API, the value is ignored.
+ :return: The number of bytes read into the buffer.
+ """
+ if nbytes is None:
+ nbytes = len(buffer)
+ else:
+ nbytes = min(nbytes, len(buffer))
+
+ # We need to create a temporary buffer. This is annoying, it would be
+ # better if we could pass memoryviews straight into the SSL_read call,
+ # but right now we can't. Revisit this if CFFI gets that ability.
+ buf = _ffi.new("char[]", nbytes)
+ result = _lib.SSL_read(self._ssl, buf, nbytes)
+ self._raise_ssl_error(self._ssl, result)
+
+ # This strange line is all to avoid a memory copy. The buffer protocol
+ # should allow us to assign a CFFI buffer to the LHS of this line, but
+ # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
+ # wrap it in a memoryview, except on Python 2.6 which doesn't have a
+ # memoryview type.
+ try:
+ buffer[:result] = memoryview(_ffi.buffer(buf, result))
+ except NameError:
+ buffer[:result] = _ffi.buffer(buf, result)
+
+ return result
+
+
def _handle_bio_errors(self, bio, result):
if _lib.BIO_should_retry(bio):
if _lib.BIO_should_read(bio):
@@ -1560,6 +1754,16 @@ class Connection(object):
version =_ffi.string(_lib.SSL_CIPHER_get_version(cipher))
return version.decode("utf-8")
+ def get_next_proto_negotiated(self):
+ """
+ Get the protocol that was negotiated by NPN.
+ """
+ data = _ffi.new("unsigned char **")
+ data_len = _ffi.new("unsigned int *")
+
+ _lib.SSL_get0_next_proto_negotiated(self._ssl, data, data_len)
+
+ return _ffi.buffer(data[0], data_len[0])[:]
ConnectionType = Connection
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
index 2c12e84..9909250 100644
--- a/OpenSSL/_util.py
+++ b/OpenSSL/_util.py
@@ -1,4 +1,5 @@
from warnings import warn
+import sys
from six import PY3, binary_type, text_type
@@ -7,11 +8,34 @@ binding = Binding()
ffi = binding.ffi
lib = binding.lib
-def exception_from_error_queue(exceptionType):
- def text(charp):
- return native(ffi.string(charp))
+
+
+def text(charp):
+ """
+ Get a native string type representing of the given CFFI ``char*`` object.
+
+ :param charp: A C-style string represented using CFFI.
+
+ :return: :class:`str`
+ """
+ if not charp:
+ return ""
+ return native(ffi.string(charp))
+
+
+
+def exception_from_error_queue(exception_type):
+ """
+ Convert an OpenSSL library failure into a Python exception.
+
+ When a call to the native OpenSSL library fails, this is usually signalled
+ by the return value, and an error code is stored in an error queue
+ associated with the current thread. The err library provides functions to
+ obtain these error codes and textual error messages.
+ """
errors = []
+
while True:
error = lib.ERR_get_error()
if error == 0:
@@ -21,7 +45,7 @@ def exception_from_error_queue(exceptionType):
text(lib.ERR_func_error_string(error)),
text(lib.ERR_reason_error_string(error))))
- raise exceptionType(errors)
+ raise exception_type(errors)
@@ -47,6 +71,23 @@ def native(s):
+def path_string(s):
+ """
+ Convert a Python string to a :py:class:`bytes` string identifying the same
+ path and which can be passed into an OpenSSL API accepting a filename.
+
+ :param s: An instance of :py:class:`bytes` or :py:class:`unicode`.
+
+ :return: An instance of :py:class:`bytes`.
+ """
+ if isinstance(s, binary_type):
+ return s
+ elif isinstance(s, text_type):
+ return s.encode(sys.getfilesystemencoding())
+ else:
+ raise TypeError("Path must be represented as bytes or unicode string")
+
+
if PY3:
def byte_string(s):
return s.encode("charmap")
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 52320f3..6013ca3 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -26,6 +26,7 @@ TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA
+
class Error(Exception):
"""
An error occurred in an `OpenSSL.crypto` API.
@@ -34,6 +35,8 @@ class Error(Exception):
_raise_current_error = partial(_exception_from_error_queue, Error)
+
+
def _untested_error(where):
"""
An OpenSSL API failed somehow. Additionally, the failure which was
@@ -1357,6 +1360,125 @@ class X509Store(object):
X509StoreType = X509Store
+class X509StoreContextError(Exception):
+ """
+ An error occurred while verifying a certificate using
+ `OpenSSL.X509StoreContext.verify_certificate`.
+
+ :ivar certificate: The certificate which caused verificate failure.
+ :type cert: :class:`X509`
+
+ """
+ def __init__(self, message, certificate):
+ super(X509StoreContextError, self).__init__(message)
+ self.certificate = certificate
+
+
+class X509StoreContext(object):
+ """
+ An X.509 store context.
+
+ An :py:class:`X509StoreContext` is used to define some of the criteria for
+ certificate verification. The information encapsulated in this object
+ includes, but is not limited to, a set of trusted certificates,
+ verification parameters, and revoked certificates.
+
+ Of these, only the set of trusted certificates is currently exposed.
+
+ :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this
+ instance. It is dynamically allocated and automatically garbage
+ collected.
+
+ :ivar _store: See the ``store`` ``__init__`` parameter.
+
+ :ivar _cert: See the ``certificate`` ``__init__`` parameter.
+ """
+
+ def __init__(self, store, certificate):
+ """
+ :param X509Store store: The certificates which will be trusted for the
+ purposes of any verifications.
+
+ :param X509 certificate: The certificate to be verified.
+ """
+ store_ctx = _lib.X509_STORE_CTX_new()
+ self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
+ self._store = store
+ self._cert = certificate
+ # Make the store context available for use after instantiating this
+ # class by initializing it now. Per testing, subsequent calls to
+ # :py:meth:`_init` have no adverse affect.
+ self._init()
+
+
+ def _init(self):
+ """
+ Set up the store context for a subsequent verification operation.
+ """
+ ret = _lib.X509_STORE_CTX_init(self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL)
+ if ret <= 0:
+ _raise_current_error()
+
+
+ def _cleanup(self):
+ """
+ Internally cleans up the store context.
+
+ The store context can then be reused with a new call to
+ :py:meth:`_init`.
+ """
+ _lib.X509_STORE_CTX_cleanup(self._store_ctx)
+
+
+ def _exception_from_context(self):
+ """
+ Convert an OpenSSL native context error failure into a Python
+ exception.
+
+ When a call to native OpenSSL X509_verify_cert fails, additonal information
+ about the failure can be obtained from the store context.
+ """
+ errors = [
+ _lib.X509_STORE_CTX_get_error(self._store_ctx),
+ _lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
+ _native(_ffi.string(_lib.X509_verify_cert_error_string(
+ _lib.X509_STORE_CTX_get_error(self._store_ctx)))),
+ ]
+ # A context error should always be associated with a certificate, so we
+ # expect this call to never return :class:`None`.
+ _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
+ _cert = _lib.X509_dup(_x509)
+ pycert = X509.__new__(X509)
+ pycert._x509 = _ffi.gc(_cert, _lib.X509_free)
+ return X509StoreContextError(errors, pycert)
+
+
+ def set_store(self, store):
+ """
+ Set the context's trust store.
+
+ :param X509Store store: The certificates which will be trusted for the
+ purposes of any *future* verifications.
+ """
+ self._store = store
+
+
+ def verify_certificate(self):
+ """
+ Verify a certificate in a context.
+
+ :param store_ctx: The :py:class:`X509StoreContext` to verify.
+ :raises: Error
+ """
+ # Always re-initialize the store context in case
+ # :py:meth:`verify_certificate` is called multiple times.
+ self._init()
+ ret = _lib.X509_verify_cert(self._store_ctx)
+ self._cleanup()
+ if ret <= 0:
+ raise self._exception_from_context()
+
+
def load_certificate(type, buffer):
"""
@@ -2311,7 +2433,6 @@ def verify(cert, signature, data, digest):
_raise_current_error()
-
def load_crl(type, buffer):
"""
Load a certificate revocation list from a buffer
diff --git a/OpenSSL/rand.py b/OpenSSL/rand.py
index e754378..3adf693 100644
--- a/OpenSSL/rand.py
+++ b/OpenSSL/rand.py
@@ -11,7 +11,8 @@ from six import integer_types as _integer_types
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
- exception_from_error_queue as _exception_from_error_queue)
+ exception_from_error_queue as _exception_from_error_queue,
+ path_string as _path_string)
class Error(Exception):
@@ -131,13 +132,13 @@ def load_file(filename, maxbytes=_unspecified):
"""
Seed the PRNG with data from a file
- :param filename: The file to read data from
- :param maxbytes: (optional) The number of bytes to read, default is
- to read the entire file
+ :param filename: The file to read data from (``bytes`` or ``unicode``).
+ :param maxbytes: (optional) The number of bytes to read, default is to read
+ the entire file
+
:return: The number of bytes read
"""
- if not isinstance(filename, _builtin_bytes):
- raise TypeError("filename must be a string")
+ filename = _path_string(filename)
if maxbytes is _unspecified:
maxbytes = -1
@@ -152,12 +153,11 @@ def write_file(filename):
"""
Save PRNG state to a file
- :param filename: The file to write data to
+ :param filename: The file to write data to (``bytes`` or ``unicode``).
+
:return: The number of bytes written
"""
- if not isinstance(filename, _builtin_bytes):
- raise TypeError("filename must be a string")
-
+ filename = _path_string(filename)
return _lib.RAND_write_file(filename)
diff --git a/OpenSSL/test/test_crypto.py b/OpenSSL/test/test_crypto.py
index 50da5fe..dea5858 100644
--- a/OpenSSL/test/test_crypto.py
+++ b/OpenSSL/test/test_crypto.py
@@ -19,7 +19,8 @@ from warnings import catch_warnings
from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey, PKeyType
from OpenSSL.crypto import X509, X509Type, X509Name, X509NameType
-from OpenSSL.crypto import X509Store, X509StoreType, X509Req, X509ReqType
+from OpenSSL.crypto import X509Store, X509StoreType, X509StoreContext, X509StoreContextError
+from OpenSSL.crypto import X509Req, X509ReqType
from OpenSSL.crypto import X509Extension, X509ExtensionType
from OpenSSL.crypto import load_certificate, load_privatekey
from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT
@@ -87,6 +88,40 @@ cbvAhow217X9V0dVerEOKxnNYspXRrh36h7k4mQA+sDq
-----END RSA PRIVATE KEY-----
""")
+intermediate_cert_pem = b("""-----BEGIN CERTIFICATE-----
+MIICVzCCAcCgAwIBAgIRAMPzhm6//0Y/g2pmnHR2C4cwDQYJKoZIhvcNAQENBQAw
+WDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAklMMRAwDgYDVQQHEwdDaGljYWdvMRAw
+DgYDVQQKEwdUZXN0aW5nMRgwFgYDVQQDEw9UZXN0aW5nIFJvb3QgQ0EwHhcNMTQw
+ODI4MDIwNDA4WhcNMjQwODI1MDIwNDA4WjBmMRUwEwYDVQQDEwxpbnRlcm1lZGlh
+dGUxDDAKBgNVBAoTA29yZzERMA8GA1UECxMIb3JnLXVuaXQxCzAJBgNVBAYTAlVT
+MQswCQYDVQQIEwJDQTESMBAGA1UEBxMJU2FuIERpZWdvMIGfMA0GCSqGSIb3DQEB
+AQUAA4GNADCBiQKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmK
+FGIbljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT
+21H2qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwID
+AQABoxMwETAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBDQUAA4GBAPIWSkLX
+QRMApOjjyC+tMxumT5e2pMqChHmxobQK4NMdrf2VCx+cRT6EmY8sK3/Xl/X8UBQ+
+9n5zXb1ZwhW/sTWgUvmOceJ4/XVs9FkdWOOn1J0XBch9ZIiFe/s5ASIgG7fUdcUF
+9mAWS6FK2ca3xIh5kIupCXOFa0dPvlw/YUFT
+-----END CERTIFICATE-----
+""")
+
+intermediate_key_pem = b("""-----BEGIN RSA PRIVATE KEY-----
+MIICWwIBAAKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmKFGIb
+ljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT21H2
+qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwIDAQAB
+AoGAfSZVV80pSeOKHTYfbGdNY/jHdU9eFUa/33YWriXU+77EhpIItJjkRRgivIfo
+rhFJpBSGmDLblaqepm8emsXMeH4+2QzOYIf0QGGP6E6scjTt1PLqdqKfVJ1a2REN
+147cujNcmFJb/5VQHHMpaPTgttEjlzuww4+BCDPsVRABWrkCQQD3loH36nLoQTtf
++kQq0T6Bs9/UWkTAGo0ND81ALj0F8Ie1oeZg6RNT96RxZ3aVuFTESTv6/TbjWywO
+wdzlmV1vAkEA38rTJ6PTwaJlw5OttdDzAXGPB9tDmzh9oSi7cHwQQXizYd8MBYx4
+sjHUKD3dCQnb1dxJFhd3BT5HsnkRMbVZXQJAbXduH17ZTzcIOXc9jHDXYiFVZV5D
+52vV0WCbLzVCZc3jMrtSUKa8lPN5EWrdU3UchWybyG0MR5mX8S5lrF4SoQJAIyUD
+DBKaSqpqONCUUx1BTFS9FYrFjzbL4+c1qHCTTPTblt8kUCrDOZjBrKAqeiTmNSum
+/qUot9YUBF8m6BuGsQJATHHmdFy/fG1VLkyBp49CAa8tN3Z5r/CgTznI4DfMTf4C
+NbRHn2UmYlwQBa+L5lg9phewNe8aEwpPyPLoV85U8Q==
+-----END RSA PRIVATE KEY-----
+""")
+
server_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICKDCCAZGgAwIBAgIJAJn/HpR21r/8MA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
@@ -120,6 +155,40 @@ r50+LF74iLXFwqysVCebPKMOpDWp/qQ1BbJQIPs7/A==
-----END RSA PRIVATE KEY-----
"""))
+intermediate_server_cert_pem = b("""-----BEGIN CERTIFICATE-----
+MIICWDCCAcGgAwIBAgIRAPQFY9jfskSihdiNSNdt6GswDQYJKoZIhvcNAQENBQAw
+ZjEVMBMGA1UEAxMMaW50ZXJtZWRpYXRlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT
+CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh
+biBEaWVnbzAeFw0xNDA4MjgwMjEwNDhaFw0yNDA4MjUwMjEwNDhaMG4xHTAbBgNV
+BAMTFGludGVybWVkaWF0ZS1zZXJ2aWNlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT
+CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh
+biBEaWVnbzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAqpJZygd+w1faLOr1
+iOAmbBhx5SZWcTCZ/ZjHQTJM7GuPT624QkqsixFghRKdDROwpwnAP7gMRukLqiy4
++kRuGT5OfyGggL95i2xqA+zehjj08lSTlvGHpePJgCyTavIy5+Ljsj4DKnKyuhxm
+biXTRrH83NDgixVkObTEmh/OVK0CAwEAATANBgkqhkiG9w0BAQ0FAAOBgQBa0Npw
+UkzjaYEo1OUE1sTI6Mm4riTIHMak4/nswKh9hYup//WVOlr/RBSBtZ7Q/BwbjobN
+3bfAtV7eSAqBsfxYXyof7G1ALANQERkq3+oyLP1iVt08W1WOUlIMPhdCF/QuCwy6
+x9MJLhUCGLJPM+O2rAPWVD9wCmvq10ALsiH3yA==
+-----END CERTIFICATE-----
+""")
+
+intermediate_server_key_pem = b("""-----BEGIN RSA PRIVATE KEY-----
+MIICXAIBAAKBgQCqklnKB37DV9os6vWI4CZsGHHlJlZxMJn9mMdBMkzsa49PrbhC
+SqyLEWCFEp0NE7CnCcA/uAxG6QuqLLj6RG4ZPk5/IaCAv3mLbGoD7N6GOPTyVJOW
+8Yel48mALJNq8jLn4uOyPgMqcrK6HGZuJdNGsfzc0OCLFWQ5tMSaH85UrQIDAQAB
+AoGAIQ594j5zna3/9WaPsTgnmhlesVctt4AAx/n827DA4ayyuHFlXUuVhtoWR5Pk
+5ezj9mtYW8DyeCegABnsu2vZni/CdvU6uiS1Hv6qM1GyYDm9KWgovIP9rQCDSGaz
+d57IWVGxx7ODFkm3gN5nxnSBOFVHytuW1J7FBRnEsehRroECQQDXHFOv82JuXDcz
+z3+4c74IEURdOHcbycxlppmK9kFqm5lsUdydnnGW+mvwDk0APOB7Wg7vyFyr393e
+dpmBDCzNAkEAyv6tVbTKUYhSjW+QhabJo896/EqQEYUmtMXxk4cQnKeR/Ao84Rkf
+EqD5IykMUfUI0jJU4DGX+gWZ10a7kNbHYQJAVFCuHNFxS4Cpwo0aqtnzKoZaHY/8
+X9ABZfafSHCtw3Op92M+7ikkrOELXdS9KdKyyqbKJAKNEHF3LbOfB44WIQJAA2N4
+9UNNVUsXRbElEnYUS529CdUczo4QdVgQjkvk5RiPAUwSdBd9Q0xYnFOlFwEmIowg
+ipWJWe0aAlP18ZcEQQJBAL+5lekZ/GUdQoZ4HAsN5a9syrzavJ9VvU1KOOPorPZK
+nMRZbbQgP+aSB7yl6K0gaLaZ8XaK0pjxNBh6ASqg9f4=
+-----END RSA PRIVATE KEY-----
+""")
+
client_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICJjCCAY+gAwIBAgIJAKxpFI5lODkjMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
@@ -3155,6 +3224,107 @@ class CRLTests(TestCase):
+class X509StoreContextTests(TestCase):
+ """
+ Tests for :py:obj:`OpenSSL.crypto.X509StoreContext`.
+ """
+ root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
+ intermediate_cert = load_certificate(FILETYPE_PEM, intermediate_cert_pem)
+ intermediate_server_cert = load_certificate(FILETYPE_PEM, intermediate_server_cert_pem)
+
+ def test_valid(self):
+ """
+ :py:obj:`verify_certificate` returns ``None`` when called with a certificate
+ and valid chain.
+ """
+ store = X509Store()
+ store.add_cert(self.root_cert)
+ store.add_cert(self.intermediate_cert)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ self.assertEqual(store_ctx.verify_certificate(), None)
+
+
+ def test_reuse(self):
+ """
+ :py:obj:`verify_certificate` can be called multiple times with the same
+ ``X509StoreContext`` instance to produce the same result.
+ """
+ store = X509Store()
+ store.add_cert(self.root_cert)
+ store.add_cert(self.intermediate_cert)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ self.assertEqual(store_ctx.verify_certificate(), None)
+ self.assertEqual(store_ctx.verify_certificate(), None)
+
+
+ def test_trusted_self_signed(self):
+ """
+ :py:obj:`verify_certificate` returns ``None`` when called with a self-signed
+ certificate and itself in the chain.
+ """
+ store = X509Store()
+ store.add_cert(self.root_cert)
+ store_ctx = X509StoreContext(store, self.root_cert)
+ self.assertEqual(store_ctx.verify_certificate(), None)
+
+
+ def test_untrusted_self_signed(self):
+ """
+ :py:obj:`verify_certificate` raises error when a self-signed certificate is
+ verified without itself in the chain.
+ """
+ store = X509Store()
+ store_ctx = X509StoreContext(store, self.root_cert)
+ e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
+ self.assertEqual(e.args[0][2], 'self signed certificate')
+ self.assertEqual(e.certificate.get_subject().CN, 'Testing Root CA')
+
+
+ def test_invalid_chain_no_root(self):
+ """
+ :py:obj:`verify_certificate` raises error when a root certificate is missing
+ from the chain.
+ """
+ store = X509Store()
+ store.add_cert(self.intermediate_cert)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
+ self.assertEqual(e.args[0][2], 'unable to get issuer certificate')
+ self.assertEqual(e.certificate.get_subject().CN, 'intermediate')
+
+
+ def test_invalid_chain_no_intermediate(self):
+ """
+ :py:obj:`verify_certificate` raises error when an intermediate certificate is
+ missing from the chain.
+ """
+ store = X509Store()
+ store.add_cert(self.root_cert)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
+ self.assertEqual(e.args[0][2], 'unable to get local issuer certificate')
+ self.assertEqual(e.certificate.get_subject().CN, 'intermediate-service')
+
+
+ def test_modification_pre_verify(self):
+ """
+ :py:obj:`verify_certificate` can use a store context modified after
+ instantiation.
+ """
+ store_bad = X509Store()
+ store_bad.add_cert(self.intermediate_cert)
+ store_good = X509Store()
+ store_good.add_cert(self.root_cert)
+ store_good.add_cert(self.intermediate_cert)
+ store_ctx = X509StoreContext(store_bad, self.intermediate_server_cert)
+ e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
+ self.assertEqual(e.args[0][2], 'unable to get issuer certificate')
+ self.assertEqual(e.certificate.get_subject().CN, 'intermediate')
+ store_ctx.set_store(store_good)
+ self.assertEqual(store_ctx.verify_certificate(), None)
+
+
+
class SignVerifyTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.sign` and :py:obj:`OpenSSL.crypto.verify`.
diff --git a/OpenSSL/test/test_rand.py b/OpenSSL/test/test_rand.py
index c52cb6b..3d5c290 100644
--- a/OpenSSL/test/test_rand.py
+++ b/OpenSSL/test/test_rand.py
@@ -10,7 +10,7 @@ import os
import stat
import sys
-from OpenSSL.test.util import TestCase, b
+from OpenSSL.test.util import NON_ASCII, TestCase, b
from OpenSSL import rand
@@ -176,27 +176,47 @@ class RandTests(TestCase):
self.assertRaises(TypeError, rand.write_file, None)
self.assertRaises(TypeError, rand.write_file, "foo", None)
-
- def test_files(self):
+ def _read_write_test(self, path):
"""
- Test reading and writing of files via rand functions.
+ Verify that ``rand.write_file`` and ``rand.load_file`` can be used.
"""
- # Write random bytes to a file
- tmpfile = self.mktemp()
- # Make sure it exists (so cleanup definitely succeeds)
- fObj = open(tmpfile, 'w')
- fObj.close()
+ # Create the file so cleanup is more straightforward
+ with open(path, "w"):
+ pass
+
try:
- rand.write_file(tmpfile)
+ # Write random bytes to a file
+ rand.write_file(path)
+
# Verify length of written file
- size = os.stat(tmpfile)[stat.ST_SIZE]
+ size = os.stat(path)[stat.ST_SIZE]
self.assertEqual(1024, size)
+
# Read random bytes from file
- rand.load_file(tmpfile)
- rand.load_file(tmpfile, 4) # specify a length
+ rand.load_file(path)
+ rand.load_file(path, 4) # specify a length
finally:
# Cleanup
- os.unlink(tmpfile)
+ os.unlink(path)
+
+
+ def test_bytes_paths(self):
+ """
+ Random data can be saved and loaded to files with paths specified as
+ bytes.
+ """
+ path = self.mktemp()
+ path += NON_ASCII.encode(sys.getfilesystemencoding())
+ self._read_write_test(path)
+
+
+ def test_unicode_paths(self):
+ """
+ Random data can be saved and loaded to files with paths specified as
+ unicode.
+ """
+ path = self.mktemp().decode('utf-8') + NON_ASCII
+ self._read_write_test(path)
if __name__ == '__main__':
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index 0ea52a4..eae895e 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -9,7 +9,7 @@ Unit tests for :py:obj:`OpenSSL.SSL`.
from gc import collect, get_referrers
from errno import ECONNREFUSED, EINPROGRESS, EWOULDBLOCK, EPIPE, ESHUTDOWN
-from sys import platform
+from sys import platform, version_info, getfilesystemencoding
from socket import SHUT_RDWR, error, socket
from os import makedirs
from os.path import join
@@ -25,7 +25,6 @@ from OpenSSL.crypto import dump_privatekey, load_privatekey
from OpenSSL.crypto import dump_certificate, load_certificate
from OpenSSL.crypto import get_elliptic_curves
-from OpenSSL.SSL import _lib
from OpenSSL.SSL import OPENSSL_VERSION_NUMBER, SSLEAY_VERSION, SSLEAY_CFLAGS
from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
@@ -46,7 +45,7 @@ from OpenSSL.SSL import (
from OpenSSL.SSL import (
Context, ContextType, Session, Connection, ConnectionType, SSLeay_version)
-from OpenSSL.test.util import WARNING_TYPE_EXPECTED, TestCase, b
+from OpenSSL.test.util import WARNING_TYPE_EXPECTED, NON_ASCII, TestCase, b
from OpenSSL.test.test_crypto import (
cleartextCertificatePEM, cleartextPrivateKeyPEM)
from OpenSSL.test.test_crypto import (
@@ -98,6 +97,23 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
"""
+def join_bytes_or_unicode(prefix, suffix):
+ """
+ Join two path components of either ``bytes`` or ``unicode``.
+
+ The return type is the same as the type of ``prefix``.
+ """
+ # If the types are the same, nothing special is necessary.
+ if type(prefix) == type(suffix):
+ return join(prefix, suffix)
+
+ # Otherwise, coerce suffix to the type of prefix.
+ if isinstance(prefix, text_type):
+ return join(prefix, suffix.decode(getfilesystemencoding()))
+ else:
+ return join(prefix, suffix.encode(getfilesystemencoding()))
+
+
def verify_cb(conn, cert, errnum, depth, ok):
return ok
@@ -398,23 +414,52 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(Error, ctx.use_privatekey_file, self.mktemp())
+ def _use_privatekey_file_test(self, pemfile, filetype):
+ """
+ Verify that calling ``Context.use_privatekey_file`` with the given
+ arguments does not raise an exception.
+ """
+ key = PKey()
+ key.generate_key(TYPE_RSA, 128)
+
+ with open(pemfile, "wt") as pem:
+ pem.write(
+ dump_privatekey(FILETYPE_PEM, key).decode("ascii")
+ )
+
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_privatekey_file(pemfile, filetype)
+
+
+ def test_use_privatekey_file_bytes(self):
+ """
+ A private key can be specified from a file by passing a ``bytes``
+ instance giving the file name to ``Context.use_privatekey_file``.
+ """
+ self._use_privatekey_file_test(
+ self.mktemp() + NON_ASCII.encode(getfilesystemencoding()),
+ FILETYPE_PEM,
+ )
+
+
+ def test_use_privatekey_file_unicode(self):
+ """
+ A private key can be specified from a file by passing a ``unicode``
+ instance giving the file name to ``Context.use_privatekey_file``.
+ """
+ self._use_privatekey_file_test(
+ self.mktemp().decode(getfilesystemencoding()) + NON_ASCII,
+ FILETYPE_PEM,
+ )
+
+
if not PY3:
def test_use_privatekey_file_long(self):
"""
On Python 2 :py:obj:`Context.use_privatekey_file` accepts a
filetype of type :py:obj:`long` as well as :py:obj:`int`.
"""
- pemfile = self.mktemp()
-
- key = PKey()
- key.generate_key(TYPE_RSA, 128)
-
- with open(pemfile, "wt") as pem:
- pem.write(
- dump_privatekey(FILETYPE_PEM, key).decode("ascii"))
-
- ctx = Context(TLSv1_METHOD)
- ctx.use_privatekey_file(pemfile, long(FILETYPE_PEM))
+ self._use_privatekey_file_test(self.mktemp(), long(FILETYPE_PEM))
def test_use_certificate_wrong_args(self):
@@ -479,21 +524,40 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(Error, ctx.use_certificate_file, self.mktemp())
- def test_use_certificate_file(self):
+ def _use_certificate_file_test(self, certificate_file):
"""
- :py:obj:`Context.use_certificate` sets the certificate which will be
- used to identify connections created using the context.
+ Verify that calling ``Context.use_certificate_file`` with the given
+ filename doesn't raise an exception.
"""
# TODO
# Hard to assert anything. But we could set a privatekey then ask
# OpenSSL if the cert and key agree using check_privatekey. Then as
# long as check_privatekey works right we're good...
- pem_filename = self.mktemp()
- with open(pem_filename, "wb") as pem_file:
+ with open(certificate_file, "wb") as pem_file:
pem_file.write(cleartextCertificatePEM)
ctx = Context(TLSv1_METHOD)
- ctx.use_certificate_file(pem_filename)
+ ctx.use_certificate_file(certificate_file)
+
+
+ def test_use_certificate_file_bytes(self):
+ """
+ :py:obj:`Context.use_certificate_file` sets the certificate (given as a
+ ``bytes`` filename) which will be used to identify connections created
+ using the context.
+ """
+ filename = self.mktemp() + NON_ASCII.encode(getfilesystemencoding())
+ self._use_certificate_file_test(filename)
+
+
+ def test_use_certificate_file_unicode(self):
+ """
+ :py:obj:`Context.use_certificate_file` sets the certificate (given as a
+ ``bytes`` filename) which will be used to identify connections created
+ using the context.
+ """
+ filename = self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
+ self._use_certificate_file_test(filename)
if not PY3:
@@ -907,12 +971,13 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertEqual(cert.get_subject().CN, 'Testing Root CA')
- def test_load_verify_file(self):
+ def _load_verify_cafile(self, cafile):
"""
- :py:obj:`Context.load_verify_locations` accepts a file name and uses the
- certificates within for verification purposes.
+ Verify that if path to a file containing a certificate is passed to
+ ``Context.load_verify_locations`` for the ``cafile`` parameter, that
+ certificate is used as a trust root for the purposes of verifying
+ connections created using that ``Context``.
"""
- cafile = self.mktemp()
fObj = open(cafile, 'w')
fObj.write(cleartextCertificatePEM.decode('ascii'))
fObj.close()
@@ -920,6 +985,27 @@ class ContextTests(TestCase, _LoopbackMixin):
self._load_verify_locations_test(cafile)
+ def test_load_verify_bytes_cafile(self):
+ """
+ :py:obj:`Context.load_verify_locations` accepts a file name as a
+ ``bytes`` instance and uses the certificates within for verification
+ purposes.
+ """
+ cafile = self.mktemp() + NON_ASCII.encode(getfilesystemencoding())
+ self._load_verify_cafile(cafile)
+
+
+ def test_load_verify_unicode_cafile(self):
+ """
+ :py:obj:`Context.load_verify_locations` accepts a file name as a
+ ``unicode`` instance and uses the certificates within for verification
+ purposes.
+ """
+ self._load_verify_cafile(
+ self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
+ )
+
+
def test_load_verify_invalid_file(self):
"""
:py:obj:`Context.load_verify_locations` raises :py:obj:`Error` when passed a
@@ -930,25 +1016,47 @@ class ContextTests(TestCase, _LoopbackMixin):
Error, clientContext.load_verify_locations, self.mktemp())
- def test_load_verify_directory(self):
+ def _load_verify_directory_locations_capath(self, capath):
"""
- :py:obj:`Context.load_verify_locations` accepts a directory name and uses
- the certificates within for verification purposes.
+ Verify that if path to a directory containing certificate files is
+ passed to ``Context.load_verify_locations`` for the ``capath``
+ parameter, those certificates are used as trust roots for the purposes
+ of verifying connections created using that ``Context``.
"""
- capath = self.mktemp()
makedirs(capath)
# Hash values computed manually with c_rehash to avoid depending on
# c_rehash in the test suite. One is from OpenSSL 0.9.8, the other
# from OpenSSL 1.0.0.
for name in [b'c7adac82.0', b'c3705638.0']:
- cafile = join(capath, name)
- fObj = open(cafile, 'w')
- fObj.write(cleartextCertificatePEM.decode('ascii'))
- fObj.close()
+ cafile = join_bytes_or_unicode(capath, name)
+ with open(cafile, 'w') as fObj:
+ fObj.write(cleartextCertificatePEM.decode('ascii'))
self._load_verify_locations_test(None, capath)
+ def test_load_verify_directory_bytes_capath(self):
+ """
+ :py:obj:`Context.load_verify_locations` accepts a directory name as a
+ ``bytes`` instance and uses the certificates within for verification
+ purposes.
+ """
+ self._load_verify_directory_locations_capath(
+ self.mktemp() + NON_ASCII.encode(getfilesystemencoding())
+ )
+
+
+ def test_load_verify_directory_unicode_capath(self):
+ """
+ :py:obj:`Context.load_verify_locations` accepts a directory name as a
+ ``unicode`` instance and uses the certificates within for verification
+ purposes.
+ """
+ self._load_verify_directory_locations_capath(
+ self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
+ )
+
+
def test_load_verify_locations_wrong_args(self):
"""
:py:obj:`Context.load_verify_locations` raises :py:obj:`TypeError` if called with
@@ -1134,43 +1242,67 @@ class ContextTests(TestCase, _LoopbackMixin):
self._handshake_test(serverContext, clientContext)
- def test_use_certificate_chain_file(self):
+ def _use_certificate_chain_file_test(self, certdir):
"""
- :py:obj:`Context.use_certificate_chain_file` reads a certificate chain from
- the specified file.
+ Verify that :py:obj:`Context.use_certificate_chain_file` reads a
+ certificate chain from a specified file.
- The chain is tested by starting a server with scert and connecting
- to it with a client which trusts cacert and requires verification to
+ The chain is tested by starting a server with scert and connecting to
+ it with a client which trusts cacert and requires verification to
succeed.
"""
chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
+ makedirs(certdir)
+
+ chainFile = join_bytes_or_unicode(certdir, "chain.pem")
+ caFile = join_bytes_or_unicode(certdir, "ca.pem")
+
# Write out the chain file.
- chainFile = self.mktemp()
- fObj = open(chainFile, 'wb')
- # Most specific to least general.
- fObj.write(dump_certificate(FILETYPE_PEM, scert))
- fObj.write(dump_certificate(FILETYPE_PEM, icert))
- fObj.write(dump_certificate(FILETYPE_PEM, cacert))
- fObj.close()
+ with open(chainFile, 'wb') as fObj:
+ # Most specific to least general.
+ fObj.write(dump_certificate(FILETYPE_PEM, scert))
+ fObj.write(dump_certificate(FILETYPE_PEM, icert))
+ fObj.write(dump_certificate(FILETYPE_PEM, cacert))
+
+ with open(caFile, 'w') as fObj:
+ fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii'))
serverContext = Context(TLSv1_METHOD)
serverContext.use_certificate_chain_file(chainFile)
serverContext.use_privatekey(skey)
- fObj = open('ca.pem', 'w')
- fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii'))
- fObj.close()
-
clientContext = Context(TLSv1_METHOD)
clientContext.set_verify(
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
- clientContext.load_verify_locations(b"ca.pem")
+ clientContext.load_verify_locations(caFile)
self._handshake_test(serverContext, clientContext)
+ def test_use_certificate_chain_file_bytes(self):
+ """
+ ``Context.use_certificate_chain_file`` accepts the name of a file (as
+ an instance of ``bytes``) to specify additional certificates to use to
+ construct and verify a trust chain.
+ """
+ self._use_certificate_chain_file_test(
+ self.mktemp() + NON_ASCII.encode(getfilesystemencoding())
+ )
+
+
+ def test_use_certificate_chain_file_unicode(self):
+ """
+ ``Context.use_certificate_chain_file`` accepts the name of a file (as
+ an instance of ``unicode``) to specify additional certificates to use
+ to construct and verify a trust chain.
+ """
+ self._use_certificate_chain_file_test(
+ self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
+ )
+
+
def test_use_certificate_chain_file_wrong_args(self):
"""
:py:obj:`Context.use_certificate_chain_file` raises :py:obj:`TypeError`
@@ -1245,20 +1377,39 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(Error, context.load_tmp_dh, b"hello")
- def test_load_tmp_dh(self):
+ def _load_tmp_dh_test(self, dhfilename):
"""
- :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the
- specified file.
+ Verify that calling ``Context.load_tmp_dh`` with the given filename
+ does not raise an exception.
"""
context = Context(TLSv1_METHOD)
- dhfilename = self.mktemp()
- dhfile = open(dhfilename, "w")
- dhfile.write(dhparam)
- dhfile.close()
+ with open(dhfilename, "w") as dhfile:
+ dhfile.write(dhparam)
+
context.load_tmp_dh(dhfilename)
# XXX What should I assert here? -exarkun
+ def test_load_tmp_dh_bytes(self):
+ """
+ :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the
+ specified file (given as ``bytes``).
+ """
+ self._load_tmp_dh_test(
+ self.mktemp() + NON_ASCII.encode(getfilesystemencoding()),
+ )
+
+
+ def test_load_tmp_dh_unicode(self):
+ """
+ :py:obj:`Context.load_tmp_dh` loads Diffie-Hellman parameters from the
+ specified file (given as ``unicode``).
+ """
+ self._load_tmp_dh_test(
+ self.mktemp().decode(getfilesystemencoding()) + NON_ASCII,
+ )
+
+
def test_set_tmp_ecdh(self):
"""
:py:obj:`Context.set_tmp_ecdh` sets the elliptic curve for
@@ -1474,6 +1625,165 @@ class ServerNameCallbackTests(TestCase, _LoopbackMixin):
self.assertEqual([(server, b("foo1.example.com"))], args)
+class NextProtoNegotiationTests(TestCase, _LoopbackMixin):
+ """
+ Test for Next Protocol Negotiation in PyOpenSSL.
+ """
+ def test_npn_success(self):
+ """
+ Tests that clients and servers that agree on the negotiated next
+ protocol can correct establish a connection, and that the agreed
+ protocol is reported by the connections.
+ """
+ advertise_args = []
+ select_args = []
+ def advertise(conn):
+ advertise_args.append((conn,))
+ return [b'http/1.1', b'spdy/2']
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b'spdy/2'
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ self._interactInMemory(server, client)
+
+ self.assertEqual([(server,)], advertise_args)
+ self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args)
+
+ self.assertEqual(server.get_next_proto_negotiated(), b'spdy/2')
+ self.assertEqual(client.get_next_proto_negotiated(), b'spdy/2')
+
+
+ def test_npn_client_fail(self):
+ """
+ Tests that when clients and servers cannot agree on what protocol to
+ use next that the TLS connection does not get established.
+ """
+ advertise_args = []
+ select_args = []
+ def advertise(conn):
+ advertise_args.append((conn,))
+ return [b'http/1.1', b'spdy/2']
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b''
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ # If the client doesn't return anything, the connection will fail.
+ self.assertRaises(Error, self._interactInMemory, server, client)
+
+ self.assertEqual([(server,)], advertise_args)
+ self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args)
+
+
+ def test_npn_select_error(self):
+ """
+ Test that we can handle exceptions in the select callback. If select
+ fails it should be fatal to the connection.
+ """
+ advertise_args = []
+ def advertise(conn):
+ advertise_args.append((conn,))
+ return [b'http/1.1', b'spdy/2']
+ def select(conn, options):
+ raise TypeError
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ # If the callback throws an exception it should be raised here.
+ self.assertRaises(TypeError, self._interactInMemory, server, client)
+ self.assertEqual([(server,)], advertise_args)
+
+
+ def test_npn_advertise_error(self):
+ """
+ Test that we can handle exceptions in the advertise callback. If
+ advertise fails no NPN is advertised to the client.
+ """
+ select_args = []
+ def advertise(conn):
+ raise TypeError
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b''
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ # If the client doesn't return anything, the connection will fail.
+ self.assertRaises(TypeError, self._interactInMemory, server, client)
+ self.assertEqual([], select_args)
+
+
class SessionTests(TestCase):
"""
@@ -2291,6 +2601,164 @@ class ConnectionSendTests(TestCase, _LoopbackMixin):
+def _make_memoryview(size):
+ """
+ Create a new ``memoryview`` wrapped around a ``bytearray`` of the given
+ size.
+ """
+ return memoryview(bytearray(size))
+
+
+
+class ConnectionRecvIntoTests(TestCase, _LoopbackMixin):
+ """
+ Tests for :py:obj:`Connection.recv_into`
+ """
+ def _no_length_test(self, factory):
+ """
+ Assert that when the given buffer is passed to
+ ``Connection.recv_into``, whatever bytes are available to be received
+ that fit into that buffer are written into that buffer.
+ """
+ output_buffer = factory(5)
+
+ server, client = self._loopback()
+ server.send(b('xy'))
+
+ self.assertEqual(client.recv_into(output_buffer), 2)
+ self.assertEqual(output_buffer, bytearray(b('xy\x00\x00\x00')))
+
+
+ def test_bytearray_no_length(self):
+ """
+ :py:obj:`Connection.recv_into` can be passed a ``bytearray`` instance
+ and data in the receive buffer is written to it.
+ """
+ self._no_length_test(bytearray)
+
+
+ def _respects_length_test(self, factory):
+ """
+ Assert that when the given buffer is passed to ``Connection.recv_into``
+ along with a value for ``nbytes`` that is less than the size of that
+ buffer, only ``nbytes`` bytes are written into the buffer.
+ """
+ output_buffer = factory(10)
+
+ server, client = self._loopback()
+ server.send(b('abcdefghij'))
+
+ self.assertEqual(client.recv_into(output_buffer, 5), 5)
+ self.assertEqual(
+ output_buffer, bytearray(b('abcde\x00\x00\x00\x00\x00'))
+ )
+
+
+ def test_bytearray_respects_length(self):
+ """
+ When called with a ``bytearray`` instance,
+ :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter and
+ doesn't copy in more than that number of bytes.
+ """
+ self._respects_length_test(bytearray)
+
+
+ def _doesnt_overfill_test(self, factory):
+ """
+ Assert that if there are more bytes available to be read from the
+ receive buffer than would fit into the buffer passed to
+ :py:obj:`Connection.recv_into`, only as many as fit are written into
+ it.
+ """
+ output_buffer = factory(5)
+
+ server, client = self._loopback()
+ server.send(b('abcdefghij'))
+
+ self.assertEqual(client.recv_into(output_buffer), 5)
+ self.assertEqual(output_buffer, bytearray(b('abcde')))
+ rest = client.recv(5)
+ self.assertEqual(b('fghij'), rest)
+
+
+ def test_bytearray_doesnt_overfill(self):
+ """
+ When called with a ``bytearray`` instance,
+ :py:obj:`Connection.recv_into` respects the size of the array and
+ doesn't write more bytes into it than will fit.
+ """
+ self._doesnt_overfill_test(bytearray)
+
+
+ def _really_doesnt_overfill_test(self, factory):
+ """
+ Assert that if the value given by ``nbytes`` is greater than the actual
+ size of the output buffer passed to :py:obj:`Connection.recv_into`, the
+ behavior is as if no value was given for ``nbytes`` at all.
+ """
+ output_buffer = factory(5)
+
+ server, client = self._loopback()
+ server.send(b('abcdefghij'))
+
+ self.assertEqual(client.recv_into(output_buffer, 50), 5)
+ self.assertEqual(output_buffer, bytearray(b('abcde')))
+ rest = client.recv(5)
+ self.assertEqual(b('fghij'), rest)
+
+
+ def test_bytearray_really_doesnt_overfill(self):
+ """
+ When called with a ``bytearray`` instance and an ``nbytes`` value that
+ is too large, :py:obj:`Connection.recv_into` respects the size of the
+ array and not the ``nbytes`` value and doesn't write more bytes into
+ the buffer than will fit.
+ """
+ self._doesnt_overfill_test(bytearray)
+
+
+ try:
+ memoryview
+ except NameError:
+ "cannot test recv_into memoryview without memoryview"
+ else:
+ def test_memoryview_no_length(self):
+ """
+ :py:obj:`Connection.recv_into` can be passed a ``memoryview``
+ instance and data in the receive buffer is written to it.
+ """
+ self._no_length_test(_make_memoryview)
+
+
+ def test_memoryview_respects_length(self):
+ """
+ When called with a ``memoryview`` instance,
+ :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter
+ and doesn't copy more than that number of bytes in.
+ """
+ self._respects_length_test(_make_memoryview)
+
+
+ def test_memoryview_doesnt_overfill(self):
+ """
+ When called with a ``memoryview`` instance,
+ :py:obj:`Connection.recv_into` respects the size of the array and
+ doesn't write more bytes into it than will fit.
+ """
+ self._doesnt_overfill_test(_make_memoryview)
+
+
+ def test_memoryview_really_doesnt_overfill(self):
+ """
+ When called with a ``memoryview`` instance and an ``nbytes`` value
+ that is too large, :py:obj:`Connection.recv_into` respects the size
+ of the array and not the ``nbytes`` value and doesn't write more
+ bytes into the buffer than will fit.
+ """
+ self._doesnt_overfill_test(_make_memoryview)
+
+
+
class ConnectionSendallTests(TestCase, _LoopbackMixin):
"""
Tests for :py:obj:`Connection.sendall`.
diff --git a/OpenSSL/test/test_util.py b/OpenSSL/test/test_util.py
new file mode 100644
index 0000000..8d92a3c
--- /dev/null
+++ b/OpenSSL/test/test_util.py
@@ -0,0 +1,17 @@
+from OpenSSL._util import exception_from_error_queue, lib
+from OpenSSL.test.util import TestCase
+
+
+
+class ErrorTests(TestCase):
+ """
+ Tests for handling of certain OpenSSL error cases.
+ """
+ def test_exception_from_error_queue_nonexistent_reason(self):
+ """
+ :py:func:`exception_from_error_queue` raises ``ValueError`` when it
+ encounters an OpenSSL error code which does not have a reason string.
+ """
+ lib.ERR_put_error(lib.ERR_LIB_EVP, 0, 1112, b"", 10)
+ exc = self.assertRaises(ValueError, exception_from_error_queue, ValueError)
+ self.assertEqual(exc.args[0][0][2], "")
diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py
index b69e538..b8be91d 100644
--- a/OpenSSL/test/util.py
+++ b/OpenSSL/test/util.py
@@ -27,6 +27,11 @@ except Exception:
from OpenSSL._util import ffi, lib, byte_string as b
+
+# This is the UTF-8 encoding of the SNOWMAN unicode code point.
+NON_ASCII = b("\xe2\x98\x83").decode("utf-8")
+
+
class TestCase(TestCase):
"""
:py:class:`TestCase` adds useful testing functionality beyond what is available
diff --git a/doc/api/crypto.rst b/doc/api/crypto.rst
index b360e89..ee261c5 100644
--- a/doc/api/crypto.rst
+++ b/doc/api/crypto.rst
@@ -42,7 +42,17 @@
.. py:data:: X509StoreType
- A Python type object representing the X509Store object type.
+ See :py:class:`X509Store`
+
+
+.. py:data X509Store
+
+ A class representing the X.509 store.
+
+
+.. py:data:: X509StoreContext
+
+ A class representing the X.509 store context.
.. py:data:: PKeyType
@@ -257,7 +267,7 @@ X509 objects have the following methods:
Return the signature algorithm used in the certificate. If the algorithm is
undefined, raise :py:data:`ValueError`.
- ..versionadded:: 0.13
+ .. versionadded:: 0.13
.. py:method:: X509.get_subject()
@@ -526,6 +536,36 @@ The X509Store object has currently just one method:
Add the certificate *cert* to the certificate store.
+X509StoreContextError objects
+-----------------------------
+
+The X509StoreContextError is an exception raised from
+`X509StoreContext.verify_certificate` in circumstances where a certificate
+cannot be verified in a provided context.
+
+The certificate for which the verification error was detected is given by the
+``certificate`` attribute of the exception instance as a :class:`X509`
+instance.
+
+Details about the verification error are given in the exception's ``args`` attribute.
+
+
+X509StoreContext objects
+------------------------
+
+The X509StoreContext object is used for verifying a certificate against a set
+of trusted certificates.
+
+
+.. py:method:: X509StoreContext.verify_certificate()
+
+ Verify a certificate in the context of this initialized `X509StoreContext`.
+ On error, raises `X509StoreContextError`, otherwise does nothing.
+
+ .. versionadded:: 0.15
+
+
+
.. _openssl-pkey:
PKey objects
diff --git a/doc/api/ssl.rst b/doc/api/ssl.rst
index a75af1f..e6a0775 100644
--- a/doc/api/ssl.rst
+++ b/doc/api/ssl.rst
@@ -472,6 +472,33 @@ Context objects have the following methods:
.. versionadded:: 0.13
+.. py:method:: Context.set_npn_advertise_callback(callback)
+
+ Specify a callback function that will be called when offering `Next
+ Protocol Negotiation
+ <https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server.
+
+ *callback* should be the callback function. It will be invoked with one
+ argument, the :py:class:`Connection` instance. It should return a list of
+ bytestrings representing the advertised protocols, like
+ ``[b'http/1.1', b'spdy/2']``.
+
+ .. versionadded:: 0.15
+
+
+.. py:method:: Context.set_npn_select_callback(callback):
+
+ Specify a callback function that will be called when a server offers Next
+ Protocol Negotiation options.
+
+ *callback* should be the callback function. It will be invoked with two
+ arguments: the :py:class:`Connection`, and a list of offered protocols as
+ bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return one of
+ those bytestrings, the chosen protocol.
+
+ .. versionadded:: 0.15
+
+
.. _openssl-session:
Session objects
@@ -614,6 +641,14 @@ Connection objects have the following methods:
by *bufsize*.
+.. py:method:: Connection.recv_into(buffer[, nbytes[, flags]])
+
+ Receive data from the Connection and copy it directly into the provided
+ buffer. The return value is the number of bytes read from the connection.
+ The maximum amount of data to be received at once is specified by *nbytes*.
+ *flags* is accepted for compatibility with ``socket.recv_into`` but its
+ value is ignored.
+
.. py:method:: Connection.bio_write(bytes)
If the Connection was created with a memory BIO, this method can be used to add
@@ -806,6 +841,15 @@ Connection objects have the following methods:
.. versionadded:: 0.15
+.. py:method:: Connection.get_next_proto_negotiated():
+
+ Get the protocol that was negotiated by Next Protocol Negotiation. Returns
+ a bytestring of the protocol name. If no protocol has been negotiated yet,
+ returns an empty string.
+
+ .. versionadded:: 0.15
+
+
.. Rubric:: Footnotes
.. [#connection-context-socket] Actually, all that is required is an object that