diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-09-12 00:37:17 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-09-12 00:37:17 +0000 |
commit | e69abcd0ae75a32c6a618c9b86a8a550fb4a8db7 (patch) | |
tree | a759c750ee6abe74c98f6a2bb1c341e35551e09a /cpp | |
parent | bdba810bed79ed16ba0e73ff852c7cd41aa39c8b (diff) | |
download | qpid-python-e69abcd0ae75a32c6a618c9b86a8a550fb4a8db7.tar.gz |
* python/qpid/codec.py
Comment typo
* cpp/src/qpid/broker/RecoveryManagerImpl.cpp
Cruft removal
* python/qpid/codec.py
* python/qpid/connection.py
* cpp/src/qpid/framing/AMQFrame.h
* cpp/src/qpid/framing/AMQFrame.cpp
Initial implementation of 0-10 framing -
This uses the new 12 byte frame header, but doesn't
support splitting segments/framesets over multiple
frames yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 62 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 11 |
3 files changed, 54 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 29390a6452..4bd82a321d 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -107,8 +107,6 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - buffer.record(); - //peek at type: Message::shared_ptr message(new Message()); message->decodeHeader(buffer); return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index a7fd068ee4..52425f28b7 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -70,40 +70,70 @@ const AMQBody* AMQFrame::getBody() const { return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); } +// This is now misleadingly named as it is not the frame size as defined in the spec +// (as it also includes the end marker) +uint32_t AMQFrame::size() const{ + return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); +} + +uint32_t AMQFrame::frameOverhead() { + return 12 /*frame header*/ + 1/*0xCE*/; +} + void AMQFrame::encode(Buffer& buffer) const { + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); + buffer.putOctet(flags); buffer.putOctet(getBody()->type()); + buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself) + buffer.putOctet(0); + buffer.putOctet(0x0f & subchannel); buffer.putShort(channel); - buffer.putLong(boost::apply_visitor(SizeVisitor(), body)); + buffer.putLong(0); boost::apply_visitor(EncodeVisitor(buffer), body); buffer.putOctet(0xCE); } -uint32_t AMQFrame::size() const{ - return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); -} - -uint32_t AMQFrame::frameOverhead() { - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/; -} - bool AMQFrame::decode(Buffer& buffer) { - if(buffer.available() < 7) + if(buffer.available() < frameOverhead() - 1) return false; buffer.record(); - uint8_t type = buffer.getOctet(); + uint8_t flags = buffer.getOctet(); + uint8_t framing_version = (flags & 0xc0) >> 6; + if (framing_version != 0) + THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported"); + bof = flags & 0x08; + eof = flags & 0x04; + bos = flags & 0x02; + bos = flags & 0x01; + uint8_t type = buffer.getOctet(); + uint16_t frame_size = buffer.getShort(); + if (frame_size < frameOverhead()-1) + THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small"); + uint8_t reserved1 = buffer.getOctet(); + uint8_t field1 = buffer.getOctet(); + subchannel = field1 & 0x0f; channel = buffer.getShort(); - uint32_t size = buffer.getLong(); - - if(buffer.available() < size+1){ + (void) buffer.getLong(); // reserved2 + + // Verify that the protocol header meets current spec + // TODO: should we check reserved2 against zero as well? - the spec isn't clear + if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) + THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero"); + + // TODO: should no longer care about body size and only pass up B,E,b,e flags + uint16_t body_size = frame_size + 1 - frameOverhead(); + if (buffer.available() < body_size+1u){ buffer.restore(); return false; } - decodeBody(buffer, size, type); + decodeBody(buffer, body_size, type); + uint8_t end = buffer.getOctet(); - if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + if (end != 0xCE) + THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 8c18475d29..a96b0483b7 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -37,14 +37,14 @@ namespace framing { class AMQFrame : public AMQDataBlock { public: - AMQFrame() : channel(0) {} + AMQFrame() : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(0) {} /** Construct a frame with a copy of b */ - AMQFrame(ChannelId c, const AMQBody* b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody* b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) { setBody(*b); } - AMQFrame(ChannelId c, const AMQBody& b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody& b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) { setBody(b); } @@ -97,6 +97,11 @@ class AMQFrame : public AMQDataBlock void decodeBody(Buffer& buffer, uint32_t size, uint8_t type); + bool bof; + bool eof; + bool bos; + bool eos; + uint8_t subchannel; uint16_t channel; Variant body; }; |