summaryrefslogtreecommitdiff
path: root/msgpack/_msgpack.pyx
diff options
context:
space:
mode:
authorNaoki INADA <inada-n@eagle>2009-06-26 14:10:20 +0900
committerNaoki INADA <inada-n@eagle>2009-06-26 14:10:20 +0900
commit1781092bd8da6e014f78873516dd1e403205fcd7 (patch)
treed4390a56b73d751dc3dbc276cda21e18f6a9f439 /msgpack/_msgpack.pyx
parentb944eefb96a0f3ad907c8fe704fd14ba87384ed5 (diff)
downloadmsgpack-python-1781092bd8da6e014f78873516dd1e403205fcd7.tar.gz
Implement streaming deserializer.
Diffstat (limited to 'msgpack/_msgpack.pyx')
-rw-r--r--msgpack/_msgpack.pyx162
1 files changed, 147 insertions, 15 deletions
diff --git a/msgpack/_msgpack.pyx b/msgpack/_msgpack.pyx
index cbdcfc5..c37f8ba 100644
--- a/msgpack/_msgpack.pyx
+++ b/msgpack/_msgpack.pyx
@@ -6,13 +6,16 @@ cdef extern from "Python.h":
ctypedef char* const_char_ptr "const char*"
ctypedef struct PyObject
cdef object PyString_FromStringAndSize(const_char_ptr b, Py_ssize_t len)
+ char* PyString_AsString(object o)
cdef extern from "stdlib.h":
- void* malloc(int)
+ void* malloc(size_t)
+ void* realloc(void*, size_t)
void free(void*)
cdef extern from "string.h":
- int memcpy(char*dst, char*src, unsigned int size)
+ void* memcpy(char* dst, char* src, size_t size)
+ void* memmove(char* dst, char* src, size_t size)
cdef extern from "pack.h":
ctypedef int (*msgpack_packer_write)(void* data, const_char_ptr buf, unsigned int len)
@@ -34,8 +37,6 @@ cdef extern from "pack.h":
void msgpack_pack_raw_body(msgpack_packer* pk, char* body, size_t l)
-cdef int BUFF_SIZE=2*1024
-
cdef class Packer:
"""Packer that pack data into strm.
@@ -48,10 +49,7 @@ cdef class Packer:
cdef msgpack_packer pk
cdef object strm
- def __init__(self, strm, int size=0):
- if size <= 0:
- size = BUFF_SIZE
-
+ def __init__(self, strm, int size=4*1024):
self.strm = strm
self.buff = <char*> malloc(size)
self.allocated = size
@@ -147,6 +145,8 @@ cdef class Packer:
if flush:
self.flush()
+ close = flush
+
cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
if packer.length + l > packer.allocated:
if packer.length > 0:
@@ -163,20 +163,28 @@ cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
return 0
def pack(object o, object stream):
+ u"""pack o and write to stream)."""
packer = Packer(stream)
packer.pack(o)
packer.flush()
-def packs(object o):
+def packb(object o):
+ u"""pack o and return packed bytes."""
buf = StringIO()
packer = Packer(buf)
packer.pack(o)
packer.flush()
return buf.getvalue()
+packs = packb
+
cdef extern from "unpack.h":
ctypedef struct template_context:
- pass
+ PyObject* obj
+ size_t count
+ unsigned int ct
+ PyObject* key
+
int template_execute(template_context* ctx, const_char_ptr data,
size_t len, size_t* off)
void template_init(template_context* ctx)
@@ -188,15 +196,139 @@ def unpacks(object packed_bytes):
cdef const_char_ptr p = packed_bytes
cdef template_context ctx
cdef size_t off = 0
+ cdef int ret
template_init(&ctx)
- template_execute(&ctx, p, len(packed_bytes), &off)
- return template_data(&ctx)
+ ret = template_execute(&ctx, p, len(packed_bytes), &off)
+ if ret == 1:
+ return template_data(&ctx)
+ else:
+ return None
def unpack(object stream):
"""unpack from stream."""
packed = stream.read()
return unpacks(packed)
-cdef class Unpacker:
- """Do nothing. This function is for symmetric to Packer"""
- unpack = staticmethod(unpacks)
+cdef class UnpackIterator(object):
+ cdef object unpacker
+
+ def __init__(self, unpacker):
+ self.unpacker = unpacker
+
+ def __next__(self):
+ return self.unpacker.unpack()
+
+ def __iter__(self):
+ return self
+
+cdef class Unpacker(object):
+ """Unpacker(file_like=None, read_size=4096)
+
+ Streaming unpacker.
+ file_like must have read(n) method.
+ read_size is used like file_like.read(read_size)
+
+ If file_like is None, you can feed() bytes. feed() is useful
+ for unpack from non-blocking stream.
+
+ exsample 1:
+ unpacker = Unpacker(afile)
+ for o in unpacker:
+ do_something(o)
+
+ example 2:
+ unpacker = Unpacker()
+ while 1:
+ buf = astream.read()
+ unpacker.feed(buf)
+ for o in unpacker:
+ do_something(o)
+ """
+
+ cdef template_context ctx
+ cdef char* buf
+ cdef size_t buf_size, buf_head, buf_tail
+ cdef object file_like
+ cdef int read_size
+ cdef object waiting_bytes
+
+ def __init__(self, file_like=None, int read_size=4096):
+ self.file_like = file_like
+ self.read_size = read_size
+ self.waiting_bytes = []
+ self.buf = <char*>malloc(read_size)
+ self.buf_size = read_size
+ self.buf_head = 0
+ self.buf_tail = 0
+ template_init(&self.ctx)
+
+ def feed(self, next_bytes):
+ if not isinstance(next_bytes, str):
+ raise ValueError, "Argument must be bytes object"
+ self.waiting_bytes.append(next_bytes)
+
+ cdef append_buffer(self):
+ cdef char* buf = self.buf
+ cdef Py_ssize_t tail = self.buf_tail
+ cdef Py_ssize_t l
+
+ for b in self.waiting_bytes:
+ l = len(b)
+ memcpy(buf + tail, PyString_AsString(b), l)
+ tail += l
+ self.buf_tail = tail
+ del self.waiting_bytes[:]
+
+ # prepare self.buf
+ cdef fill_buffer(self):
+ cdef Py_ssize_t add_size
+
+ if self.file_like is not None:
+ self.waiting_bytes.append(self.file_like.read(self.read_size))
+
+ if not self.waiting_bytes:
+ return
+
+ add_size = 0
+ for b in self.waiting_bytes:
+ add_size += len(b)
+
+ cdef char* buf = self.buf
+ cdef size_t head = self.buf_head
+ cdef size_t tail = self.buf_tail
+ cdef size_t size = self.buf_size
+
+ if self.buf_tail + add_size <= self.buf_size:
+ # do nothing.
+ pass
+ if self.buf_tail - self.buf_head + add_size < self.buf_size:
+ # move to front.
+ memmove(buf, buf + head, tail - head)
+ tail -= head
+ head = 0
+ else:
+ # expand buffer
+ size = tail + add_size
+ buf = <char*>realloc(<void*>buf, size)
+
+ self.buf = buf
+ self.buf_head = head
+ self.buf_tail = tail
+ self.buf_size = size
+
+ self.append_buffer()
+
+ cpdef unpack(self):
+ """unpack one object"""
+ cdef int ret
+ self.fill_buffer()
+ ret = template_execute(&self.ctx, self.buf, self.buf_tail, &self.buf_head)
+ if ret == 1:
+ return template_data(&self.ctx)
+ elif ret == 0:
+ raise StopIteration, "No more unpack data."
+ else:
+ raise ValueError, "Unpack failed."
+
+ def __iter__(self):
+ return UnpackIterator(self)