diff options
-rw-r--r-- | amqp/async.py | 13 | ||||
-rw-r--r-- | amqp/transport.py | 51 |
2 files changed, 64 insertions, 0 deletions
diff --git a/amqp/async.py b/amqp/async.py new file mode 100644 index 0000000..f1f802c --- /dev/null +++ b/amqp/async.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import + +_current_loop = None + + +def get_event_loop(): + return _current_loop + + +def set_event_loop(loop): + global _current_loop + _current_loop = loop + return loop diff --git a/amqp/transport.py b/amqp/transport.py index 975ced1..79eb2f2 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -157,6 +157,57 @@ class _AbstractTransport(object): self.sock = None self.connected = False + def _read_frame(self, then, unpack_from=unpack_from): + read = self._read + + buf = bytearray(7) + bufv = memoryview(buf) + Hr = Br = Cr = 0 + while Hr < 7: + try: + n = self.read(bufv[Hr:], 7 - Hr) + except (OSError, IOError, socket.error) as exc: + if exc.errno not in _UNAVIL: + raise + yield + if n == 0: + raise ConnectionError('socket disconnected') + Hr += n + + frame_type, channel, size = unpack_from('>BHI', bufv) + + buf = bytearray(size) + bufv = memoryview(buf) + while Br < size: + try: + n = read(bufv[Br:], size - Br) + except (OSError, IOError, socket.error) as exc: + if exc.errno not in _UNAVAIL: + raise + yield + if n == 0: + raise ConnectionError('socket disconnected') + Br += n + + chbuf = bytearray(size) + chbufv = memoryview(chbuf) + + while Cr < 1: + try: + n = read(chbuf[Cr:], 1 - Cr) + except (OSError, IOError, socket.error) as exc: + if exc.errno not in _UNAVAIL: + raise + yield + if n == 0: + raise ConnectionError('socket disconnected') + Cr += n + + if ord(chbuf[0]) != 206: + raise UnexpectedFrame( + 'Received 0x{0:02x} while expecting 0xce'.format(chbuf[0])) + then(frame_type, channel, payload) + def read_frame(self, unpack=unpack): read = self._read try: |