summaryrefslogtreecommitdiff
path: root/src/M2Crypto/BIO.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/M2Crypto/BIO.py')
-rw-r--r--src/M2Crypto/BIO.py390
1 files changed, 390 insertions, 0 deletions
diff --git a/src/M2Crypto/BIO.py b/src/M2Crypto/BIO.py
new file mode 100644
index 0000000..090064b
--- /dev/null
+++ b/src/M2Crypto/BIO.py
@@ -0,0 +1,390 @@
+from __future__ import absolute_import
+
+"""M2Crypto wrapper for OpenSSL BIO API.
+
+Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved."""
+
+import io
+import logging
+from typing import Any, AnyStr, Callable, Iterable, Optional, Union # noqa
+
+from M2Crypto import m2, six
+
+log = logging.getLogger('BIO')
+
+
+class BIOError(ValueError):
+ pass
+
+
+m2.bio_init(BIOError)
+
+
+class BIO(object):
+ """Abstract object interface to the BIO API."""
+
+ m2_bio_free = m2.bio_free
+
+ def __init__(self, bio=None, _pyfree=0, _close_cb=None):
+ # type: (Optional[BIO], int, Optional[Callable]) -> None
+ self.bio = bio
+ self._pyfree = _pyfree
+ self._close_cb = _close_cb
+ self.closed = 0
+ self.write_closed = 0
+
+ def __del__(self):
+ if self._pyfree:
+ self.m2_bio_free(self.bio)
+
+ def _ptr(self):
+ return self.bio
+
+ # Deprecated.
+ bio_ptr = _ptr
+
+ def fileno(self):
+ # type: () -> int
+ return m2.bio_get_fd(self.bio)
+
+ def readable(self):
+ # type: () -> bool
+ return not self.closed
+
+ def read(self, size=None):
+ # type: (int) -> Union[bytes, bytearray]
+ if not self.readable():
+ raise IOError('cannot read')
+ if size is None:
+ buf = bytearray()
+ while 1:
+ data = m2.bio_read(self.bio, 4096)
+ if not data:
+ break
+ buf += data
+ return buf
+ elif size == 0:
+ return b''
+ elif size < 0:
+ raise ValueError('read count is negative')
+ else:
+ return bytes(m2.bio_read(self.bio, size))
+
+ def readline(self, size=4096):
+ # type: (int) -> bytes
+ if not self.readable():
+ raise IOError('cannot read')
+ buf = m2.bio_gets(self.bio, size)
+ buf = '' if buf is None else buf
+ return six.ensure_binary(buf)
+
+ def readlines(self, sizehint='ignored'):
+ # type: (Union[AnyStr, int]) -> Iterable[bytes]
+ if not self.readable():
+ raise IOError('cannot read')
+ lines = []
+ while 1:
+ buf = m2.bio_gets(self.bio, 4096)
+ if buf is None:
+ break
+ lines.append(six.ensure_binary(buf))
+ return lines
+
+ def writeable(self):
+ # type: () -> bool
+ return (not self.closed) and (not self.write_closed)
+
+ def write(self, data):
+ # type: (AnyStr) -> int
+ """Write data to BIO.
+
+ :return: either data written, or [0, -1] for nothing written,
+ -2 not implemented
+ """
+ if not self.writeable():
+ raise IOError('cannot write')
+ if isinstance(data, six.text_type):
+ data = data.encode('utf8')
+ return m2.bio_write(self.bio, data)
+
+ def write_close(self):
+ # type: () -> None
+ self.write_closed = 1
+
+ def flush(self):
+ # type: () -> None
+ """Flush the buffers.
+
+ :return: 1 for success, and 0 or -1 for failure
+ """
+ m2.bio_flush(self.bio)
+
+ def reset(self):
+ # type: () -> int
+ """Set the bio to its initial state.
+
+ :return: 1 for success, and 0 or -1 for failure
+ """
+ return m2.bio_reset(self.bio)
+
+ def close(self):
+ # type: () -> None
+ self.closed = 1
+ if self._close_cb:
+ self._close_cb()
+
+ def should_retry(self):
+ # type: () -> int
+ """
+ Can the call be attempted again, or was there an error
+ ie do_handshake
+
+ """
+ return m2.bio_should_retry(self.bio)
+
+ def should_read(self):
+ # type: () -> int
+ """Should we read more data?"""
+
+ return m2.bio_should_read(self.bio)
+
+ def should_write(self):
+ # type: () -> int
+ """Should we write more data?"""
+ return m2.bio_should_write(self.bio)
+
+ def tell(self):
+ """Return the current offset."""
+ return m2.bio_tell(self.bio)
+
+ def seek(self, off):
+ """Seek to the specified absolute offset."""
+ return m2.bio_seek(self.bio, off)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ # type: (*Any) -> int
+ self.close()
+
+
+class MemoryBuffer(BIO):
+ """Object interface to BIO_s_mem.
+
+ Empirical testing suggests that this class performs less well than
+ cStringIO, because cStringIO is implemented in C, whereas this class
+ is implemented in Python. Thus, the recommended practice is to use
+ cStringIO for regular work and convert said cStringIO object to
+ a MemoryBuffer object only when necessary.
+ """
+
+ def __init__(self, data=None):
+ # type: (Optional[bytes]) -> None
+ super(MemoryBuffer, self).__init__(self)
+ if data is not None and not isinstance(data, bytes):
+ raise TypeError(
+ "data must be bytes or None, not %s" % (type(data).__name__, ))
+ self.bio = m2.bio_new(m2.bio_s_mem())
+ self._pyfree = 1
+ if data is not None:
+ m2.bio_write(self.bio, data)
+
+ def __len__(self):
+ # type: () -> int
+ return m2.bio_ctrl_pending(self.bio)
+
+ def read(self, size=0):
+ # type: (int) -> bytes
+ if not self.readable():
+ raise IOError('cannot read')
+ if size:
+ return m2.bio_read(self.bio, size)
+ else:
+ return m2.bio_read(self.bio, m2.bio_ctrl_pending(self.bio))
+
+ # Backwards-compatibility.
+ getvalue = read_all = read
+
+ def write_close(self):
+ # type: () -> None
+ super(MemoryBuffer, self).write_close()
+ m2.bio_set_mem_eof_return(self.bio, 0)
+
+ close = write_close
+
+
+class File(BIO):
+ """Object interface to BIO_s_pyfd.
+
+ This class interfaces Python to OpenSSL functions that expect BIO. For
+ general file manipulation in Python, use Python's builtin file object.
+ """
+
+ def __init__(self, pyfile, close_pyfile=1, mode='rb'):
+ # type: (Union[io.BytesIO, AnyStr], int, AnyStr) -> None
+ super(File, self).__init__(self, _pyfree=1)
+
+ if isinstance(pyfile, six.string_types):
+ pyfile = open(pyfile, mode)
+
+ # This is for downward compatibility, but I don't think, that it is
+ # good practice to have two handles for the same file. Whats about
+ # concurrent write access? Last write, last wins? Especially since Py3
+ # has its own buffer management. See:
+ #
+ # https://docs.python.org/3.3/c-api/file.html
+ #
+ pyfile.flush()
+ self.fname = pyfile.name
+ self.pyfile = pyfile
+ # Be wary of https://github.com/openssl/openssl/pull/1925
+ # BIO_new_fd is NEVER to be used before OpenSSL 1.1.1
+ if hasattr(m2, "bio_new_pyfd"):
+ self.bio = m2.bio_new_pyfd(pyfile.fileno(), m2.bio_noclose)
+ else:
+ self.bio = m2.bio_new_pyfile(pyfile, m2.bio_noclose)
+
+ self.close_pyfile = close_pyfile
+ self.closed = False
+
+ def flush(self):
+ # type: () -> None
+ super(File, self).flush()
+ self.pyfile.flush()
+
+ def close(self):
+ # type: () -> None
+ self.flush()
+ super(File, self).close()
+ if self.close_pyfile:
+ self.pyfile.close()
+
+ def reset(self):
+ # type: () -> int
+ """Set the bio to its initial state.
+
+ :return: 0 for success, and -1 for failure
+ """
+ return super(File, self).reset()
+
+ def __del__(self):
+ if not self.closed:
+ m2.bio_free(self.bio)
+
+
+def openfile(filename, mode='rb'):
+ # type: (AnyStr, AnyStr) -> File
+ try:
+ f = open(filename, mode)
+ except IOError as ex:
+ raise BIOError(ex.args)
+
+ return File(f)
+
+
+class IOBuffer(BIO):
+ """Object interface to BIO_f_buffer.
+
+ Its principal function is to be BIO_push()'ed on top of a BIO_f_ssl, so
+ that makefile() of said underlying SSL socket works.
+ """
+
+ m2_bio_pop = m2.bio_pop
+ m2_bio_free = m2.bio_free
+
+ def __init__(self, under_bio, mode='rwb', _pyfree=1):
+ # type: (BIO, str, int) -> None
+ super(IOBuffer, self).__init__(self, _pyfree=_pyfree)
+ self.io = m2.bio_new(m2.bio_f_buffer())
+ self.bio = m2.bio_push(self.io, under_bio._ptr())
+ # This reference keeps the underlying BIO alive while we're not closed.
+ self._under_bio = under_bio
+ if 'w' in mode:
+ self.write_closed = 0
+ else:
+ self.write_closed = 1
+
+ def __del__(self):
+ # type: () -> None
+ if getattr(self, '_pyfree', 0):
+ self.m2_bio_pop(self.bio)
+ self.m2_bio_free(self.io)
+
+ def close(self):
+ # type: () -> None
+ BIO.close(self)
+
+
+class CipherStream(BIO):
+ """Object interface to BIO_f_cipher."""
+
+ SALT_LEN = m2.PKCS5_SALT_LEN
+
+ m2_bio_pop = m2.bio_pop
+ m2_bio_free = m2.bio_free
+
+ def __init__(self, obio):
+ # type: (BIO) -> None
+ super(CipherStream, self).__init__(self, _pyfree=1)
+ self.obio = obio
+ self.bio = m2.bio_new(m2.bio_f_cipher())
+ self.closed = 0
+
+ def __del__(self):
+ # type: () -> None
+ if not getattr(self, 'closed', 1):
+ self.close()
+
+ def close(self):
+ # type: () -> None
+ self.m2_bio_pop(self.bio)
+ self.m2_bio_free(self.bio)
+ self.closed = 1
+
+ def write_close(self):
+ # type: () -> None
+ self.obio.write_close()
+
+ def set_cipher(self, algo, key, iv, op):
+ # type: (str, AnyStr, AnyStr, int) -> None
+ cipher = getattr(m2, algo, None)
+ if cipher is None:
+ raise ValueError('unknown cipher', algo)
+ else:
+ if not isinstance(key, bytes):
+ key = key.encode('utf8')
+ if not isinstance(iv, bytes):
+ iv = iv.encode('utf8')
+ m2.bio_set_cipher(self.bio, cipher(), key, iv, int(op))
+ m2.bio_push(self.bio, self.obio._ptr())
+
+
+class SSLBio(BIO):
+ """Object interface to BIO_f_ssl."""
+
+ def __init__(self, _pyfree=1):
+ # type: (int) -> None
+ super(SSLBio, self).__init__(self, _pyfree=_pyfree)
+ self.bio = m2.bio_new(m2.bio_f_ssl())
+ self.closed = 0
+
+ def set_ssl(self, conn, close_flag=m2.bio_noclose):
+ ## type: (Connection, int) -> None
+ """
+ Sets the bio to the SSL pointer which is
+ contained in the connection object.
+ """
+ self._pyfree = 0
+ m2.bio_set_ssl(self.bio, conn.ssl, close_flag)
+ if close_flag == m2.bio_noclose:
+ conn.set_ssl_close_flag(m2.bio_close)
+
+ def do_handshake(self):
+ # type: () -> int
+ """Do the handshake.
+
+ Return 1 if the handshake completes
+ Return 0 or a negative number if there is a problem
+ """
+ return m2.bio_do_handshake(self.bio)