summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-30 14:16:38 +0000
committerGordon Sim <gsim@apache.org>2008-04-30 14:16:38 +0000
commita2b413c1c0bea044d0c82ac3e1a99de5036761d1 (patch)
tree01166e7547caf8e5aee5a8569fa53db75f1d24f3
parentc86a77f2ce6150ce8fc0770604d92502acd996b8 (diff)
downloadqpid-python-a2b413c1c0bea044d0c82ac3e1a99de5036761d1.tar.gz
QPID-988 and QPID-989: fixes to framing for final 0-10 spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652386 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp18
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java1
-rw-r--r--python/qpid/codec010.py3
-rw-r--r--python/qpid/framer.py6
8 files changed, 16 insertions, 24 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 3ebb61feb5..d861251dba 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -37,14 +37,12 @@ void AMQFrame::setBody(const AMQBody& b) { body = new BodyHolder(b); }
void AMQFrame::setMethod(ClassId c, MethodId m) { body = new BodyHolder(c,m); }
-// This is now misleadingly named as it is not the frame size as
-// defined in the spec (as it also includes the end marker)
uint32_t AMQFrame::size() const {
return frameOverhead() + body->size();
}
uint32_t AMQFrame::frameOverhead() {
- return 12 /*frame header*/ + 1/*0xCE*/;
+ return 12 /*frame header*/;
}
void AMQFrame::encode(Buffer& buffer) const
@@ -55,18 +53,17 @@ void AMQFrame::encode(Buffer& buffer) const
uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0);
buffer.putOctet(flags);
buffer.putOctet(getBody()->type());
- buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself)
+ buffer.putShort(size());
buffer.putOctet(0);
buffer.putOctet(0x0f & track);
buffer.putShort(channel);
buffer.putLong(0);
body->encode(buffer);
- buffer.putOctet(0xCE);
}
bool AMQFrame::decode(Buffer& buffer)
{
- if(buffer.available() < frameOverhead() - 1)
+ if(buffer.available() < frameOverhead())
return false;
buffer.record();
@@ -80,7 +77,7 @@ bool AMQFrame::decode(Buffer& buffer)
eos = flags & 0x01;
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
- if (frame_size < frameOverhead()-1)
+ if (frame_size < frameOverhead())
throw FramingErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
@@ -96,16 +93,13 @@ bool AMQFrame::decode(Buffer& buffer)
// TODO: should no longer care about body size and only pass up
// B,E,b,e flags
- uint16_t body_size = frame_size + 1 - frameOverhead();
- if (buffer.available() < body_size+1u){
+ uint16_t body_size = frame_size - frameOverhead();
+ if (buffer.available() < body_size){
buffer.restore();
return false;
}
body = new BodyHolder();
body->decode(type,buffer, body_size);
- uint8_t end = buffer.getOctet();
- if (end != 0xCE)
- throw FramingErrorException(QPID_MSG("Frame end not found"));
return true;
}
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 903c7ed100..1f8ffa72bc 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -31,7 +31,7 @@ namespace framing {
FieldTable::~FieldTable() {}
uint32_t FieldTable::size() const {
- uint32_t len(4);
+ uint32_t len(4/*size field*/ + 4/*count field*/);
for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
// shortstr_len_byte + key size + value size
len += 1 + (i->first).size() + (i->second)->size();
@@ -121,8 +121,9 @@ int FieldTable::getInt(const std::string& name) const {
// value = getValue<FieldTable>(name);
//}
-void FieldTable::encode(Buffer& buffer) const{
+void FieldTable::encode(Buffer& buffer) const{
buffer.putLong(size() - 4);
+ buffer.putLong(values.size());
for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
buffer.putShortString(i->first);
i->second->encode(buffer);
@@ -132,10 +133,11 @@ void FieldTable::encode(Buffer& buffer) const{
void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
+ uint32_t count = buffer.getLong();
if (available < len)
throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
- while(buffer.available() > leftover){
+ while(buffer.available() > leftover && count--){
std::string name;
ValuePtr value(new FieldValue);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index 0f6180f54a..a4c46be89c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -272,6 +272,7 @@ abstract class AbstractDecoder implements Decoder
{
long size = readUint32();
int start = count;
+ long fieldCount = readUint32();
Map<String,Object> result = new LinkedHashMap();
while (count < start + size)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index 56b4537719..f68884f812 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -383,6 +383,7 @@ abstract class AbstractEncoder implements Encoder
sizer.writeMap(map);
// XXX: - 4
writeUint32(sizer.size() - 4);
+ writeUint32(map.size());
writeMapEntries(map);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index 2d41a9f516..c966a111ec 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer>
return FRAME_END;
}
case FRAME_END:
- return expect(buf, OutputHandler.FRAME_END, FRAME_HDR);
+ return FRAME_HDR;
default:
throw new IllegalStateException();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
index 8f615cf80d..64dcdb131f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -86,7 +86,6 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
{
hdr.put(buf);
}
- hdr.put((byte) FRAME_END);
hdr.flip();
synchronized (lock)
{
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index 0ba3341665..1c76666de1 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -156,6 +156,7 @@ class Codec(Packer):
def write_map(self, m):
sc = StringCodec(self.spec)
+ sc.write_uint32(len(m))
for k, v in m.items():
type = self.spec.encoding(v.__class__)
if type == None:
@@ -163,10 +164,10 @@ class Codec(Packer):
sc.write_str8(k)
sc.write_uint8(type.code)
type.encode(sc, v)
- # XXX: need to put in count when CPP supports it
self.write_vbin32(sc.encoded)
def read_map(self):
sc = StringCodec(self.spec, self.read_vbin32())
+ count = sc.read_uint32()
result = {}
while sc.encoded:
k = sc.read_str8()
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index fb0e677cee..78a29235cb 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -131,8 +131,6 @@ class Framer(Packer):
track = frame.track & 0x0F
self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
self.write(frame.payload)
- # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++
- self.write("\xCE")
frm.debug("SENT %s", frame)
finally:
self.sock_lock.release()
@@ -141,10 +139,6 @@ class Framer(Packer):
flags, type, size, track, channel = self.unpack(Frame.HEADER)
if flags & 0xF0: raise FramingError()
payload = self.read(size - struct.calcsize(Frame.HEADER))
- # XXX: NOT 0-10 FINAL, TEMPORARY WORKAROUND for C++
- end = self.read(1)
- if end != "\xCE":
- raise FramingError()
frame = Frame(flags, type, track, channel, payload)
frm.debug("RECV %s", frame)
return frame