summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-09-12 00:37:17 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-09-12 00:37:17 +0000
commite69abcd0ae75a32c6a618c9b86a8a550fb4a8db7 (patch)
treea759c750ee6abe74c98f6a2bb1c341e35551e09a /cpp
parentbdba810bed79ed16ba0e73ff852c7cd41aa39c8b (diff)
downloadqpid-python-e69abcd0ae75a32c6a618c9b86a8a550fb4a8db7.tar.gz
* python/qpid/codec.py
Comment typo * cpp/src/qpid/broker/RecoveryManagerImpl.cpp Cruft removal * python/qpid/codec.py * python/qpid/connection.py * cpp/src/qpid/framing/AMQFrame.h * cpp/src/qpid/framing/AMQFrame.cpp Initial implementation of 0-10 framing - This uses the new 12 byte frame header, but doesn't support splitting segments/framesets over multiple frames yet. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp2
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp62
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h11
3 files changed, 54 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 29390a6452..4bd82a321d 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -107,8 +107,6 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer&
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- buffer.record();
- //peek at type:
Message::shared_ptr message(new Message());
message->decodeHeader(buffer);
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index a7fd068ee4..52425f28b7 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -70,40 +70,70 @@ const AMQBody* AMQFrame::getBody() const {
return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
}
+// This is now misleadingly named as it is not the frame size as defined in the spec
+// (as it also includes the end marker)
+uint32_t AMQFrame::size() const{
+ return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+uint32_t AMQFrame::frameOverhead() {
+ return 12 /*frame header*/ + 1/*0xCE*/;
+}
+
void AMQFrame::encode(Buffer& buffer) const
{
+ uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0);
+ buffer.putOctet(flags);
buffer.putOctet(getBody()->type());
+ buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself)
+ buffer.putOctet(0);
+ buffer.putOctet(0x0f & subchannel);
buffer.putShort(channel);
- buffer.putLong(boost::apply_visitor(SizeVisitor(), body));
+ buffer.putLong(0);
boost::apply_visitor(EncodeVisitor(buffer), body);
buffer.putOctet(0xCE);
}
-uint32_t AMQFrame::size() const{
- return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
-}
-
-uint32_t AMQFrame::frameOverhead() {
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/;
-}
-
bool AMQFrame::decode(Buffer& buffer)
{
- if(buffer.available() < 7)
+ if(buffer.available() < frameOverhead() - 1)
return false;
buffer.record();
- uint8_t type = buffer.getOctet();
+ uint8_t flags = buffer.getOctet();
+ uint8_t framing_version = (flags & 0xc0) >> 6;
+ if (framing_version != 0)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported");
+ bof = flags & 0x08;
+ eof = flags & 0x04;
+ bos = flags & 0x02;
+ bos = flags & 0x01;
+ uint8_t type = buffer.getOctet();
+ uint16_t frame_size = buffer.getShort();
+ if (frame_size < frameOverhead()-1)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small");
+ uint8_t reserved1 = buffer.getOctet();
+ uint8_t field1 = buffer.getOctet();
+ subchannel = field1 & 0x0f;
channel = buffer.getShort();
- uint32_t size = buffer.getLong();
-
- if(buffer.available() < size+1){
+ (void) buffer.getLong(); // reserved2
+
+ // Verify that the protocol header meets current spec
+ // TODO: should we check reserved2 against zero as well? - the spec isn't clear
+ if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero");
+
+ // TODO: should no longer care about body size and only pass up B,E,b,e flags
+ uint16_t body_size = frame_size + 1 - frameOverhead();
+ if (buffer.available() < body_size+1u){
buffer.restore();
return false;
}
- decodeBody(buffer, size, type);
+ decodeBody(buffer, body_size, type);
+
uint8_t end = buffer.getOctet();
- if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ if (end != 0xCE)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
return true;
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 8c18475d29..a96b0483b7 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -37,14 +37,14 @@ namespace framing {
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame() : channel(0) {}
+ AMQFrame() : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(0) {}
/** Construct a frame with a copy of b */
- AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody* b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) {
setBody(*b);
}
- AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody& b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) {
setBody(b);
}
@@ -97,6 +97,11 @@ class AMQFrame : public AMQDataBlock
void decodeBody(Buffer& buffer, uint32_t size, uint8_t type);
+ bool bof;
+ bool eof;
+ bool bos;
+ bool eos;
+ uint8_t subchannel;
uint16_t channel;
Variant body;
};