summaryrefslogtreecommitdiff
path: root/tests/run/mockbuffers.pxi
diff options
context:
space:
mode:
Diffstat (limited to 'tests/run/mockbuffers.pxi')
-rw-r--r--tests/run/mockbuffers.pxi313
1 files changed, 313 insertions, 0 deletions
diff --git a/tests/run/mockbuffers.pxi b/tests/run/mockbuffers.pxi
new file mode 100644
index 000000000..acfb813da
--- /dev/null
+++ b/tests/run/mockbuffers.pxi
@@ -0,0 +1,313 @@
+from libc cimport stdlib
+from libc cimport stdio
+cimport cpython.buffer
+cimport cython
+
+from cpython cimport PyObject, Py_INCREF, Py_DECREF
+
+
+available_flags = (
+ ('FORMAT', cpython.buffer.PyBUF_FORMAT),
+ ('INDIRECT', cpython.buffer.PyBUF_INDIRECT),
+ ('ND', cpython.buffer.PyBUF_ND),
+ ('STRIDES', cpython.buffer.PyBUF_STRIDES),
+ ('C_CONTIGUOUS', cpython.buffer.PyBUF_C_CONTIGUOUS),
+ ('F_CONTIGUOUS', cpython.buffer.PyBUF_F_CONTIGUOUS),
+ ('WRITABLE', cpython.buffer.PyBUF_WRITABLE)
+)
+
+cdef class MockBuffer:
+ cdef object format, offset
+ cdef void* buffer
+ cdef int len, itemsize, ndim
+ cdef Py_ssize_t* strides
+ cdef Py_ssize_t* shape
+ cdef Py_ssize_t* suboffsets
+ cdef object label, log
+
+ cdef readonly object recieved_flags, release_ok
+ cdef public object fail
+
+ def __init__(self, label, data, shape=None, strides=None, format=None, offset=0):
+ # It is important not to store references to data after the constructor
+ # as refcounting is checked on object buffers.
+ self.label = label
+ self.release_ok = True
+ self.log = ""
+ self.offset = offset
+ self.itemsize = self.get_itemsize()
+ if format is None: format = self.get_default_format()
+ if shape is None: shape = (len(data),)
+ if strides is None:
+ strides = []
+ cumprod = 1
+ rshape = list(shape)
+ rshape.reverse()
+ for s in rshape:
+ strides.append(cumprod)
+ cumprod *= s
+ strides.reverse()
+ strides = [x * self.itemsize for x in strides]
+ suboffsets = [-1] * len(shape)
+ datashape = [len(data)]
+ p = data
+ while True:
+ p = p[0]
+ if isinstance(p, list): datashape.append(len(p))
+ else: break
+ if len(datashape) > 1:
+ # indirect access
+ self.ndim = len(datashape)
+ shape = datashape
+ self.buffer = self.create_indirect_buffer(data, shape)
+ suboffsets = [0] * (self.ndim-1) + [-1]
+ strides = [sizeof(void*)] * (self.ndim-1) + [self.itemsize]
+ self.suboffsets = self.list_to_sizebuf(suboffsets)
+ else:
+ # strided and/or simple access
+ self.buffer = self.create_buffer(data)
+ self.ndim = len(shape)
+ self.suboffsets = NULL
+
+ try:
+ format = format.encode('ASCII')
+ except AttributeError:
+ pass
+ self.format = format
+ self.len = len(data) * self.itemsize
+
+ self.strides = self.list_to_sizebuf(strides)
+ self.shape = self.list_to_sizebuf(shape)
+
+ def __dealloc__(self):
+ stdlib.free(self.strides)
+ stdlib.free(self.shape)
+ if self.suboffsets != NULL:
+ stdlib.free(self.suboffsets)
+ # must recursively free indirect...
+ else:
+ stdlib.free(self.buffer)
+
+ cdef void* create_buffer(self, data) except NULL:
+ cdef size_t n = <size_t>(len(data) * self.itemsize)
+ cdef char* buf = <char*>stdlib.malloc(n)
+ if buf == NULL:
+ raise MemoryError
+ cdef char* it = buf
+ for value in data:
+ self.write(it, value)
+ it += self.itemsize
+ return buf
+
+ cdef void* create_indirect_buffer(self, data, shape):
+ cdef size_t n = 0
+ cdef void** buf
+ assert shape[0] == len(data)
+ if len(shape) == 1:
+ return self.create_buffer(data)
+ else:
+ shape = shape[1:]
+ n = <size_t>len(data) * sizeof(void*)
+ buf = <void**>stdlib.malloc(n)
+ for idx, subdata in enumerate(data):
+ buf[idx] = self.create_indirect_buffer(subdata, shape)
+ return buf
+
+ cdef Py_ssize_t* list_to_sizebuf(self, l):
+ cdef size_t n = <size_t>len(l) * sizeof(Py_ssize_t)
+ cdef Py_ssize_t* buf = <Py_ssize_t*>stdlib.malloc(n)
+ for i, x in enumerate(l):
+ buf[i] = x
+ return buf
+
+ def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
+ if self.fail:
+ raise ValueError("Failing on purpose")
+
+ self.recieved_flags = []
+ cdef int value
+ for name, value in available_flags:
+ if (value & flags) == value:
+ self.recieved_flags.append(name)
+
+ buffer.buf = <void*>(<char*>self.buffer + (<int>self.offset * self.itemsize))
+ buffer.obj = self
+ buffer.len = self.len
+ buffer.readonly = 0
+ buffer.format = <char*>self.format
+ buffer.ndim = self.ndim
+ buffer.shape = self.shape
+ buffer.strides = self.strides
+ buffer.suboffsets = self.suboffsets
+ buffer.itemsize = self.itemsize
+ buffer.internal = NULL
+ if self.label:
+ msg = "acquired %s" % self.label
+ print msg
+ self.log += msg + "\n"
+
+ def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
+ if buffer.suboffsets != self.suboffsets:
+ self.release_ok = False
+ if self.label:
+ msg = "released %s" % self.label
+ print msg
+ self.log += msg + "\n"
+
+ def printlog(self):
+ print self.log[:-1]
+
+ def resetlog(self):
+ self.log = ""
+
+ cdef int write(self, char* buf, object value) except -1: raise Exception()
+ cdef get_itemsize(self):
+ print "ERROR, not subclassed", self.__class__
+ cdef get_default_format(self):
+ print "ERROR, not subclassed", self.__class__
+
+cdef class CharMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<char*>buf)[0] = <char>value
+ return 0
+ cdef get_itemsize(self): return sizeof(char)
+ cdef get_default_format(self): return b"@b"
+
+cdef class IntMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<int*>buf)[0] = <int>value
+ return 0
+ cdef get_itemsize(self): return sizeof(int)
+ cdef get_default_format(self): return b"@i"
+
+cdef class UnsignedIntMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<unsigned int*>buf)[0] = <unsigned int>value
+ return 0
+ cdef get_itemsize(self): return sizeof(unsigned int)
+ cdef get_default_format(self): return b"@I"
+
+cdef class ShortMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<short*>buf)[0] = <short>value
+ return 0
+ cdef get_itemsize(self): return sizeof(short)
+ cdef get_default_format(self): return b"h" # Try without endian specifier
+
+cdef class UnsignedShortMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<unsigned short*>buf)[0] = <unsigned short>value
+ return 0
+ cdef get_itemsize(self): return sizeof(unsigned short)
+ cdef get_default_format(self): return b"@1H" # Try with repeat count
+
+cdef class FloatMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<float*>buf)[0] = <float>(<double>value)
+ return 0
+ cdef get_itemsize(self): return sizeof(float)
+ cdef get_default_format(self): return b"f"
+
+cdef class DoubleMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<double*>buf)[0] = <double>value
+ return 0
+ cdef get_itemsize(self): return sizeof(double)
+ cdef get_default_format(self): return b"d"
+
+cdef extern from *:
+ void* addr_of_pyobject "(void*)"(object)
+
+cdef class ObjectMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<void**>buf)[0] = addr_of_pyobject(value)
+ return 0
+
+ cdef get_itemsize(self): return sizeof(void*)
+ cdef get_default_format(self): return b"@O"
+
+
+cdef class IntStridedMockBuffer(IntMockBuffer):
+ cdef __cythonbufferdefaults__ = {"mode" : "strided"}
+
+cdef class ErrorBuffer:
+ cdef object label
+
+ def __init__(self, label):
+ self.label = label
+
+ def __getbuffer__(ErrorBuffer self, Py_buffer* buffer, int flags):
+ raise Exception("acquiring %s" % self.label)
+
+ def __releasebuffer__(ErrorBuffer self, Py_buffer* buffer):
+ raise Exception("releasing %s" % self.label)
+
+#
+# Structs
+#
+cdef struct MyStruct:
+ char a
+ char b
+ long long int c
+ int d
+ int e
+
+cdef struct SmallStruct:
+ int a
+ int b
+
+cdef struct NestedStruct:
+ SmallStruct x
+ SmallStruct y
+ int z
+
+cdef packed struct PackedStruct:
+ char a
+ int b
+
+cdef struct NestedPackedStruct:
+ char a
+ int b
+ PackedStruct sub
+ int c
+
+cdef class MyStructMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ cdef MyStruct* s
+ s = <MyStruct*>buf;
+ s.a, s.b, s.c, s.d, s.e = value
+ return 0
+
+ cdef get_itemsize(self): return sizeof(MyStruct)
+ cdef get_default_format(self): return b"2bq2i"
+
+cdef class NestedStructMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ cdef NestedStruct* s
+ s = <NestedStruct*>buf;
+ s.x.a, s.x.b, s.y.a, s.y.b, s.z = value
+ return 0
+
+ cdef get_itemsize(self): return sizeof(NestedStruct)
+ cdef get_default_format(self): return b"2T{ii}i"
+
+cdef class PackedStructMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ cdef PackedStruct* s
+ s = <PackedStruct*>buf;
+ s.a, s.b = value
+ return 0
+
+ cdef get_itemsize(self): return sizeof(PackedStruct)
+ cdef get_default_format(self): return b"^ci"
+
+cdef class NestedPackedStructMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ cdef NestedPackedStruct* s
+ s = <NestedPackedStruct*>buf;
+ s.a, s.b, s.sub.a, s.sub.b, s.c = value
+ return 0
+
+ cdef get_itemsize(self): return sizeof(NestedPackedStruct)
+ cdef get_default_format(self): return b"ci^ci@i"
+