diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-30 14:16:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-30 14:16:38 +0000 |
commit | a2b413c1c0bea044d0c82ac3e1a99de5036761d1 (patch) | |
tree | 01166e7547caf8e5aee5a8569fa53db75f1d24f3 | |
parent | c86a77f2ce6150ce8fc0770604d92502acd996b8 (diff) | |
download | qpid-python-a2b413c1c0bea044d0c82ac3e1a99de5036761d1.tar.gz |
QPID-988 and QPID-989: fixes to framing for final 0-10 spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652386 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 16 insertions, 24 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 3ebb61feb5..d861251dba 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -37,14 +37,12 @@ void AMQFrame::setBody(const AMQBody& b) { body = new BodyHolder(b); } void AMQFrame::setMethod(ClassId c, MethodId m) { body = new BodyHolder(c,m); } -// This is now misleadingly named as it is not the frame size as -// defined in the spec (as it also includes the end marker) uint32_t AMQFrame::size() const { return frameOverhead() + body->size(); } uint32_t AMQFrame::frameOverhead() { - return 12 /*frame header*/ + 1/*0xCE*/; + return 12 /*frame header*/; } void AMQFrame::encode(Buffer& buffer) const @@ -55,18 +53,17 @@ void AMQFrame::encode(Buffer& buffer) const uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); buffer.putOctet(flags); buffer.putOctet(getBody()->type()); - buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself) + buffer.putShort(size()); buffer.putOctet(0); buffer.putOctet(0x0f & track); buffer.putShort(channel); buffer.putLong(0); body->encode(buffer); - buffer.putOctet(0xCE); } bool AMQFrame::decode(Buffer& buffer) { - if(buffer.available() < frameOverhead() - 1) + if(buffer.available() < frameOverhead()) return false; buffer.record(); @@ -80,7 +77,7 @@ bool AMQFrame::decode(Buffer& buffer) eos = flags & 0x01; uint8_t type = buffer.getOctet(); uint16_t frame_size = buffer.getShort(); - if (frame_size < frameOverhead()-1) + if (frame_size < frameOverhead()) throw FramingErrorException(QPID_MSG("Frame size too small")); uint8_t reserved1 = buffer.getOctet(); uint8_t field1 = buffer.getOctet(); @@ -96,16 +93,13 @@ bool AMQFrame::decode(Buffer& buffer) // TODO: should no longer care about body size and only pass up // B,E,b,e flags - uint16_t body_size = frame_size + 1 - frameOverhead(); - if (buffer.available() < body_size+1u){ + uint16_t body_size = frame_size - frameOverhead(); + if (buffer.available() < body_size){ buffer.restore(); return false; } body = new BodyHolder(); body->decode(type,buffer, body_size); - uint8_t end = buffer.getOctet(); - if (end != 0xCE) - throw FramingErrorException(QPID_MSG("Frame end not found")); return true; } diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index 903c7ed100..1f8ffa72bc 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -31,7 +31,7 @@ namespace framing { FieldTable::~FieldTable() {} uint32_t FieldTable::size() const { - uint32_t len(4); + uint32_t len(4/*size field*/ + 4/*count field*/); for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { // shortstr_len_byte + key size + value size len += 1 + (i->first).size() + (i->second)->size(); @@ -121,8 +121,9 @@ int FieldTable::getInt(const std::string& name) const { // value = getValue<FieldTable>(name); //} -void FieldTable::encode(Buffer& buffer) const{ +void FieldTable::encode(Buffer& buffer) const{ buffer.putLong(size() - 4); + buffer.putLong(values.size()); for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { buffer.putShortString(i->first); i->second->encode(buffer); @@ -132,10 +133,11 @@ void FieldTable::encode(Buffer& buffer) const{ void FieldTable::decode(Buffer& buffer){ uint32_t len = buffer.getLong(); uint32_t available = buffer.available(); + uint32_t count = buffer.getLong(); if (available < len) throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); uint32_t leftover = available - len; - while(buffer.available() > leftover){ + while(buffer.available() > leftover && count--){ std::string name; ValuePtr value(new FieldValue); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index 0f6180f54a..a4c46be89c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -272,6 +272,7 @@ abstract class AbstractDecoder implements Decoder { long size = readUint32(); int start = count; + long fieldCount = readUint32(); Map<String,Object> result = new LinkedHashMap(); while (count < start + size) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index 56b4537719..f68884f812 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -383,6 +383,7 @@ abstract class AbstractEncoder implements Encoder sizer.writeMap(map); // XXX: - 4 writeUint32(sizer.size() - 4); + writeUint32(map.size()); writeMapEntries(map); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 2d41a9f516..c966a111ec 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer> return FRAME_END; } case FRAME_END: - return expect(buf, OutputHandler.FRAME_END, FRAME_HDR); + return FRAME_HDR; default: throw new IllegalStateException(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index 8f615cf80d..64dcdb131f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -86,7 +86,6 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate { hdr.put(buf); } - hdr.put((byte) FRAME_END); hdr.flip(); synchronized (lock) { diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 0ba3341665..1c76666de1 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -156,6 +156,7 @@ class Codec(Packer): def write_map(self, m): sc = StringCodec(self.spec) + sc.write_uint32(len(m)) for k, v in m.items(): type = self.spec.encoding(v.__class__) if type == None: @@ -163,10 +164,10 @@ class Codec(Packer): sc.write_str8(k) sc.write_uint8(type.code) type.encode(sc, v) - # XXX: need to put in count when CPP supports it self.write_vbin32(sc.encoded) def read_map(self): sc = StringCodec(self.spec, self.read_vbin32()) + count = sc.read_uint32() result = {} while sc.encoded: k = sc.read_str8() diff --git a/python/qpid/framer.py b/python/qpid/framer.py index fb0e677cee..78a29235cb 100644 --- a/python/qpid/framer.py +++ b/python/qpid/framer.py @@ -131,8 +131,6 @@ class Framer(Packer): track = frame.track & 0x0F self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) self.write(frame.payload) - # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++ - self.write("\xCE") frm.debug("SENT %s", frame) finally: self.sock_lock.release() @@ -141,10 +139,6 @@ class Framer(Packer): flags, type, size, track, channel = self.unpack(Frame.HEADER) if flags & 0xF0: raise FramingError() payload = self.read(size - struct.calcsize(Frame.HEADER)) - # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++ - end = self.read(1) - if end != "\xCE": - raise FramingError() frame = Frame(flags, type, track, channel, payload) frm.debug("RECV %s", frame) return frame |