summaryrefslogtreecommitdiff
path: root/tests/test_bio_file.py
blob: f10948f2f4a9d8e85f2501796374f6f69a87843d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python

"""Unit tests for M2Crypto.BIO.File.

Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved."""

import logging
import os
import platform
import tempfile
import ctypes
if platform.system() == 'Windows':
    import ctypes.wintypes

from M2Crypto.BIO import File, openfile
from . import unittest

log = logging.getLogger(__name__)


def getCountProcHandles():
    """Return the handle count Windows reports for this process, plus one.

    Windows-only: relies on ``ctypes.windll`` and ``ctypes.wintypes``.
    The current process is opened by PID, ``GetProcessHandleCount`` fills
    in a DWORD, the temporary process handle is closed again, and the
    reported figure incremented by one is returned.  None of the Win32
    return codes are checked.
    """
    kernel32 = ctypes.windll.kernel32
    query_information_access = 0x400  # PROCESS_QUERY_INFORMATION
    process = kernel32.OpenProcess(query_information_access, 0, os.getpid())
    handle_count = ctypes.wintypes.DWORD()
    kernel32.GetProcessHandleCount(process, ctypes.byref(handle_count))
    # Read the value before the process handle is released again.
    reported = handle_count.value
    kernel32.CloseHandle(process)
    return reported + 1


class FileTestCase(unittest.TestCase):
    """Tests for M2Crypto.BIO.File and M2Crypto.BIO.openfile.

    Every test works on a scratch file created in setUp.  setUp also
    records how many file descriptors (handles on Windows) the process
    has open, and tearDown fails the test if that number has changed,
    i.e. if the test leaked a descriptor.
    """

    def setUp(self):
        """Create the scratch file and record the baseline descriptor count."""
        self.data = b'abcdef' * 64
        self.fd, self.fname = tempfile.mkstemp()

        # Deliberately a single-underscore attribute: a double-underscore
        # name is mangled to _FileTestCase__dev_fd, so a string-based
        # lookup such as hasattr(self, '__dev_fd') never finds it and the
        # descriptor check would silently be skipped.
        self._dev_fd = None
        if platform.system() in ['Linux', 'Darwin', 'FreeBSD']:
            self._dev_fd = "/dev/fd/"

        self.fd_count = self.__mfd()

    def __mfd(self):
        """Return the number of open descriptors/handles, or None.

        Counts the entries of the per-process descriptor directory where
        setUp configured one, uses the Win32 handle count on Windows, and
        returns None on platforms where neither is available.
        """
        dev_fd = getattr(self, '_dev_fd', None)
        if dev_fd is not None:
            return len(os.listdir(dev_fd))
        if platform.system() == 'Windows':
            return getCountProcHandles()
        return None

    def tearDown(self):
        """Check for leaked descriptors, then release the scratch file."""
        try:
            # Compared while self.fd is still open, because the baseline
            # in setUp was taken after mkstemp() had opened it.
            self.assertEqual(self.fd_count, self.__mfd(),
                             "last test did not close all file descriptors properly")
        finally:
            # Best-effort cleanup that must run even when the assertion
            # above fails; a failure here should not mask the test result.
            try:
                os.close(self.fd)
            except OSError:
                pass

            try:
                os.unlink(self.fname)
            except OSError:
                pass

    def test_openfile_rb(self):
        # First create the file using Python's open().
        with open(self.fname, 'wb') as f:
            f.write(self.data)

        # Now open the file using M2Crypto.BIO.openfile().
        with openfile(self.fname, 'rb') as f:
            data = f.read(len(self.data))

        self.assertEqual(data, self.data)

    def test_openfile_wb(self):
        # First create the file using M2Crypto.BIO.openfile().
        with openfile(self.fname, 'wb') as f:
            f.write(self.data)

        # Now open the file using Python's open().
        with open(self.fname, 'rb') as f:
            data = f.read(len(self.data))

        self.assertEqual(data, self.data)

    def test_closed(self):
        # Writing to a BIO file after close() must raise IOError.
        f = openfile(self.fname, 'wb')
        f.write(self.data)
        f.close()
        with self.assertRaises(IOError):
            f.write(self.data)

    def test_use_pyfile(self):
        # First create the file, writing through a File wrapped around
        # an already-open Python file object.
        with open(self.fname, 'wb') as f:
            f2 = File(f)
            f2.write(self.data)
            f2.close()

        # Now read the file.
        with open(self.fname, 'rb') as f:
            in_data = f.read(len(self.data))

        self.assertEqual(len(in_data), len(self.data))
        self.assertEqual(in_data, self.data)

    def test_readline_bin(self):
        # Written in binary mode, so the line endings are exactly b'\n'.
        with open(self.fname, 'wb') as f:
            f.write(b'hello\nworld\n')
        with openfile(self.fname, 'rb') as f:
            self.assertTrue(f.readable())
            self.assertEqual(f.readline(), b'hello\n')
            self.assertEqual(f.readline(), b'world\n')
        with openfile(self.fname, 'rb') as f:
            self.assertEqual(
                f.readlines(),
                [b'hello\n', b'world\n'])

    def test_readline(self):
        # Written in text mode, so '\n' is stored as the platform line
        # separator; the binary read-back is compared against that.
        sep = os.linesep.encode()
        with open(self.fname, 'w') as f:
            f.write('hello\nworld\n')
        with openfile(self.fname, 'rb') as f:
            self.assertTrue(f.readable())
            self.assertEqual(f.readline(), b'hello' + sep)
            self.assertEqual(f.readline(), b'world' + sep)
        with openfile(self.fname, 'rb') as f:
            self.assertEqual(
                f.readlines(),
                [b'hello' + sep, b'world' + sep])

    def test_tell_seek(self):
        with open(self.fname, 'w') as f:
            f.write('hello world')
        with openfile(self.fname, 'r') as f:
            # Seek absolute
            f.seek(6)
            self.assertEqual(f.tell(), 6)


def suite():
    """Build and return a test suite holding every FileTestCase test."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(FileTestCase)


if __name__ == '__main__':
    # Executed as a script: run the suite with the plain-text runner.
    runner = unittest.TextTestRunner()
    runner.run(suite())