diff options
author | Naoki INADA <inada-n@eagle> | 2009-06-26 14:10:20 +0900 |
---|---|---|
committer | Naoki INADA <inada-n@eagle> | 2009-06-26 14:10:20 +0900 |
commit | 1781092bd8da6e014f78873516dd1e403205fcd7 (patch) | |
tree | d4390a56b73d751dc3dbc276cda21e18f6a9f439 /msgpack/_msgpack.pyx | |
parent | b944eefb96a0f3ad907c8fe704fd14ba87384ed5 (diff) | |
download | msgpack-python-1781092bd8da6e014f78873516dd1e403205fcd7.tar.gz |
Implement streaming deserializer.
Diffstat (limited to 'msgpack/_msgpack.pyx')
-rw-r--r-- | msgpack/_msgpack.pyx | 162 |
1 files changed, 147 insertions, 15 deletions
```diff
diff --git a/msgpack/_msgpack.pyx b/msgpack/_msgpack.pyx
index cbdcfc5..c37f8ba 100644
--- a/msgpack/_msgpack.pyx
+++ b/msgpack/_msgpack.pyx
@@ -6,13 +6,16 @@ cdef extern from "Python.h":
     ctypedef char* const_char_ptr "const char*"
     ctypedef struct PyObject
     cdef object PyString_FromStringAndSize(const_char_ptr b, Py_ssize_t len)
+    char* PyString_AsString(object o)

 cdef extern from "stdlib.h":
-    void* malloc(int)
+    void* malloc(size_t)
+    void* realloc(void*, size_t)
     void free(void*)

 cdef extern from "string.h":
-    int memcpy(char*dst, char*src, unsigned int size)
+    void* memcpy(char* dst, char* src, size_t size)
+    void* memmove(char* dst, char* src, size_t size)

 cdef extern from "pack.h":
     ctypedef int (*msgpack_packer_write)(void* data, const_char_ptr buf, unsigned int len)
@@ -34,8 +37,6 @@ cdef extern from "pack.h":
     void msgpack_pack_raw_body(msgpack_packer* pk, char* body, size_t l)


-cdef int BUFF_SIZE=2*1024
-
 cdef class Packer:
     """Packer that pack data into strm.

@@ -48,10 +49,7 @@ cdef class Packer:
     cdef msgpack_packer pk
     cdef object strm

-    def __init__(self, strm, int size=0):
-        if size <= 0:
-            size = BUFF_SIZE
-
+    def __init__(self, strm, int size=4*1024):
         self.strm = strm
         self.buff = <char*> malloc(size)
         self.allocated = size
@@ -147,6 +145,8 @@ cdef class Packer:
         if flush:
             self.flush()

+    close = flush
+
 cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
     if packer.length + l > packer.allocated:
         if packer.length > 0:
@@ -163,20 +163,28 @@ cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
     return 0

 def pack(object o, object stream):
+    u"""pack o and write to stream)."""
     packer = Packer(stream)
     packer.pack(o)
     packer.flush()

-def packs(object o):
+def packb(object o):
+    u"""pack o and return packed bytes."""
     buf = StringIO()
     packer = Packer(buf)
     packer.pack(o)
     packer.flush()
     return buf.getvalue()

+packs = packb
+
 cdef extern from "unpack.h":
     ctypedef struct template_context:
-        pass
+        PyObject* obj
+        size_t count
+        unsigned int ct
+        PyObject* key
+
     int template_execute(template_context* ctx, const_char_ptr data,
             size_t len, size_t* off)
     void template_init(template_context* ctx)
@@ -188,15 +196,139 @@ def unpacks(object packed_bytes):
     cdef const_char_ptr p = packed_bytes
     cdef template_context ctx
     cdef size_t off = 0
+    cdef int ret
     template_init(&ctx)
-    template_execute(&ctx, p, len(packed_bytes), &off)
-    return template_data(&ctx)
+    ret = template_execute(&ctx, p, len(packed_bytes), &off)
+    if ret == 1:
+        return template_data(&ctx)
+    else:
+        return None

 def unpack(object stream):
     """unpack from stream."""
     packed = stream.read()
     return unpacks(packed)

-cdef class Unpacker:
-    """Do nothing. This function is for symmetric to Packer"""
-    unpack = staticmethod(unpacks)
+cdef class UnpackIterator(object):
+    cdef object unpacker
+
+    def __init__(self, unpacker):
+        self.unpacker = unpacker
+
+    def __next__(self):
+        return self.unpacker.unpack()
+
+    def __iter__(self):
+        return self
+
+cdef class Unpacker(object):
+    """Unpacker(file_like=None, read_size=4096)
+
+    Streaming unpacker.
+    file_like must have read(n) method.
+    read_size is used like file_like.read(read_size)
+
+    If file_like is None, you can feed() bytes. feed() is useful
+    for unpack from non-blocking stream.
+
+    exsample 1:
+    unpacker = Unpacker(afile)
+    for o in unpacker:
+        do_something(o)
+
+    example 2:
+    unpacker = Unpacker()
+    while 1:
+        buf = astream.read()
+        unpacker.feed(buf)
+        for o in unpacker:
+            do_something(o)
+    """
+
+    cdef template_context ctx
+    cdef char* buf
+    cdef size_t buf_size, buf_head, buf_tail
+    cdef object file_like
+    cdef int read_size
+    cdef object waiting_bytes
+
+    def __init__(self, file_like=None, int read_size=4096):
+        self.file_like = file_like
+        self.read_size = read_size
+        self.waiting_bytes = []
+        self.buf = <char*>malloc(read_size)
+        self.buf_size = read_size
+        self.buf_head = 0
+        self.buf_tail = 0
+        template_init(&self.ctx)
+
+    def feed(self, next_bytes):
+        if not isinstance(next_bytes, str):
+            raise ValueError, "Argument must be bytes object"
+        self.waiting_bytes.append(next_bytes)
+
+    cdef append_buffer(self):
+        cdef char* buf = self.buf
+        cdef Py_ssize_t tail = self.buf_tail
+        cdef Py_ssize_t l
+
+        for b in self.waiting_bytes:
+            l = len(b)
+            memcpy(buf + tail, PyString_AsString(b), l)
+            tail += l
+        self.buf_tail = tail
+        del self.waiting_bytes[:]
+
+    # prepare self.buf
+    cdef fill_buffer(self):
+        cdef Py_ssize_t add_size
+
+        if self.file_like is not None:
+            self.waiting_bytes.append(self.file_like.read(self.read_size))
+
+        if not self.waiting_bytes:
+            return
+
+        add_size = 0
+        for b in self.waiting_bytes:
+            add_size += len(b)
+
+        cdef char* buf = self.buf
+        cdef size_t head = self.buf_head
+        cdef size_t tail = self.buf_tail
+        cdef size_t size = self.buf_size
+
+        if self.buf_tail + add_size <= self.buf_size:
+            # do nothing.
+            pass
+        if self.buf_tail - self.buf_head + add_size < self.buf_size:
+            # move to front.
+            memmove(buf, buf + head, tail - head)
+            tail -= head
+            head = 0
+        else:
+            # expand buffer
+            size = tail + add_size
+            buf = <char*>realloc(<void*>buf, size)
+
+        self.buf = buf
+        self.buf_head = head
+        self.buf_tail = tail
+        self.buf_size = size
+
+        self.append_buffer()
+
+    cpdef unpack(self):
+        """unpack one object"""
+        cdef int ret
+        self.fill_buffer()
+        ret = template_execute(&self.ctx, self.buf, self.buf_tail, &self.buf_head)
+        if ret == 1:
+            return template_data(&self.ctx)
+        elif ret == 0:
+            raise StopIteration, "No more unpack data."
+        else:
+            raise ValueError, "Unpack failed."
+
+    def __iter__(self):
+        return UnpackIterator(self)
```