diff options
Diffstat (limited to 'tests/run/mockbuffers.pxi')
-rw-r--r-- | tests/run/mockbuffers.pxi | 313 |
1 files changed, 313 insertions, 0 deletions
diff --git a/tests/run/mockbuffers.pxi b/tests/run/mockbuffers.pxi new file mode 100644 index 000000000..acfb813da --- /dev/null +++ b/tests/run/mockbuffers.pxi @@ -0,0 +1,313 @@ +from libc cimport stdlib +from libc cimport stdio +cimport cpython.buffer +cimport cython + +from cpython cimport PyObject, Py_INCREF, Py_DECREF + + +available_flags = ( + ('FORMAT', cpython.buffer.PyBUF_FORMAT), + ('INDIRECT', cpython.buffer.PyBUF_INDIRECT), + ('ND', cpython.buffer.PyBUF_ND), + ('STRIDES', cpython.buffer.PyBUF_STRIDES), + ('C_CONTIGUOUS', cpython.buffer.PyBUF_C_CONTIGUOUS), + ('F_CONTIGUOUS', cpython.buffer.PyBUF_F_CONTIGUOUS), + ('WRITABLE', cpython.buffer.PyBUF_WRITABLE) +) + +cdef class MockBuffer: + cdef object format, offset + cdef void* buffer + cdef int len, itemsize, ndim + cdef Py_ssize_t* strides + cdef Py_ssize_t* shape + cdef Py_ssize_t* suboffsets + cdef object label, log + + cdef readonly object recieved_flags, release_ok + cdef public object fail + + def __init__(self, label, data, shape=None, strides=None, format=None, offset=0): + # It is important not to store references to data after the constructor + # as refcounting is checked on object buffers. + self.label = label + self.release_ok = True + self.log = "" + self.offset = offset + self.itemsize = self.get_itemsize() + if format is None: format = self.get_default_format() + if shape is None: shape = (len(data),) + if strides is None: + strides = [] + cumprod = 1 + rshape = list(shape) + rshape.reverse() + for s in rshape: + strides.append(cumprod) + cumprod *= s + strides.reverse() + strides = [x * self.itemsize for x in strides] + suboffsets = [-1] * len(shape) + datashape = [len(data)] + p = data + while True: + p = p[0] + if isinstance(p, list): datashape.append(len(p)) + else: break + if len(datashape) > 1: + # indirect access + self.ndim = len(datashape) + shape = datashape + self.buffer = self.create_indirect_buffer(data, shape) + suboffsets = [0] * (self.ndim-1) + [-1] + strides = [sizeof(void*)] * (self.ndim-1) + [self.itemsize] + self.suboffsets = self.list_to_sizebuf(suboffsets) + else: + # strided and/or simple access + self.buffer = self.create_buffer(data) + self.ndim = len(shape) + self.suboffsets = NULL + + try: + format = format.encode('ASCII') + except AttributeError: + pass + self.format = format + self.len = len(data) * self.itemsize + + self.strides = self.list_to_sizebuf(strides) + self.shape = self.list_to_sizebuf(shape) + + def __dealloc__(self): + stdlib.free(self.strides) + stdlib.free(self.shape) + if self.suboffsets != NULL: + stdlib.free(self.suboffsets) + # must recursively free indirect... + else: + stdlib.free(self.buffer) + + cdef void* create_buffer(self, data) except NULL: + cdef size_t n = <size_t>(len(data) * self.itemsize) + cdef char* buf = <char*>stdlib.malloc(n) + if buf == NULL: + raise MemoryError + cdef char* it = buf + for value in data: + self.write(it, value) + it += self.itemsize + return buf + + cdef void* create_indirect_buffer(self, data, shape): + cdef size_t n = 0 + cdef void** buf + assert shape[0] == len(data) + if len(shape) == 1: + return self.create_buffer(data) + else: + shape = shape[1:] + n = <size_t>len(data) * sizeof(void*) + buf = <void**>stdlib.malloc(n) + for idx, subdata in enumerate(data): + buf[idx] = self.create_indirect_buffer(subdata, shape) + return buf + + cdef Py_ssize_t* list_to_sizebuf(self, l): + cdef size_t n = <size_t>len(l) * sizeof(Py_ssize_t) + cdef Py_ssize_t* buf = <Py_ssize_t*>stdlib.malloc(n) + for i, x in enumerate(l): + buf[i] = x + return buf + + def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags): + if self.fail: + raise ValueError("Failing on purpose") + + self.recieved_flags = [] + cdef int value + for name, value in available_flags: + if (value & flags) == value: + self.recieved_flags.append(name) + + buffer.buf = <void*>(<char*>self.buffer + (<int>self.offset * self.itemsize)) + buffer.obj = self + buffer.len = self.len + buffer.readonly = 0 + buffer.format = <char*>self.format + buffer.ndim = self.ndim + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = self.suboffsets + buffer.itemsize = self.itemsize + buffer.internal = NULL + if self.label: + msg = "acquired %s" % self.label + print msg + self.log += msg + "\n" + + def __releasebuffer__(MockBuffer self, Py_buffer* buffer): + if buffer.suboffsets != self.suboffsets: + self.release_ok = False + if self.label: + msg = "released %s" % self.label + print msg + self.log += msg + "\n" + + def printlog(self): + print self.log[:-1] + + def resetlog(self): + self.log = "" + + cdef int write(self, char* buf, object value) except -1: raise Exception() + cdef get_itemsize(self): + print "ERROR, not subclassed", self.__class__ + cdef get_default_format(self): + print "ERROR, not subclassed", self.__class__ + +cdef class CharMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<char*>buf)[0] = <char>value + return 0 + cdef get_itemsize(self): return sizeof(char) + cdef get_default_format(self): return b"@b" + +cdef class IntMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<int*>buf)[0] = <int>value + return 0 + cdef get_itemsize(self): return sizeof(int) + cdef get_default_format(self): return b"@i" + +cdef class UnsignedIntMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<unsigned int*>buf)[0] = <unsigned int>value + return 0 + cdef get_itemsize(self): return sizeof(unsigned int) + cdef get_default_format(self): return b"@I" + +cdef class ShortMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<short*>buf)[0] = <short>value + return 0 + cdef get_itemsize(self): return sizeof(short) + cdef get_default_format(self): return b"h" # Try without endian specifier + +cdef class UnsignedShortMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<unsigned short*>buf)[0] = <unsigned short>value + return 0 + cdef get_itemsize(self): return sizeof(unsigned short) + cdef get_default_format(self): return b"@1H" # Try with repeat count + +cdef class FloatMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<float*>buf)[0] = <float>(<double>value) + return 0 + cdef get_itemsize(self): return sizeof(float) + cdef get_default_format(self): return b"f" + +cdef class DoubleMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<double*>buf)[0] = <double>value + return 0 + cdef get_itemsize(self): return sizeof(double) + cdef get_default_format(self): return b"d" + +cdef extern from *: + void* addr_of_pyobject "(void*)"(object) + +cdef class ObjectMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + (<void**>buf)[0] = addr_of_pyobject(value) + return 0 + + cdef get_itemsize(self): return sizeof(void*) + cdef get_default_format(self): return b"@O" + + +cdef class IntStridedMockBuffer(IntMockBuffer): + cdef __cythonbufferdefaults__ = {"mode" : "strided"} + +cdef class ErrorBuffer: + cdef object label + + def __init__(self, label): + self.label = label + + def __getbuffer__(ErrorBuffer self, Py_buffer* buffer, int flags): + raise Exception("acquiring %s" % self.label) + + def __releasebuffer__(ErrorBuffer self, Py_buffer* buffer): + raise Exception("releasing %s" % self.label) + +# +# Structs +# +cdef struct MyStruct: + char a + char b + long long int c + int d + int e + +cdef struct SmallStruct: + int a + int b + +cdef struct NestedStruct: + SmallStruct x + SmallStruct y + int z + +cdef packed struct PackedStruct: + char a + int b + +cdef struct NestedPackedStruct: + char a + int b + PackedStruct sub + int c + +cdef class MyStructMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + cdef MyStruct* s + s = <MyStruct*>buf; + s.a, s.b, s.c, s.d, s.e = value + return 0 + + cdef get_itemsize(self): return sizeof(MyStruct) + cdef get_default_format(self): return b"2bq2i" + +cdef class NestedStructMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + cdef NestedStruct* s + s = <NestedStruct*>buf; + s.x.a, s.x.b, s.y.a, s.y.b, s.z = value + return 0 + + cdef get_itemsize(self): return sizeof(NestedStruct) + cdef get_default_format(self): return b"2T{ii}i" + +cdef class PackedStructMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + cdef PackedStruct* s + s = <PackedStruct*>buf; + s.a, s.b = value + return 0 + + cdef get_itemsize(self): return sizeof(PackedStruct) + cdef get_default_format(self): return b"^ci" + +cdef class NestedPackedStructMockBuffer(MockBuffer): + cdef int write(self, char* buf, object value) except -1: + cdef NestedPackedStruct* s + s = <NestedPackedStruct*>buf; + s.a, s.b, s.sub.a, s.sub.b, s.c = value + return 0 + + cdef get_itemsize(self): return sizeof(NestedPackedStruct) + cdef get_default_format(self): return b"ci^ci@i" + |