diff options
author | Jim Carroll <jim@caroll.com> | 2016-08-01 16:57:59 -0400 |
---|---|---|
committer | Matěj Cepl <mcepl@cepl.eu> | 2017-09-15 14:42:50 +0200 |
commit | 917a99d0cbdcadd6fc85dff44fd92511ab464cf0 (patch) | |
tree | 7636adf113d3d33f8aa3d3967fc70899783c6c81 | |
parent | ae7053e021b08668413f7d2af2364f9fb0b4c9f8 (diff) | |
download | m2crypto-917a99d0cbdcadd6fc85dff44fd92511ab464cf0.tar.gz |
Added test coverage
-rw-r--r-- | M2Crypto/BIO.py | 12 | ||||
-rw-r--r-- | SWIG/_bio.i | 238 | ||||
-rw-r--r-- | tests/test_bio_file.py | 18 |
3 files changed, 268 insertions, 0 deletions
diff --git a/M2Crypto/BIO.py b/M2Crypto/BIO.py index 7634515..cb5ab2d 100644 --- a/M2Crypto/BIO.py +++ b/M2Crypto/BIO.py @@ -158,6 +158,18 @@ class BIO(object): """ return m2.bio_should_write(self.bio) + def tell(self): + """ + Return the current offset. + """ + return m2.bio_tell(self.bio) + + def seek(self, off): + """ + Seek to the specified absolute offset. + """ + return m2.bio_seek(self.bio, off) + def __enter__(self): return self diff --git a/SWIG/_bio.i b/SWIG/_bio.i index aa5a946..b4a4c02 100644 --- a/SWIG/_bio.i +++ b/SWIG/_bio.i @@ -221,6 +221,10 @@ int bio_seek(BIO *bio, int offset) { return (int)BIO_seek(bio, offset); } +int bio_tell(BIO* bio) { + return BIO_tell(bio); +} + void bio_set_flags(BIO *bio, int flags) { BIO_set_flags(bio, flags); } @@ -282,5 +286,239 @@ int bio_should_read(BIO* a) { int bio_should_write(BIO* a) { return BIO_should_write(a); } + +/* implement custom BIO_s_pyfd */ + +#ifdef WIN32 +# define clear_sys_error() SetLastError(0) +#else +# define clear_sys_error() errno=0 +#endif + +#if OPENSSL_VERSION_NUMBER < 0x10100000L +void BIO_set_data(BIO *a, void *ptr) { + a->ptr = ptr; +} +void *BIO_get_data(BIO *a) { + return a->ptr; +} +void BIO_set_init(BIO *a, int init) { + a->init = init; +} +int BIO_get_init(BIO *a) { + return a->init; +} +int BIO_get_shutdown(BIO *a) { + return a->shutdown; +} +void BIO_set_shutdown(BIO *a, int shutdown) { + a->shutdown = shutdown; +} +#endif + +typedef struct pyfd_struct { + int fd; +} BIO_PYFD_CTX; + + +static int pyfd_write(BIO *b, const char *in, int inl) { + int ret, fd; + + if (BIO_get_fd(b, &fd) == -1) + return -1; + clear_sys_error(); + ret = write(fd, in, inl); + BIO_clear_retry_flags(b); + if (ret <= 0) { + if (BIO_fd_should_retry(ret)) + BIO_set_retry_write(b); + } + return ret; +} + +static int pyfd_read(BIO *b, char *out, int outl) { + int ret = 0, fd; + + if (BIO_get_fd(b, &fd) == -1) + return -1; + if (out != NULL) { + clear_sys_error(); + ret = read(fd, out, outl); + BIO_clear_retry_flags(b); + if (ret <= 0) { + if (BIO_fd_should_retry(ret)) + BIO_set_retry_read(b); + } + } + return ret; +} + +static int pyfd_puts(BIO *bp, const char *str) { + int n, ret; + + n = strlen(str); + ret = pyfd_write(bp, str, n); + return ret; +} + +static int pyfd_gets(BIO *bp, char *buf, int size) { + int ret = 0; + char *ptr = buf; + char *end = buf + size - 1; + + while ((ptr < end) && (pyfd_read(bp, ptr, 1) > 0) && (ptr[0] != '\n')) + ptr++; + + ptr[0] = '\0'; + + if (buf[0] != '\0') + ret = strlen(buf); + return ret; +} + +static int pyfd_new(BIO* b) { + BIO_PYFD_CTX* ctx; + + ctx = OPENSSL_malloc(sizeof(*ctx)); + if (ctx == NULL) + return 0; + + ctx->fd = -1; + + BIO_set_data(b, ctx); + BIO_set_shutdown(b, 0); + BIO_set_init(b, 1); + + return 1; + } + +static int pyfd_free(BIO* b) { + BIO_PYFD_CTX* ctx; + + if (b == 0) + return 0; + + ctx = BIO_get_data(b); + if (ctx == NULL) + return 0; + + if (BIO_get_shutdown(b) && BIO_get_init(b)) + close(ctx->fd); + + BIO_set_data(b, NULL); + BIO_set_shutdown(b, 0); + BIO_set_init(b, 0); + + OPENSSL_free(ctx); + + return 1; +} + +static long pyfd_ctrl(BIO *b, int cmd, long num, void *ptr) { + BIO_PYFD_CTX* ctx; + int *ip; + long ret = 1; + + ctx = BIO_get_data(b); + if (ctx == NULL) + return 0; + + switch (cmd) { + case BIO_CTRL_RESET: + num = 0; + case BIO_C_FILE_SEEK: + ret = (long)lseek(ctx->fd, num, 0); + break; + case BIO_C_FILE_TELL: + case BIO_CTRL_INFO: + ret = (long)lseek(ctx->fd, 0, 1); + break; + case BIO_C_SET_FD: + pyfd_free(b); + if (*((int *)ptr) > -1) { + if (!pyfd_new(b) || !(ctx = BIO_get_data(b))) + return 0; + ctx->fd = *((int *)ptr); + BIO_set_shutdown(b, (int)num); + BIO_set_init(b, 1); + } + break; + case BIO_C_GET_FD: + if (BIO_get_init(b)) { + ip = (int *)ptr; + if (ip != NULL) + *ip = ctx->fd; + ret = ctx->fd; + } else + ret = -1; + break; + case BIO_CTRL_GET_CLOSE: + ret = BIO_get_shutdown(b); + break; + case BIO_CTRL_SET_CLOSE: + BIO_set_shutdown(b, (int)num); + break; + case BIO_CTRL_PENDING: + case BIO_CTRL_WPENDING: + ret = 0; + break; + case BIO_CTRL_DUP: + case BIO_CTRL_FLUSH: + ret = 1; + break; + default: + ret = 0; + break; + } + return ret; +} + + +#if OPENSSL_VERSION_NUMBER >= 0x10100000L +BIO* BIO_new_pyfd(int fd, int close_flag) { + BIO *ret; + BIO_METHOD *methods_fdp; + + methods_fdp = BIO_meth_new(35|0x400|0x100, + "python file descriptor"); + + BIO_meth_set_write(methods_fdp, pyfd_write); + BIO_meth_set_read(methods_fdp, pyfd_read); + BIO_meth_set_puts(methods_fdp, pyfd_puts); + BIO_meth_set_gets(methods_fdp, pyfd_gets); + BIO_meth_set_ctrl(methods_fdp, pyfd_ctrl); + BIO_meth_set_create(methods_fdp, pyfd_new); + BIO_meth_set_destroy(methods_fdp, pyfd_free); + + ret = BIO_new(methods_fdp); + BIO_set_fd(ret, fd, close_flag); + return ret; + } +#else +static BIO_METHOD methods_pyfdp = { + 35|0x400|0x100, + "python file descriptor", + pyfd_write, + pyfd_read, + pyfd_puts, + pyfd_gets, + pyfd_ctrl, + pyfd_new, + pyfd_free, + NULL +}; +BIO_METHOD *BIO_s_pyfd(void) { + return (&methods_pyfdp); +} + +BIO *BIO_new_pyfd(int fd, int close_flag) { + BIO *ret; + ret = BIO_new(BIO_s_pyfd()); + if (ret == NULL) + return NULL; + BIO_set_fd(ret, fd, close_flag); + return ret; +} +#endif %} diff --git a/tests/test_bio_file.py b/tests/test_bio_file.py index b819a63..36516b7 100644 --- a/tests/test_bio_file.py +++ b/tests/test_bio_file.py @@ -88,6 +88,24 @@ class FileTestCase(unittest.TestCase): self.assertEqual(len(in_data), len(self.data)) self.assertEqual(in_data, self.data) + def test_readline(self): + with open(self.fname, 'w') as f: + f.write('hello\nworld\n') + with openfile(self.fname, 'r') as f: + self.assertTrue(f.readable()) + self.assertEqual(f.readline(), 'hello\n') + self.assertEqual(f.readline(), 'world\n') + with openfile(self.fname, 'r') as f: + self.assertEqual(f.readlines(), ['hello\n', 'world\n']) + + def test_tell_seek(self): + with open(self.fname, 'w') as f: + f.write('hello world') + with openfile(self.fname, 'r') as f: + # Seek absolute + f.seek(6) + self.assertEqual(f.tell(), 6) + def suite(): return unittest.makeSuite(FileTestCase) |