diff options
Diffstat (limited to 'qpid/cpp/src/qpid/framing')
68 files changed, 5186 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/framing/AMQBody.cpp b/qpid/cpp/src/qpid/framing/AMQBody.cpp new file mode 100644 index 0000000000..b3eeae0615 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQBody.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include <iostream> + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQBody& body) +{ + body.print(out); + return out; +} + +AMQBody::~AMQBody() {} + +namespace { +struct MatchBodies : public AMQBodyConstVisitor { + const AMQBody& body; + bool match; + + MatchBodies(const AMQBody& b) : body(b), match(false) {} + virtual ~MatchBodies() {} + + virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); } + virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); } + virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); } + virtual void visit(const AMQMethodBody& x) { + const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body); + match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId()); + } +}; + +} +bool AMQBody::match(const AMQBody& a, const AMQBody& b) { + MatchBodies matcher(a); + b.accept(matcher); + return matcher.match; +} + +}} // namespace diff --git a/qpid/cpp/src/qpid/framing/AMQBody.h b/qpid/cpp/src/qpid/framing/AMQBody.h new file mode 100644 index 0000000000..56d1d250c1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQBody.h @@ -0,0 +1,86 @@ +#ifndef QPID_FRAMING_AMQBODY_H +#define QPID_FRAMING_AMQBODY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/RefCounted.h" +#include "qpid/framing/BodyFactory.h" +#include <boost/intrusive_ptr.hpp> +#include <ostream> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class Buffer; + +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +struct AMQBodyConstVisitor { + virtual ~AMQBodyConstVisitor() {} + virtual void visit(const AMQHeaderBody&) = 0; + virtual void visit(const AMQContentBody&) = 0; + virtual void visit(const AMQHeartbeatBody&) = 0; + virtual void visit(const AMQMethodBody&) = 0; +}; + +class QPID_COMMON_CLASS_EXTERN AMQBody : public RefCounted { + public: + AMQBody() {} + QPID_COMMON_EXTERN virtual ~AMQBody(); + + // Make AMQBody copyable even though RefCounted. + AMQBody(const AMQBody&) : RefCounted() {} + AMQBody& operator=(const AMQBody&) { return *this; } + + virtual uint8_t type() const = 0; + + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t=0) = 0; + virtual uint32_t encodedSize() const = 0; + + virtual void print(std::ostream& out) const = 0; + virtual void accept(AMQBodyConstVisitor&) const = 0; + + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ); + virtual boost::intrusive_ptr<AMQBody> clone() const = 0; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; + +enum BodyTypes { + METHOD_BODY = 1, + HEADER_BODY = 2, + CONTENT_BODY = 3, + HEARTBEAT_BODY = 8 +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_AMQBODY_H*/ diff --git a/qpid/cpp/src/qpid/framing/AMQCommandControlBody.h b/qpid/cpp/src/qpid/framing/AMQCommandControlBody.h new file mode 100644 index 0000000000..d12b70a168 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQCommandControlBody.h @@ -0,0 +1,70 @@ +#ifndef QPID_FRAMING_AMQCOMMANDCONTROLBODY_H +#define QPID_FRAMING_AMQCOMMANDCONTROLBODY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/amqp_0_10/helpers.h" +#include "qpid/framing/AMQBody.h" + +namespace qpid { +namespace framing { + +/** + * AMQBody wrapper for Command and Control. + * Temporary measure to fit with old code. + */ +template <class T> class AMQCommandControlBody : public AMQBody, public T +{ + public: + virtual uint8_t type() const { return 100+T::SEGMENT_TYPE; } + + virtual void encode(Buffer& buffer) const { + Codec::encode(buffer.getIterator(), static_cast<const T&>(*this)); + } + virtual void decode(Buffer& buffer, uint32_t=0) { + Codec::decode(buffer.getIterator(), static_cast<T&>(*this)); + } + virtual uint32_t encodedSize() const { + Codec::size(buffer.getIterator(), static_cast<const T&>(*this)); + } + + virtual void print(std::ostream& out) const { + out << static_cast<const T&>(*this) << endl; + } + virtual void AMQBody::accept(AMQBodyConstVisitor&) const { assert(0); } +}; + +class CommandBody : public AMQCommandControlBody<amqp_0_10::Command> { + using Command::accept; // Hide AMQBody::accept + virtual Command* getCommand() { return this; } + virtual const Command* getCommand() const { return this; } +}; + +class ControlBody : public AMQCommandControlBody<amqp_0_10::Control> { + using Control::accept; // Hide AMQBody::accept + virtual Control* getControl() { return this; } + virtual const Control* getControl() const { return this; } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_AMQCOMMANDCONTROLBODY_H*/ diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.cpp b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp new file mode 100644 index 0000000000..72f7d9978e --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQContentBody.h" +#include <iostream> + +qpid::framing::AMQContentBody::AMQContentBody(){ +} + +qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){ +} + +uint32_t qpid::framing::AMQContentBody::encodedSize() const{ + return data.size(); +} +void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{ + buffer.putRawData(data); +} +void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ + buffer.getRawData(data, _size); +} + +void qpid::framing::AMQContentBody::print(std::ostream& out) const +{ + out << "content (" << encodedSize() << " bytes)"; + const size_t max = 32; + out << " " << data.substr(0, max); + if (data.size() > max) out << "..."; +} diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.h b/qpid/cpp/src/qpid/framing/AMQContentBody.h new file mode 100644 index 0000000000..e25451e354 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/CommonImportExport.h" + +#ifndef _AMQContentBody_ +#define _AMQContentBody_ + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQContentBody : public AMQBody +{ + string data; + +public: + QPID_COMMON_EXTERN AMQContentBody(); + QPID_COMMON_EXTERN AMQContentBody(const string& data); + inline virtual ~AMQContentBody(){} + inline uint8_t type() const { return CONTENT_BODY; }; + inline const string& getData() const { return data; } + inline string& getData() { return data; } + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size); + QPID_COMMON_EXTERN void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQDataBlock.h b/qpid/cpp/src/qpid/framing/AMQDataBlock.h new file mode 100644 index 0000000000..7f0d0dc2b5 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQDataBlock.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Buffer.h" + +#ifndef _AMQDataBlock_ +#define _AMQDataBlock_ + +namespace qpid { +namespace framing { + +class AMQDataBlock +{ +public: + virtual ~AMQDataBlock() {} + virtual void encode(Buffer& buffer) const = 0; + virtual bool decode(Buffer& buffer) = 0; + virtual uint32_t encodedSize() const = 0; +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp new file mode 100644 index 0000000000..cd60cd971f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQFrame.h" + +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/BodyFactory.h" +#include "qpid/framing/MethodBodyFactory.h" +#include "qpid/Msg.h" + +#include <boost/format.hpp> +#include <iostream> + +namespace qpid { +namespace framing { + +void AMQFrame::init() { bof = eof = bos = eos = true; subchannel=0; channel=0; } + +AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); } + +AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); } + +AMQFrame::~AMQFrame() {} + +void AMQFrame::setMethod(ClassId c, MethodId m) { body = MethodBodyFactory::create(c,m); } + +uint32_t AMQFrame::encodedSize() const { + uint32_t size = frameOverhead() + body->encodedSize(); + if (body->getMethod()) + size += sizeof(ClassId)+sizeof(MethodId); + return size; +} + +uint32_t AMQFrame::frameOverhead() { + return 12 /*frame header*/; +} + +uint16_t AMQFrame::DECODE_SIZE_MIN=4; + +uint16_t AMQFrame::decodeSize(char* data) { + Buffer buf(data+2, DECODE_SIZE_MIN); + return buf.getShort(); +} + +void AMQFrame::encode(Buffer& buffer) const +{ + //set track first (controls on track 0, everything else on 1): + uint8_t track = getBody()->type() ? 1 : 0; + + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); + buffer.putOctet(flags); + buffer.putOctet(getBody()->type()); + buffer.putShort(encodedSize()); + buffer.putOctet(0); + buffer.putOctet(0x0f & track); + buffer.putShort(channel); + buffer.putLong(0); + const AMQMethodBody* method=getMethod(); + if (method) { + buffer.putOctet(method->amqpClassId()); + buffer.putOctet(method->amqpMethodId()); + } + body->encode(buffer); +} + +bool AMQFrame::decode(Buffer& buffer) +{ + if(buffer.available() < frameOverhead()) + return false; + buffer.record(); + + uint8_t flags = buffer.getOctet(); + uint8_t framing_version = (flags & 0xc0) >> 6; + if (framing_version != 0) + throw FramingErrorException(QPID_MSG("Framing version unsupported")); + bof = flags & 0x08; + eof = flags & 0x04; + bos = flags & 0x02; + eos = flags & 0x01; + uint8_t type = buffer.getOctet(); + uint16_t frame_size = buffer.getShort(); + if (frame_size < frameOverhead()) + throw FramingErrorException(QPID_MSG("Frame size too small " << frame_size)); + uint8_t reserved1 = buffer.getOctet(); + uint8_t field1 = buffer.getOctet(); + subchannel = field1 & 0x0f; + channel = buffer.getShort(); + (void) buffer.getLong(); // reserved2 + + // Verify that the protocol header meets current spec + // TODO: should we check reserved2 against zero as well? - the + // spec isn't clear + if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) + throw FramingErrorException(QPID_MSG("Reserved bits not zero")); + + // TODO: should no longer care about body size and only pass up + // B,E,b,e flags + uint16_t body_size = frame_size - frameOverhead(); + if (buffer.available() < body_size){ + buffer.restore(); + return false; + } + + switch(type) + { + case 0://CONTROL + case METHOD_BODY: { + ClassId c = buffer.getOctet(); + MethodId m = buffer.getOctet(); + body = MethodBodyFactory::create(c, m); + break; + } + case HEADER_BODY: body = BodyFactory::create<AMQHeaderBody>(); break; + case CONTENT_BODY: body = BodyFactory::create<AMQContentBody>(); break; + case HEARTBEAT_BODY: body = BodyFactory::create<AMQHeartbeatBody>(); break; + default: + throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type)); + } + body->decode(buffer, body_size); + + return true; +} + +std::ostream& operator<<(std::ostream& out, const AMQFrame& f) +{ + return + out << "Frame[" + << (f.getBof() ? "B" : "") << (f.getEof() ? "E" : "") + << (f.getBos() ? "b" : "") << (f.getEos() ? "e" : "") << "; " + << "channel=" << f.getChannel() << "; " << *f.getBody() + << "]"; +} + + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h new file mode 100644 index 0000000000..c669d12bc0 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -0,0 +1,112 @@ +#ifndef _AMQFrame_ +#define _AMQFrame_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQDataBlock.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include "qpid/framing/ProtocolVersion.h" +#include <boost/intrusive_ptr.hpp> +#include <boost/cast.hpp> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock +{ + public: + QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0); + QPID_COMMON_EXTERN AMQFrame(const AMQBody& b); + QPID_COMMON_EXTERN ~AMQFrame(); + + ChannelId getChannel() const { return channel; } + void setChannel(ChannelId c) { channel = c; } + + AMQBody* getBody() { return body.get(); } + const AMQBody* getBody() const { return body.get(); } + + AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; } + const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; } + + void setMethod(ClassId c, MethodId m); + + template <class T> T* castBody() { + return boost::polymorphic_downcast<T*>(getBody()); + } + + template <class T> const T* castBody() const { + return boost::polymorphic_downcast<const T*>(getBody()); + } + + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN bool decode(Buffer& buffer); + QPID_COMMON_EXTERN uint32_t encodedSize() const; + + // 0-10 terminology: first/last frame (in segment) first/last segment (in assembly) + + bool isFirstSegment() const { return bof; } + bool isLastSegment() const { return eof; } + bool isFirstFrame() const { return bos; } + bool isLastFrame() const { return eos; } + + void setFirstSegment(bool set=true) { bof = set; } + void setLastSegment(bool set=true) { eof = set; } + void setFirstFrame(bool set=true) { bos = set; } + void setLastFrame(bool set=true) { eos = set; } + + // 0-9 terminology: beginning/end of frameset, beginning/end of segment. + + bool getBof() const { return bof; } + void setBof(bool isBof) { bof = isBof; } + bool getEof() const { return eof; } + void setEof(bool isEof) { eof = isEof; } + + bool getBos() const { return bos; } + void setBos(bool isBos) { bos = isBos; } + bool getEos() const { return eos; } + void setEos(bool isEos) { eos = isEos; } + + static uint16_t DECODE_SIZE_MIN; + QPID_COMMON_EXTERN static uint32_t frameOverhead(); + /** Must point to at least DECODE_SIZE_MIN bytes of data */ + static uint16_t decodeSize(char* data); + + private: + void init(); + + boost::intrusive_ptr<AMQBody> body; + uint16_t channel : 16; + uint8_t subchannel : 8; + bool bof : 1; + bool eof : 1; + bool bos : 1; + bool eos : 1; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AMQFrame&); + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp new file mode 100644 index 0000000000..14218f1b45 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +uint32_t qpid::framing::AMQHeaderBody::encodedSize() const { + return properties.encodedSize(); +} + +void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { + properties.encode(buffer); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size) { + uint32_t limit = buffer.available() - size; + while (buffer.available() > limit + 2) { + uint32_t len = buffer.getLong(); + uint16_t type = buffer.getShort(); + if (!properties.decode(buffer, len, type)) { + // TODO: should just skip & keep for later dispatch. + throw Exception(QPID_MSG("Unexpected property type: " << type)); + } + } +} + +uint64_t qpid::framing::AMQHeaderBody::getContentLength() const +{ + const MessageProperties* mProps = get<MessageProperties>(); + if (mProps) + return mProps->getContentLength(); + return 0; +} + +void qpid::framing::AMQHeaderBody::print(std::ostream& out) const +{ + out << "header (" << encodedSize() << " bytes)"; + out << "; properties={"; + properties.print(out); + out << "}"; +} + +void qpid::framing::AMQHeaderBody::accept(AMQBodyConstVisitor& v) const { + v.visit(*this); +} diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h new file mode 100644 index 0000000000..a8c326969a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -0,0 +1,109 @@ +#ifndef QPID_FRAMING_AMQHEADERBODY_H +#define QPID_FRAMING_AMQHEADERBODY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/CommonImportExport.h" +#include <iostream> + +#include <boost/optional.hpp> + + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody +{ + template <class T> struct OptProps { boost::optional<T> props; }; + template <class Base, class T> + struct PropSet : public Base, public OptProps<T> { + uint32_t encodedSize() const { + const boost::optional<T>& p=this->OptProps<T>::props; + return (p ? p->encodedSize() : 0) + Base::encodedSize(); + } + void encode(Buffer& buffer) const { + const boost::optional<T>& p=this->OptProps<T>::props; + if (p) p->encode(buffer); + Base::encode(buffer); + } + bool decode(Buffer& buffer, uint32_t size, uint16_t type) { + boost::optional<T>& p=this->OptProps<T>::props; + if (type == T::TYPE) { + p=T(); + p->decodeStructBody(buffer, size); + return true; + } + else + return Base::decode(buffer, size, type); + } + void print(std::ostream& out) const { + const boost::optional<T>& p=this->OptProps<T>::props; + if (p) out << *p; + Base::print(out); + } + }; + + struct Empty { + uint32_t encodedSize() const { return 0; } + void encode(Buffer&) const {}; + bool decode(Buffer&, uint32_t, uint16_t) const { return false; }; + void print(std::ostream&) const {} + }; + + // Could use boost::mpl::fold to construct a larger set. + typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties; + + Properties properties; + +public: + + inline uint8_t type() const { return HEADER_BODY; } + + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size); + QPID_COMMON_EXTERN uint64_t getContentLength() const; + QPID_COMMON_EXTERN void print(std::ostream& out) const; + QPID_COMMON_EXTERN void accept(AMQBodyConstVisitor&) const; + + template <class T> T* get(bool create) { + boost::optional<T>& p=properties.OptProps<T>::props; + if (create && !p) p=T(); + return p.get_ptr(); + } + + template <class T> const T* get() const { + return properties.OptProps<T>::props.get_ptr(); + } + + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +}} + + + +#endif /*!QPID_FRAMING_AMQHEADERBODY_H*/ diff --git a/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp new file mode 100644 index 0000000000..477616221c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQHeartbeatBody.h" +#include <iostream> + +qpid::framing::AMQHeartbeatBody::~AMQHeartbeatBody() {} + +void qpid::framing::AMQHeartbeatBody::print(std::ostream& out) const { + out << "heartbeat"; +} diff --git a/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h new file mode 100644 index 0000000000..19ac2be013 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/CommonImportExport.h" + +#ifndef _AMQHeartbeatBody_ +#define _AMQHeartbeatBody_ + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQHeartbeatBody : public AMQBody +{ +public: + QPID_COMMON_EXTERN virtual ~AMQHeartbeatBody(); + inline uint32_t encodedSize() const { return 0; } + inline uint8_t type() const { return HEARTBEAT_BODY; } + inline void encode(Buffer& ) const {} + inline void decode(Buffer& , uint32_t /*size*/) {} + QPID_COMMON_EXTERN virtual void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +} +} + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp new file mode 100644 index 0000000000..594af4c6dc --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" + +namespace qpid { +namespace framing { + +AMQMethodBody::~AMQMethodBody() {} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.h b/qpid/cpp/src/qpid/framing/AMQMethodBody.h new file mode 100644 index 0000000000..c634180712 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.h @@ -0,0 +1,72 @@ +#ifndef _AMQMethodBody_ +#define _AMQMethodBody_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/CommonImportExport.h" + +#include <boost/shared_ptr.hpp> +#include <ostream> +#include <assert.h> + +namespace qpid { +namespace framing { + +class Buffer; +class AMQP_ServerOperations; +class MethodBodyConstVisitor; + +class AMQMethodBody : public AMQBody { + public: + AMQMethodBody() {} + QPID_COMMON_EXTERN virtual ~AMQMethodBody(); + + virtual void accept(MethodBodyConstVisitor&) const = 0; + + virtual MethodId amqpMethodId() const = 0; + virtual ClassId amqpClassId() const = 0; + virtual bool isContentBearing() const = 0; + virtual bool resultExpected() const = 0; + virtual bool responseExpected() const = 0; + + template <class T> bool isA() const { + return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; + } + + virtual uint32_t encodedSize() const = 0; + virtual uint8_t type() const { return METHOD_BODY; } + + virtual bool isSync() const { return false; /*only ModelMethods can have the sync flag set*/ } + virtual void setSync(bool) const { /*only ModelMethods can have the sync flag set*/ } + + AMQMethodBody* getMethod() { return this; } + const AMQMethodBody* getMethod() const { return this; } + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h new file mode 100644 index 0000000000..42139c7937 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file used to be auto-generated by Qpid Gentools v.0.1 + * its here temporarily until we get a full solution to multi-version support + */ +#ifndef qpid_framing_highestProtocolVersion__ +#define qpid_framing_highestProtocolVersion__ + +#include "qpid/framing/ProtocolVersion.h" + + +namespace qpid { +namespace framing { + +static ProtocolVersion highestProtocolVersion(0, 10); + +} /* namespace framing */ +} /* namespace qpid */ + +#endif diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp new file mode 100644 index 0000000000..2e6433a82f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AccumulatedAck.h" + +#include <assert.h> +#include <iostream> +#include <boost/bind.hpp> + +using std::list; +using std::max; +using std::min; +using namespace qpid::framing; + +AccumulatedAck::AccumulatedAck(SequenceNumber r) : mark(r) {} + +void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ + assert(first <= last); + if (last < mark) return; + + + Range r(first, last); + bool handled = false; + bool markMerged = false; + list<Range>::iterator merged = ranges.end(); + if (r.mergeable(mark)) { + mark = r.end; + markMerged = true; + handled = true; + } else { + for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { + if (i->merge(r)) { + merged = i; + handled = true; + } else if (r.start < i->start) { + ranges.insert(i, r); + handled = true; + } + } + } + if (!handled) { + ranges.push_back(r); + } else { + while (!ranges.empty() && ranges.front().end <= mark) { + ranges.pop_front(); + } + if (markMerged) { + //new range is incorporated, but may be possible to consolidate + merged = ranges.begin(); + while (merged != ranges.end() && merged->mergeable(mark)) { + mark = merged->end; + merged = ranges.erase(merged); + } + } + if (merged != ranges.end()) { + //consolidate ranges + list<Range>::iterator i = merged; + list<Range>::iterator j = i++; + while (i != ranges.end() && j->merge(*i)) { + j = i++; + } + } + } +} + +void AccumulatedAck::consolidate(){} + +void AccumulatedAck::clear(){ + mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility + ranges.clear(); +} + +bool AccumulatedAck::covers(SequenceNumber tag) const{ + if (tag <= mark) return true; + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(tag)) return true; + } + return false; +} + +void AccumulatedAck::collectRanges(SequenceNumberSet& set) const +{ + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + set.push_back(i->start); + set.push_back(i->end); + } +} + +void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range) +{ + update(mark, cumulative); + range.processRanges(*this); +} + + +bool Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool Range::intersect(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool Range::merge(const Range& r) +{ + if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + + +namespace qpid{ +namespace framing{ + std::ostream& operator<<(std::ostream& out, const Range& r) + { + out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; + return out; + } + + std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) + { + out << "{mark: " << a.mark.getValue() << ", ranges: ("; + for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { + if (i != a.ranges.begin()) out << ", "; + out << *i; + } + out << ")]"; + return out; + } +}} diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.h b/qpid/cpp/src/qpid/framing/AccumulatedAck.h new file mode 100644 index 0000000000..8e241b4ba1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.h @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include <ostream> +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceNumberSet.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { + namespace framing { + + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersect(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + }; + /** + * Keeps an accumulated record of acknowledged messages (by delivery + * tag). + */ + class AccumulatedAck { + public: + /** + * Everything up to this value has been acknowledged. + */ + SequenceNumber mark; + /** + * List of individually acknowledged messages greater than the + * 'mark'. + */ + std::list<Range> ranges; + + QPID_COMMON_EXTERN explicit AccumulatedAck(SequenceNumber r = SequenceNumber()); + QPID_COMMON_EXTERN void update(SequenceNumber firstTag, SequenceNumber lastTag); + QPID_COMMON_EXTERN void consolidate(); + QPID_COMMON_EXTERN void clear(); + QPID_COMMON_EXTERN bool covers(SequenceNumber tag) const; + void collectRanges(SequenceNumberSet& set) const; + QPID_COMMON_EXTERN void update(const SequenceNumber cumulative, const SequenceNumberSet& range); + void operator()(SequenceNumber first, SequenceNumber last) { update(first, last); } + }; + QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const Range&); + QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AccumulatedAck&); + } +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp new file mode 100644 index 0000000000..454e8e298f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Array.cpp @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" +#include <assert.h> + +namespace qpid { +namespace framing { + +Array::Array() : type(TYPE_CODE_VOID) {} + +Array::Array(TypeCode t) : type(t) {} + +Array::Array(uint8_t t) : type(typeCode(t)) {} + +Array::Array(const std::vector<std::string>& in) +{ + type = TYPE_CODE_STR16; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + ValuePtr value(new Str16Value(*i)); + values.push_back(value); + } +} + +uint32_t Array::encodedSize() const { + //note: size is only included when used as a 'top level' type + uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/); + for(ValueVector::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->getData().encodedSize(); + } + return len; +} + +int Array::count() const { + return values.size(); +} + +std::ostream& operator<<(std::ostream& out, const Array& a) { + out << typeName(a.getType()) << "{"; + for(Array::ValueVector::const_iterator i = a.values.begin(); i != a.values.end(); ++i) { + if (i != a.values.begin()) out << ", "; + (*i)->print(out); + } + return out << "}"; +} + +void Array::encode(Buffer& buffer) const{ + buffer.putLong(encodedSize() - 4);//size added only when array is a top-level type + buffer.putOctet(type); + buffer.putLong(count()); + for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->getData().encode(buffer); + } +} + +void Array::decode(Buffer& buffer){ + values.clear(); + uint32_t size = buffer.getLong();//size added only when array is a top-level type + uint32_t available = buffer.available(); + if (available < size) { + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " + << size << " bytes but only " << available << " available")); + } + if (size) { + type = TypeCode(buffer.getOctet()); + uint32_t count = buffer.getLong(); + + FieldValue dummy; + dummy.setType(type); + available = buffer.available(); + if (available < count * dummy.getData().encodedSize()) { + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " + << count << " items of " << dummy.getData().encodedSize() + << " bytes each but only " << available << " bytes available")); + } + + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->setType(type); + value->getData().decode(buffer); + values.push_back(ValuePtr(value)); + } + } +} + + +bool Array::operator==(const Array& x) const { + if (type != x.type) return false; + if (values.size() != x.values.size()) return false; + + for (ValueVector::const_iterator i = values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) { + if (*(i->get()) != *(j->get())) return false; + } + + return true; +} + +void Array::insert(iterator i, ValuePtr value) { + if (type != value->getType()) { + // FIXME aconway 2008-10-31: put meaningful strings in this message. + throw Exception(QPID_MSG("Wrong type of value in Array, expected " << type + << " but found " << TypeCode(value->getType()))); + } + values.insert(i, value); +} + + +} +} diff --git a/qpid/cpp/src/qpid/framing/Blob.cpp b/qpid/cpp/src/qpid/framing/Blob.cpp new file mode 100644 index 0000000000..0c8316f3d2 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Blob.cpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Blob.h" + + +namespace qpid { +namespace framing { + +void BlobHelper<void>::destroy(void*) {} + +void BlobHelper<void>::copy(void*, const void*) {} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Blob.h b/qpid/cpp/src/qpid/framing/Blob.h new file mode 100644 index 0000000000..9878d92fe4 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Blob.h @@ -0,0 +1,21 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + diff --git a/qpid/cpp/src/qpid/framing/BodyFactory.h b/qpid/cpp/src/qpid/framing/BodyFactory.h new file mode 100644 index 0000000000..6a8d9b1988 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/BodyFactory.h @@ -0,0 +1,47 @@ +#ifndef QPID_FRAMING_BODYFACTORY_H +#define QPID_FRAMING_BODYFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace framing { + +/** + * Indirect creation of body types to allow centralized changes to + * memory management strategy. + */ +class BodyFactory { + public: + template <class BodyType> static boost::intrusive_ptr<BodyType> create() { + return new BodyType; + } + + template <class BodyType> static boost::intrusive_ptr<BodyType> copy(const BodyType& body) { + return new BodyType(body); + } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_BODYFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp new file mode 100644 index 0000000000..db302b1e4c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/BodyHandler.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include <boost/cast.hpp> +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +using namespace qpid::framing; +using namespace boost; + +BodyHandler::~BodyHandler() {} + +// TODO aconway 2007-08-13: Replace with visitor. +void BodyHandler::handleBody(AMQBody* body) { + switch(body->type()) + { + case METHOD_BODY: + handleMethod(polymorphic_downcast<AMQMethodBody*>(body)); + break; + case HEADER_BODY: + handleHeader(polymorphic_downcast<AMQHeaderBody*>(body)); + break; + case CONTENT_BODY: + handleContent(polymorphic_downcast<AMQContentBody*>(body)); + break; + case HEARTBEAT_BODY: + handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); + break; + default: + throw FramingErrorException( + QPID_MSG("Invalid frame type " << body->type())); + } +} + diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.h b/qpid/cpp/src/qpid/framing/BodyHandler.h new file mode 100644 index 0000000000..9ded737195 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/BodyHandler.h @@ -0,0 +1,56 @@ +#ifndef _BodyHandler_ +#define _BodyHandler_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace framing { +class AMQBody; +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +// TODO aconway 2007-08-10: rework using Visitor pattern? + +/** + * Interface to handle incoming frame bodies. + * Derived classes provide logic for each frame type. + */ +class BodyHandler { + public: + virtual ~BodyHandler(); + virtual void handleBody(AMQBody* body); + + protected: + virtual void handleMethod(AMQMethodBody*) = 0; + virtual void handleHeader(AMQHeaderBody*) = 0; + virtual void handleContent(AMQContentBody*) = 0; + virtual void handleHeartbeat(AMQHeartbeatBody*) = 0; +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp new file mode 100644 index 0000000000..5a5bc0325e --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -0,0 +1,345 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/Msg.h" +#include <string.h> +#include <boost/format.hpp> +namespace qpid { + +namespace framing { + +Buffer::Buffer(char* _data, uint32_t _size) + : size(_size), data(_data), position(0) { +} + +void Buffer::record(){ + r_position = position; +} + +void Buffer::restore(bool reRecord){ + uint32_t savedPosition = position; + + position = r_position; + + if (reRecord) + r_position = savedPosition; +} + +void Buffer::reset(){ + position = 0; +} + +/////////////////////////////////////////////////// + +void Buffer::putOctet(uint8_t i){ + data[position++] = i; + assert(position <= size); +} + +void Buffer::putShort(uint16_t i){ + uint16_t b = i; + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + assert(position <= size); +} + +void Buffer::putLong(uint32_t i){ + uint32_t b = i; + data[position++] = (uint8_t) (0xFF & (b >> 24)); + data[position++] = (uint8_t) (0xFF & (b >> 16)); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + assert(position <= size); +} + +void Buffer::putLongLong(uint64_t i){ + uint32_t hi = i >> 32; + uint32_t lo = i; + putLong(hi); + putLong(lo); +} + +void Buffer::putInt8(int8_t i){ + data[position++] = (uint8_t) i; + assert(position <= size); +} + +void Buffer::putInt16(int16_t i){ + putShort((uint16_t) i); +} + +void Buffer::putInt32(int32_t i){ + putLong((uint32_t) i); +} + +void Buffer::putInt64(int64_t i){ + putLongLong((uint64_t) i); +} + +void Buffer::putFloat(float f){ + union { + uint32_t i; + float f; + } val; + + val.f = f; + putLong (val.i); +} + +void Buffer::putDouble(double f){ + union { + uint64_t i; + double f; + } val; + + val.f = f; + putLongLong (val.i); +} + +void Buffer::putBin128(const uint8_t* b){ + memcpy (data + position, b, 16); + position += 16; +} + +uint8_t Buffer::getOctet(){ + uint8_t octet = static_cast<uint8_t>(data[position++]); + assert(position <= size); + return octet; +} + +uint16_t Buffer::getShort(){ + uint16_t hi = (unsigned char) data[position++]; + hi = hi << 8; + hi |= (unsigned char) data[position++]; + assert(position <= size); + return hi; +} + +uint32_t Buffer::getLong(){ + uint32_t a = (unsigned char) data[position++]; + uint32_t b = (unsigned char) data[position++]; + uint32_t c = (unsigned char) data[position++]; + uint32_t d = (unsigned char) data[position++]; + assert(position <= size); + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +uint64_t Buffer::getLongLong(){ + uint64_t hi = getLong(); + uint64_t lo = getLong(); + hi = hi << 32; + return hi | lo; +} + +int8_t Buffer::getInt8(){ + int8_t i = static_cast<int8_t>(data[position++]); + assert(position <= size); + return i; +} + +int16_t Buffer::getInt16(){ + return (int16_t) getShort(); +} + +int32_t Buffer::getInt32(){ + return (int32_t) getLong(); +} + +int64_t Buffer::getInt64(){ + return (int64_t) getLongLong(); +} + +float Buffer::getFloat(){ + union { + uint32_t i; + float f; + } val; + val.i = getLong(); + return val.f; +} + +double Buffer::getDouble(){ + union { + uint64_t i; + double f; + } val; + val.i = getLongLong(); + return val.f; +} + +template <> +uint64_t Buffer::getUInt<1>() { + return getOctet(); +} + +template <> +uint64_t Buffer::getUInt<2>() { + return getShort(); +} + +template <> +uint64_t Buffer::getUInt<4>() { + return getLong(); +} + +template <> +uint64_t Buffer::getUInt<8>() { + return getLongLong(); +} + +template <> +void Buffer::putUInt<1>(uint64_t i) { + if (std::numeric_limits<uint8_t>::min() <= i && i <= std::numeric_limits<uint8_t>::max()) { + putOctet(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint8_t.")); +} + +template <> +void Buffer::putUInt<2>(uint64_t i) { + if (std::numeric_limits<uint16_t>::min() <= i && i <= std::numeric_limits<uint16_t>::max()) { + putShort(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint16_t.")); +} + +template <> +void Buffer::putUInt<4>(uint64_t i) { + if (std::numeric_limits<uint32_t>::min() <= i && i <= std::numeric_limits<uint32_t>::max()) { + putLong(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint32_t.")); +} + +template <> +void Buffer::putUInt<8>(uint64_t i) { + putLongLong(i); +} + +void Buffer::putShortString(const string& s){ + size_t slen = s.length(); + if (slen <= std::numeric_limits<uint8_t>::max()) { + uint8_t len = (uint8_t) slen; + checkAvailable(slen + 1); + putOctet(len); + s.copy(data + position, len); + position += len; + return; + } + throw Exception(QPID_MSG("Could not encode string of " << slen << " bytes as uint8_t string.")); +} + +void Buffer::putMediumString(const string& s){ + size_t slen = s.length(); + if (slen <= std::numeric_limits<uint16_t>::max()) { + uint16_t len = (uint16_t) slen; + checkAvailable(slen + 2); + putShort(len); + s.copy(data + position, len); + position += len; + return; + } + throw Exception(QPID_MSG("Could not encode string of " << slen << " bytes as uint16_t string.")); +} + +void Buffer::putLongString(const string& s){ + uint32_t len = s.length(); + checkAvailable(len + 4); + putLong(len); + s.copy(data + position, len); + position += len; +} + +void Buffer::getShortString(string& s){ + uint8_t len = getOctet(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getMediumString(string& s){ + uint16_t len = getShort(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getLongString(string& s){ + uint32_t len = getLong(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getBin128(uint8_t* b){ + memcpy (b, data + position, 16); + position += 16; +} + +void Buffer::putRawData(const string& s){ + uint32_t len = s.length(); + checkAvailable(len); + s.copy(data + position, len); + position += len; +} + +void Buffer::getRawData(string& s, uint32_t len){ + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::putRawData(const uint8_t* s, size_t len){ + checkAvailable(len); + memcpy(data + position, s, len); + position += len; +} + +void Buffer::getRawData(uint8_t* s, size_t len){ + checkAvailable(len); + memcpy(s, data + position, len); + position += len; +} + +void Buffer::dump(std::ostream& out) const { + for (uint32_t i = position; i < size; i++) + { + if (i != position) + out << " "; + out << boost::format("%02x") % ((unsigned) (uint8_t) data[i]); + } +} + +std::ostream& operator<<(std::ostream& out, const Buffer& b){ + out << "Buffer["; + b.dump(out); + return out << "]"; +} + +}} diff --git a/qpid/cpp/src/qpid/framing/ChannelHandler.h b/qpid/cpp/src/qpid/framing/ChannelHandler.h new file mode 100644 index 0000000000..ddab204578 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ChannelHandler.h @@ -0,0 +1,53 @@ +#ifndef QPID_FRAMING_CHANNELHANDLER_H +#define QPID_FRAMING_CHANNELHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AMQFrame.h" + +namespace qpid { +namespace framing { + +/** + * Sets the channel number on outgoing frames. + */ +class ChannelHandler : public FrameHandler +{ + public: + ChannelHandler(uint16_t channelId=0, FrameHandler* next=0) + : FrameHandler(next), channel(channelId) {} + void handle(AMQFrame& frame) { + frame.setChannel(channel); + next->handle(frame); + } + uint16_t get() const { return channel; } + ChannelHandler& set(uint16_t ch) { channel=ch; return *this; } + operator uint16_t() const { return get(); } + ChannelHandler& operator=(uint16_t ch) { return set(ch); } + + private: + uint16_t channel; +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/framing/Endian.cpp b/qpid/cpp/src/qpid/framing/Endian.cpp new file mode 100644 index 0000000000..5acc3c459f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Endian.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Endian.h" + +namespace qpid { +namespace framing { + +Endian::Endian() : littleEndian(!testBigEndian()) {} + +bool Endian::testBigEndian() +{ + uint16_t a = 1; + uint16_t b; + uint8_t* p = (uint8_t*) &b; + p[0] = 0xFF & (a >> 8); + p[1] = 0xFF & (a); + return a == b; +} + +uint8_t* Endian::convertIfRequired(uint8_t* const octets, int width) +{ + if (instance.littleEndian) { + for (int i = 0; i < (width/2); i++) { + uint8_t temp = octets[i]; + octets[i] = octets[width - (1 + i)]; + octets[width - (1 + i)] = temp; + } + } + return octets; +} + +const Endian Endian::instance; + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Endian.h b/qpid/cpp/src/qpid/framing/Endian.h new file mode 100644 index 0000000000..077d5a3e9b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Endian.h @@ -0,0 +1,46 @@ +#ifndef QPID_FRAMING_ENDIAN_H +#define QPID_FRAMING_ENDIAN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace framing { + +/** + * Conversion utility for little-endian platforms that need to convert + * to and from network ordered octet sequences + */ +class Endian +{ + public: + static uint8_t* convertIfRequired(uint8_t* const octets, int width); + private: + const bool littleEndian; + Endian(); + static const Endian instance; + static bool testBigEndian(); +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_ENDIAN_H*/ diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp new file mode 100644 index 0000000000..21eaea0f4d --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Endian.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" +#include <assert.h> + +namespace qpid { +namespace framing { + +FieldTable::FieldTable(const FieldTable& ft) +{ + *this = ft; +} + +FieldTable& FieldTable::operator=(const FieldTable& ft) +{ + clear(); + values = ft.values; + return *this; +} + +FieldTable::~FieldTable() {} + +uint32_t FieldTable::encodedSize() const { + uint32_t len(4/*size field*/ + 4/*count field*/); + for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + // shortstr_len_byte + key size + value size + len += 1 + (i->first).size() + (i->second)->encodedSize(); + } + return len; +} + +int FieldTable::count() const { + return values.size(); +} + +namespace +{ +std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) { + return out << i.first << ":" << *i.second; +} +} + +std::ostream& operator<<(std::ostream& out, const FieldTable& t) { + out << "{"; + FieldTable::ValueMap::const_iterator i = t.begin(); + if (i != t.end()) out << *i++; + while (i != t.end()) + { + out << "," << *i++; + } + return out << "}"; +} + +void FieldTable::set(const std::string& name, const ValuePtr& value){ + values[name] = value; +} + +void FieldTable::setString(const std::string& name, const std::string& value){ + values[name] = ValuePtr(new Str16Value(value)); +} + +void FieldTable::setInt(const std::string& name, const int value){ + values[name] = ValuePtr(new IntegerValue(value)); +} + +void FieldTable::setInt64(const std::string& name, const int64_t value){ + values[name] = ValuePtr(new Integer64Value(value)); +} + +void FieldTable::setTimestamp(const std::string& name, const uint64_t value){ + values[name] = ValuePtr(new TimeValue(value)); +} + +void FieldTable::setUInt64(const std::string& name, const uint64_t value){ + values[name] = ValuePtr(new Unsigned64Value(value)); +} + +void FieldTable::setTable(const std::string& name, const FieldTable& value) +{ + values[name] = ValuePtr(new FieldTableValue(value)); +} +void FieldTable::setArray(const std::string& name, const Array& value) +{ + values[name] = ValuePtr(new ArrayValue(value)); +} + +void FieldTable::setFloat(const std::string& name, const float value){ + values[name] = ValuePtr(new FloatValue(value)); +} + +void FieldTable::setDouble(const std::string& name, double value){ + values[name] = ValuePtr(new DoubleValue(value)); +} + +FieldTable::ValuePtr FieldTable::get(const std::string& name) const +{ + ValuePtr value; + ValueMap::const_iterator i = values.find(name); + if ( i!=values.end() ) + value = i->second; + return value; +} + +namespace { + template <class T> T default_value() { return T(); } + template <> int default_value<int>() { return 0; } + //template <> uint64_t default_value<uint64_t>() { return 0; } +} + +template <class T> +T getValue(const FieldTable::ValuePtr value) +{ + if (!value || !value->convertsTo<T>()) + return default_value<T>(); + + return value->get<T>(); +} + +std::string FieldTable::getAsString(const std::string& name) const { + return getValue<std::string>(get(name)); +} + +int FieldTable::getAsInt(const std::string& name) const { + return getValue<int>(get(name)); +} + +uint64_t FieldTable::getAsUInt64(const std::string& name) const { + return static_cast<uint64_t>( getValue<int64_t>(get(name))); +} + +int64_t FieldTable::getAsInt64(const std::string& name) const { + return getValue<int64_t>(get(name)); +} + +bool FieldTable::getTable(const std::string& name, FieldTable& value) const { + return getEncodedValue<FieldTable>(get(name), value); +} + +bool FieldTable::getArray(const std::string& name, Array& value) const { + return getEncodedValue<Array>(get(name), value); +} + +template <class T, int width, uint8_t typecode> +bool getRawFixedWidthValue(FieldTable::ValuePtr vptr, T& value) +{ + if (vptr && vptr->getType() == typecode) { + value = vptr->get<T>(); + return true; + } + return false; +} + +bool FieldTable::getFloat(const std::string& name, float& value) const { + return getRawFixedWidthValue<float, 4, 0x23>(get(name), value); +} + +bool FieldTable::getDouble(const std::string& name, double& value) const { + return getRawFixedWidthValue<double, 8, 0x33>(get(name), value); +} + +//uint64_t FieldTable::getTimestamp(const std::string& name) const { +// return getValue<uint64_t>(name); +//} + +void FieldTable::encode(Buffer& buffer) const { + buffer.putLong(encodedSize() - 4); + buffer.putLong(values.size()); + for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { + buffer.putShortString(i->first); + i->second->encode(buffer); + } +} + +void FieldTable::decode(Buffer& buffer){ + clear(); + uint32_t len = buffer.getLong(); + if (len) { + uint32_t available = buffer.available(); + if (available < len) + throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); + uint32_t count = buffer.getLong(); + uint32_t leftover = available - len; + while(buffer.available() > leftover && count--){ + std::string name; + ValuePtr value(new FieldValue); + + buffer.getShortString(name); + value->decode(buffer); + values[name] = ValuePtr(value); + } + } +} + +bool FieldTable::operator==(const FieldTable& x) const { + if (values.size() != x.values.size()) return false; + for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + ValueMap::const_iterator j = x.values.find(i->first); + if (j == x.values.end()) return false; + if (*(i->second) != *(j->second)) return false; + } + return true; +} + +void FieldTable::erase(const std::string& name) +{ + if (values.find(name) != values.end()) + values.erase(name); +} + +std::pair<FieldTable::ValueMap::iterator, bool> FieldTable::insert(const ValueMap::value_type& value) +{ + return values.insert(value); +} + +FieldTable::ValueMap::iterator FieldTable::insert(ValueMap::iterator position, const ValueMap::value_type& value) +{ + return values.insert(position, value); +} + + +} +} diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp new file mode 100644 index 0000000000..ce5a50117c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Endian.h" +#include "qpid/framing/List.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +uint8_t FieldValue::getType() const +{ + return typeOctet; +} + +void FieldValue::setType(uint8_t type) +{ + typeOctet = type; + if (typeOctet == 0xA8) { + data.reset(new EncodedValue<FieldTable>()); + } else if (typeOctet == 0xA9) { + data.reset(new EncodedValue<List>()); + } else if (typeOctet == 0xAA) { + data.reset(new EncodedValue<Array>()); + } else { + uint8_t lenType = typeOctet >> 4; + switch(lenType){ + case 0: + data.reset(new FixedWidthValue<1>()); + break; + case 1: + data.reset(new FixedWidthValue<2>()); + break; + case 2: + data.reset(new FixedWidthValue<4>()); + break; + case 3: + data.reset(new FixedWidthValue<8>()); + break; + case 4: + data.reset(new FixedWidthValue<16>()); + break; + case 5: + data.reset(new FixedWidthValue<32>()); + break; + case 6: + data.reset(new FixedWidthValue<64>()); + break; + case 7: + data.reset(new FixedWidthValue<128>()); + break; + case 8: + data.reset(new VariableWidthValue<1>()); + break; + case 9: + data.reset(new VariableWidthValue<2>()); + break; + case 0xA: + data.reset(new VariableWidthValue<4>()); + break; + case 0xC: + data.reset(new FixedWidthValue<5>()); + break; + case 0xD: + data.reset(new FixedWidthValue<9>()); + break; + case 0xF: + data.reset(new FixedWidthValue<0>()); + break; + default: + throw IllegalArgumentException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); + } + } +} + +void FieldValue::decode(Buffer& buffer) +{ + setType(buffer.getOctet()); + data->decode(buffer); +} + +void FieldValue::encode(Buffer& buffer) +{ + buffer.putOctet(typeOctet); + data->encode(buffer); +} + +bool FieldValue::operator==(const FieldValue& v) const +{ + return + typeOctet == v.typeOctet && + *data == *v.data; +} + +Str8Value::Str8Value(const std::string& v) : + FieldValue( + TYPE_CODE_STR8, + new VariableWidthValue<1>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{ +} + +Str16Value::Str16Value(const std::string& v) : + FieldValue( + 0x95, + new VariableWidthValue<2>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +Var16Value::Var16Value(const std::string& v, uint8_t code) : + FieldValue( + code, + new VariableWidthValue<2>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} +Var32Value::Var32Value(const std::string& v, uint8_t code) : + FieldValue( + code, + new VariableWidthValue<4>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +Struct32Value::Struct32Value(const std::string& v) : + FieldValue( + 0xAB, + new VariableWidthValue<4>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +IntegerValue::IntegerValue(int v) : + FieldValue(0x21, new FixedWidthValue<4>(v)) +{} + +FloatValue::FloatValue(float v) : + FieldValue(0x23, new FixedWidthValue<4>(Endian::convertIfRequired(reinterpret_cast<uint8_t*>(&v), 4))) +{} + +DoubleValue::DoubleValue(double v) : + FieldValue(0x33, new FixedWidthValue<8>(Endian::convertIfRequired(reinterpret_cast<uint8_t*>(&v), 8))) +{} + +Integer64Value::Integer64Value(int64_t v) : + FieldValue(0x31, new FixedWidthValue<8>(v)) +{} + +Unsigned64Value::Unsigned64Value(uint64_t v) : + FieldValue(0x32, new FixedWidthValue<8>(v)) +{} + + +TimeValue::TimeValue(uint64_t v) : + FieldValue(0x38, new FixedWidthValue<8>(v)) +{ +} + +FieldTableValue::FieldTableValue(const FieldTable& f) : FieldValue(0xa8, new EncodedValue<FieldTable>(f)) +{ +} + +ListValue::ListValue(const List& l) : FieldValue(0xa9, new EncodedValue<List>(l)) +{ +} + +ArrayValue::ArrayValue(const Array& a) : FieldValue(0xaa, new EncodedValue<Array>(a)) +{ +} + +VoidValue::VoidValue() : FieldValue(0xf0, new FixedWidthValue<0>()) {} + +BoolValue::BoolValue(bool b) : + FieldValue(0x08, new FixedWidthValue<1>(b)) +{} + +Unsigned8Value::Unsigned8Value(uint8_t v) : + FieldValue(0x02, new FixedWidthValue<1>(v)) +{} +Unsigned16Value::Unsigned16Value(uint16_t v) : + FieldValue(0x12, new FixedWidthValue<2>(v)) +{} +Unsigned32Value::Unsigned32Value(uint32_t v) : + FieldValue(0x22, new FixedWidthValue<4>(v)) +{} + +Integer8Value::Integer8Value(int8_t v) : + FieldValue(0x01, new FixedWidthValue<1>(v)) +{} +Integer16Value::Integer16Value(int16_t v) : + FieldValue(0x11, new FixedWidthValue<2>(v)) +{} +UuidValue::UuidValue(const unsigned char* v) : + FieldValue(0x48, new FixedWidthValue<16>(v)) +{} + +void FieldValue::print(std::ostream& out) const { + data->print(out); + out << TypeCode(typeOctet) << '('; + if (data->convertsToString()) out << data->getString(); + else if (data->convertsToInt()) out << data->getInt(); + else data->print(out); + out << ')'; +} + +uint8_t* FieldValue::convertIfRequired(uint8_t* const octets, int width) +{ + return Endian::convertIfRequired(octets, width); +} + +}} diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.cpp b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp new file mode 100644 index 0000000000..90cbbd84a1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FrameDecoder.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" +#include <algorithm> +#include <string.h> + +namespace qpid { +namespace framing { + +namespace { +/** Append up to n bytes from start of buf to end of bytes. */ +void append(std::vector<char>& bytes, Buffer& buffer, size_t n) { + size_t oldSize = bytes.size(); + if ((n = std::min(n, size_t(buffer.available()))) == 0) + return; + bytes.resize(oldSize+n); + char* p = &bytes[oldSize]; + buffer.getRawData(reinterpret_cast<uint8_t*>(p), n); +} +} + +bool FrameDecoder::decode(Buffer& buffer) { + if (buffer.available() == 0) return false; + if (fragment.empty()) { + if (frame.decode(buffer)) // Decode from buffer + return true; + else // Store fragment + append(fragment, buffer, buffer.available()); + } + else { // Already have a fragment + // Get enough data to decode the frame size. + if (fragment.size() < AMQFrame::DECODE_SIZE_MIN) { + append(fragment, buffer, AMQFrame::DECODE_SIZE_MIN - fragment.size()); + } + if (fragment.size() >= AMQFrame::DECODE_SIZE_MIN) { + uint16_t size = AMQFrame::decodeSize(&fragment[0]); + if (size <= fragment.size()) + throw FramingErrorException(QPID_MSG("Frame size " << size << " is too small.")); + append(fragment, buffer, size-fragment.size()); + Buffer b(&fragment[0], fragment.size()); + if (frame.decode(b)) { + assert(b.available() == 0); + fragment.clear(); + return true; + } + } + } + return false; +} + +void FrameDecoder::setFragment(const char* data, size_t size) { + fragment.resize(size); + ::memcpy(&fragment[0], data, size); +} + +std::pair<const char*, size_t> FrameDecoder::getFragment() const { + return std::pair<const char*, size_t>(&fragment[0], fragment.size()); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.h b/qpid/cpp/src/qpid/framing/FrameDecoder.h new file mode 100644 index 0000000000..26bed6c447 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.h @@ -0,0 +1,52 @@ +#ifndef QPID_FRAMING_FRAMEDECODER_H +#define QPID_FRAMING_FRAMEDECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQFrame.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +/** + * Decode a frame from buffer. If buffer does not contain a complete + * frame, caches the fragment for the next call to decode. + */ +class FrameDecoder +{ + public: + QPID_COMMON_EXTERN bool decode(Buffer& buffer); + const AMQFrame& getFrame() const { return frame; } + AMQFrame& getFrame() { return frame; } + + void setFragment(const char*, size_t); + std::pair<const char*, size_t> getFragment() const; + + private: + std::vector<char> fragment; + AMQFrame frame; + +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_FRAMEDECODER_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h b/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h new file mode 100644 index 0000000000..bd676960bf --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h @@ -0,0 +1,60 @@ +#ifndef QPID_FRAMING_FRAMEVISITOR_H +#define QPID_FRAMING_FRAMEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/MethodBodyDefaultVisitor.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" + +namespace qpid { +namespace framing { +/** + * Visitor for all concrete frame body types, which combines + * AMQBodyConstVisitor and MethodBodyDefaultVisitor. + * + * Derived classes can override visit methods to specify actions. + * Derived classes must override defaultVisit(), which is called + * for any non-overridden visit functions. + * + */ +struct FrameDefaultVisitor : public AMQBodyConstVisitor, + protected MethodBodyDefaultVisitor +{ + virtual void defaultVisit(const AMQBody&) = 0; + void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); } + + void visit(const AMQHeaderBody& b) { defaultVisit(b); } + void visit(const AMQContentBody& b) { defaultVisit(b); } + void visit(const AMQHeartbeatBody& b) { defaultVisit(b); } + void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); } + + using AMQBodyConstVisitor::visit; + using MethodBodyDefaultVisitor::visit; +}; + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_FRAMEVISITOR_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameHandler.h b/qpid/cpp/src/qpid/framing/FrameHandler.h new file mode 100644 index 0000000000..fa1fb535ef --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameHandler.h @@ -0,0 +1,33 @@ +#ifndef QPID_FRAMING_FRAMEHANDLER_H +#define QPID_FRAMING_FRAMEHANDLER_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Handler.h" + +namespace qpid { +namespace framing { + +class AMQFrame; +typedef Handler<AMQFrame&> FrameHandler; + + +}} +#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp new file mode 100644 index 0000000000..255aaf6e6b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::framing; +using namespace boost; + +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { } +FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true) +{ + for (Frames::const_iterator i = original.begin(); i != original.end(); ++i) { + parts.push_back(AMQFrame(*(i->getBody()))); + parts.back().setFirstSegment(i->isFirstSegment()); + parts.back().setLastSegment(i->isLastSegment()); + parts.back().setFirstFrame(i->isFirstFrame()); + parts.back().setLastFrame(i->isLastFrame()); + } +} + +void FrameSet::append(const AMQFrame& part) +{ + parts.push_back(part); + recalculateSize = true; +} + +bool FrameSet::isComplete() const +{ + return !parts.empty() && parts.back().getEof() && parts.back().getEos(); +} + +bool FrameSet::isContentBearing() const +{ + const AMQMethodBody* method = getMethod(); + return method && method->isContentBearing(); +} + +const AMQMethodBody* FrameSet::getMethod() const +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + +AMQMethodBody* FrameSet::getMethod() +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + +const AMQHeaderBody* FrameSet::getHeaders() const +{ + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); +} + +AMQHeaderBody* FrameSet::getHeaders() +{ + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); +} + +uint64_t FrameSet::getContentSize() const +{ + if (recalculateSize) + { + SumBodySize sum; + map_if(sum, TypeFilter<CONTENT_BODY>()); + contentSize = sum.getSize(); + recalculateSize = false; + } + return contentSize; +} + +void FrameSet::getContent(std::string& out) const { + out.clear(); + out.reserve(getContentSize()); + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) + out += i->castBody<AMQContentBody>()->getData(); + } +} + +std::string FrameSet::getContent() const { + std::string out; + getContent(out); + return out; +} diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h new file mode 100644 index 0000000000..cae75e5ec8 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/InlineVector.h" +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/CommonImportExport.h" + +#ifndef _FrameSet_ +#define _FrameSet_ + +namespace qpid { +namespace framing { + +/** + * Collects the frames representing a message. + */ +class FrameSet +{ + typedef InlineVector<AMQFrame, 4> Frames; + const SequenceNumber id; + Frames parts; + mutable uint64_t contentSize; + mutable bool recalculateSize; + +public: + typedef boost::shared_ptr<FrameSet> shared_ptr; + + QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id); + QPID_COMMON_EXTERN FrameSet(const FrameSet&); + QPID_COMMON_EXTERN void append(const AMQFrame& part); + QPID_COMMON_EXTERN bool isComplete() const; + + QPID_COMMON_EXTERN uint64_t getContentSize() const; + + QPID_COMMON_EXTERN void getContent(std::string&) const; + QPID_COMMON_EXTERN std::string getContent() const; + + bool isContentBearing() const; + + QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const; + QPID_COMMON_EXTERN AMQMethodBody* getMethod(); + QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const; + QPID_COMMON_EXTERN AMQHeaderBody* getHeaders(); + + template <class T> bool isA() const { + const AMQMethodBody* method = getMethod(); + return method && method->isA<T>(); + } + + template <class T> const T* as() const { + const AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; + } + + template <class T> T* as() { + AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0; + } + + template <class T> const T* getHeaderProperties() const { + const AMQHeaderBody* header = getHeaders(); + return header ? header->get<T>() : 0; + } + + Frames::const_iterator begin() const { return parts.begin(); } + Frames::const_iterator end() const { return parts.end(); } + + const SequenceNumber& getId() const { return id; } + + template <class P> void remove(P predicate) { + parts.erase(std::remove_if(parts.begin(), parts.end(), predicate), parts.end()); + } + + template <class F> void map(F& functor) { + std::for_each(parts.begin(), parts.end(), functor); + } + + template <class F> void map(F& functor) const { + std::for_each(parts.begin(), parts.end(), functor); + } + + template <class F, class P> void map_if(F& functor, P predicate) { + for(Frames::iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } + + template <class F, class P> void map_if(F& functor, P predicate) const { + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Handler.h b/qpid/cpp/src/qpid/framing/Handler.h new file mode 100644 index 0000000000..fa8db36f49 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Handler.h @@ -0,0 +1,101 @@ +#ifndef QPID_FRAMING_HANDLER_H +#define QPID_FRAMING_HANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include <boost/type_traits/remove_reference.hpp> +#include <assert.h> + +namespace qpid { +namespace framing { + +template <class T> +struct Handler { + typedef T HandledType; + typedef void handleFptr(T); + typedef void result_type; // Compatible with std/boost functors. + + Handler(Handler<T>* next_=0) : next(next_) {} + virtual ~Handler() {} + virtual void handle(T) = 0; + + /** Allow functor syntax for calling handle */ + void operator()(T t) { handle(t); } + + + /** Pointer to next handler in a linked list. */ + Handler<T>* next; + + /** Adapt any void(T) functor as a Handler. + * Functor<F>(f) will copy f. + * Functor<F&>(f) will only take a reference to x. + */ + template <class F> class Functor : public Handler<T> { + public: + Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {} + void handle(T t) { functor(t); } + private: + F functor; + }; + + /** Adapt a member function of X as a Handler. + * Only holds a reference to its target, not a copy. + */ + template <class X, void (X::*F)(T)> + class MemFunRef : public Handler<T> { + public: + MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} + void handle(T t) { (target->*F)(t); } + + /** Allow calling with -> syntax */ + MemFunRef* operator->() { return this; } + + private: + X* target; + }; + + /** Interface for a handler that implements a + * pair of in/out handle operations. + * @see InOutHandler + */ + class InOutHandlerInterface { + public: + virtual ~InOutHandlerInterface() {} + virtual void handleIn(T) = 0; + virtual void handleOut(T) = 0; + }; + + /** Support for implementing an in-out handler pair as a single class. + * Overrides handleIn, handleOut functions in a single class. + */ + struct InOutHandler : protected InOutHandlerInterface { + InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in; + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out; + }; +}; + + + +}} +#endif /*!QPID_FRAMING_HANDLER_H*/ +// diff --git a/qpid/cpp/src/qpid/framing/HeaderProperties.h b/qpid/cpp/src/qpid/framing/HeaderProperties.h new file mode 100644 index 0000000000..8b1828daec --- /dev/null +++ b/qpid/cpp/src/qpid/framing/HeaderProperties.h @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" + +#ifndef _HeaderProperties_ +#define _HeaderProperties_ + +namespace qpid { +namespace framing { + + class HeaderProperties + { + + public: + inline virtual ~HeaderProperties(){} + virtual uint8_t classId() const = 0; + virtual uint32_t encodedSize() const = 0; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t size) = 0; + }; +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/InitiationHandler.cpp b/qpid/cpp/src/qpid/framing/InitiationHandler.cpp new file mode 100644 index 0000000000..7ded505a47 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InitiationHandler.cpp @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/InitiationHandler.h" + +qpid::framing::InitiationHandler::~InitiationHandler() {} diff --git a/qpid/cpp/src/qpid/framing/InitiationHandler.h b/qpid/cpp/src/qpid/framing/InitiationHandler.h new file mode 100644 index 0000000000..5dfcc6b468 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InitiationHandler.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +#ifndef _InitiationHandler_ +#define _InitiationHandler_ + +#include "qpid/framing/ProtocolInitiation.h" + +namespace qpid { +namespace framing { + + class InitiationHandler{ + public: + virtual ~InitiationHandler(); + virtual void initiated(const ProtocolInitiation&) = 0; + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/InputHandler.h b/qpid/cpp/src/qpid/framing/InputHandler.h new file mode 100644 index 0000000000..3efb23632a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InputHandler.h @@ -0,0 +1,41 @@ +#ifndef _InputHandler_ +#define _InputHandler_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace framing { + +// TODO aconway 2007-08-29: Eliminate, replace with FrameHandler. +class InputHandler : public FrameHandler { + public: + virtual ~InputHandler() {} + virtual void received(AMQFrame&) = 0; + void handle(AMQFrame& f) { received(f); } +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Invoker.h b/qpid/cpp/src/qpid/framing/Invoker.h new file mode 100644 index 0000000000..4f1cf7c331 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Invoker.h @@ -0,0 +1,86 @@ +#ifndef QPID_FRAMING_INVOKER_H +#define QPID_FRAMING_INVOKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/MethodBodyDefaultVisitor.h" +#include "qpid/framing/StructHelper.h" + +#include <boost/optional.hpp> + +namespace qpid { +namespace framing { + +class AMQMethodBody; + +/** + * Base class for invoker visitors. + */ +class Invoker: public MethodBodyDefaultVisitor, protected StructHelper +{ + public: + struct Result { + public: + Result() : handled(false) {} + const std::string& getResult() const { return result; } + bool hasResult() const { return !result.empty(); } + bool wasHandled() const { return handled; } + operator bool() const { return handled; } + + std::string result; + bool handled; + }; + + void defaultVisit(const AMQMethodBody&) {} + Result getResult() const { return result; } + + protected: + Result result; +}; + +/** + * Invoke an invocable object. + * Invocable classes must provide a nested type Invoker. + */ +template <class Invocable> +Invoker::Result invoke(Invocable& target, const AMQMethodBody& body) { + typename Invocable::Invoker invoker(target); + body.accept(invoker); + return invoker.getResult(); +} + +/** + * Invoke an invocable object. + * Invocable classes must provide a nested type Invoker. + */ +template <class Invocable> +Invoker::Result invoke(Invocable& target, const AMQBody& body) { + typename Invocable::Invoker invoker(target); + const AMQMethodBody* method = body.getMethod(); + if (method) + method->accept(invoker); + return invoker.getResult(); +} + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_INVOKER_H*/ diff --git a/qpid/cpp/src/qpid/framing/IsInSequenceSet.h b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h new file mode 100644 index 0000000000..fe10c1b9fa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h @@ -0,0 +1,51 @@ +#ifndef QPID_FRAMING_ISINSEQUENCESET_H +#define QPID_FRAMING_ISINSEQUENCESET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" + +namespace qpid { +namespace framing { +/** + * Functor to test whether values are in a sequence set. This is a + * stateful functor that requires the values to be supplied in order + * and takes advantage of that ordering to avoid multiple scans. + */ +class IsInSequenceSet +{ + public: + IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {} + + bool operator()(const SequenceNumber& n) { + while (i != set.rangesEnd() && i->end() <= n) ++i; + return i != set.rangesEnd() && i->begin() <= n; + } + + private: + const SequenceSet& set; + SequenceSet::RangeIterator i; +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_ISINSEQUENCESET_H*/ diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp new file mode 100644 index 0000000000..963ebc206b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/List.cpp @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/List.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +uint32_t List::encodedSize() const +{ + uint32_t len(4/*size*/ + 4/*count*/); + for(Values::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->encodedSize(); + } + return len; +} + +void List::encode(Buffer& buffer) const +{ + buffer.putLong(encodedSize() - 4); + buffer.putLong(size()); + for (Values::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->encode(buffer); + } +} + +void List::decode(Buffer& buffer) +{ + values.clear(); + uint32_t size = buffer.getLong(); + uint32_t available = buffer.available(); + if (available < size) { + throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected " + << size << " bytes but only " << available << " available")); + } + if (size) { + uint32_t count = buffer.getLong(); + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->decode(buffer); + values.push_back(value); + } + } +} + + +bool List::operator==(const List& other) const { + return values.size() == other.values.size() && + std::equal(values.begin(), values.end(), other.values.begin()); +} + +std::ostream& operator<<(std::ostream& out, const List& l) +{ + out << "{"; + for(List::Values::const_iterator i = l.values.begin(); i != l.values.end(); ++i) { + if (i != l.values.begin()) out << ", "; + (*i)->print(out); + } + return out << "}"; +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/MethodBodyFactory.h b/qpid/cpp/src/qpid/framing/MethodBodyFactory.h new file mode 100644 index 0000000000..88bc444795 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/MethodBodyFactory.h @@ -0,0 +1,45 @@ +#ifndef QPID_FRAMING_METHODBODYFACTORY_H +#define QPID_FRAMING_METHODBODYFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace framing { + +class AMQMethodBody; + +/** + * Functions to create instances of AMQMethodBody sub-classes. + * Note: MethodBodyFactory.cpp file is generated by rubygen. + */ +class MethodBodyFactory +{ + public: + static boost::intrusive_ptr<AMQMethodBody> create(ClassId c, MethodId m); +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_METHODBODYFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/framing/MethodContent.h b/qpid/cpp/src/qpid/framing/MethodContent.h new file mode 100644 index 0000000000..b290a0c140 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/MethodContent.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MethodContent_ +#define _MethodContent_ + +#include <string> +#include "qpid/framing/AMQHeaderBody.h" + +namespace qpid { +namespace framing { + +class MethodContent +{ +public: + virtual ~MethodContent() {} + //TODO: rethink this interface + virtual AMQHeaderBody getHeader() const = 0; + virtual const std::string& getData() const = 0; +}; + +}} +#endif diff --git a/qpid/cpp/src/qpid/framing/ModelMethod.h b/qpid/cpp/src/qpid/framing/ModelMethod.h new file mode 100644 index 0000000000..d99bd06cfa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ModelMethod.h @@ -0,0 +1,49 @@ +#ifndef _ModelMethod_ +#define _ModelMethod_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/Header.h" + +namespace qpid { +namespace framing { + + +class ModelMethod : public AMQMethodBody +{ + mutable Header header; +public: + virtual ~ModelMethod() {} + virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); } + virtual void decodeHeader(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); } + virtual uint32_t headerSize() const { return header.encodedSize(); } + virtual bool isSync() const { return header.getSync(); } + virtual void setSync(bool on) const { header.setSync(on); } + Header& getHeader() { return header; } + const Header& getHeader() const { return header; } +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/OutputHandler.h b/qpid/cpp/src/qpid/framing/OutputHandler.h new file mode 100644 index 0000000000..88c95589da --- /dev/null +++ b/qpid/cpp/src/qpid/framing/OutputHandler.h @@ -0,0 +1,42 @@ +#ifndef _OutputHandler_ +#define _OutputHandler_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace framing { + +// TODO aconway 2007-08-29: Replace with FrameHandler. +class OutputHandler : public FrameHandler { + public: + virtual ~OutputHandler() {} + virtual void send(AMQFrame&) = 0; + void handle(AMQFrame& f) { send(f); } +}; + + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp new file mode 100644 index 0000000000..e617015d64 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolInitiation.h" + +namespace qpid { +namespace framing { + +ProtocolInitiation::ProtocolInitiation(){} + +ProtocolInitiation::ProtocolInitiation(uint8_t _major, uint8_t _minor) : version(_major, _minor) {} + +ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} + +ProtocolInitiation::~ProtocolInitiation(){} + +void ProtocolInitiation::encode(Buffer& buffer) const { + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('Q'); + buffer.putOctet('P'); + buffer.putOctet(1);//class + buffer.putOctet(1);//instance + buffer.putOctet(version.getMajor()); + buffer.putOctet(version.getMinor()); +} + +bool ProtocolInitiation::decode(Buffer& buffer){ + if(buffer.available() >= 8){ + buffer.getOctet();//A + buffer.getOctet();//M + buffer.getOctet();//Q + buffer.getOctet();//P + buffer.getOctet();//class + buffer.getOctet();//instance + version.setMajor(buffer.getOctet()); + version.setMinor(buffer.getOctet()); + return true; + }else{ + return false; + } +} + + +std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi) { + return o << int(pi.getMajor()) << "-" << int(pi.getMinor()); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h new file mode 100644 index 0000000000..c519bc2442 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQDataBlock.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/CommonImportExport.h" + +#ifndef _ProtocolInitiation_ +#define _ProtocolInitiation_ + +namespace qpid { +namespace framing { + +class ProtocolInitiation : public AMQDataBlock +{ +private: + ProtocolVersion version; + +public: + QPID_COMMON_EXTERN ProtocolInitiation(); + QPID_COMMON_EXTERN ProtocolInitiation(uint8_t major, uint8_t minor); + QPID_COMMON_EXTERN ProtocolInitiation(ProtocolVersion p); + QPID_COMMON_EXTERN virtual ~ProtocolInitiation(); + QPID_COMMON_EXTERN virtual void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN virtual bool decode(Buffer& buffer); + inline virtual uint32_t encodedSize() const { return 8; } + inline uint8_t getMajor() const { return version.getMajor(); } + inline uint8_t getMinor() const { return version.getMinor(); } + inline ProtocolVersion getVersion() const { return version; } + bool operator==(ProtocolVersion v) const { return v == getVersion(); } +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi); + + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp b/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp new file mode 100644 index 0000000000..c63cddb4cc --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolVersion.h" +#include <sstream> + +using namespace qpid::framing; + +const std::string ProtocolVersion::toString() const +{ + std::stringstream ss; + ss << major_ << "-" << minor_; + return ss.str(); +} + +ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p) +{ + major_ = p.major_; + minor_ = p.minor_; + return *this; +} + +bool ProtocolVersion::operator==(ProtocolVersion p) const +{ + return major_ == p.major_ && minor_ == p.minor_; +} + diff --git a/qpid/cpp/src/qpid/framing/Proxy.cpp b/qpid/cpp/src/qpid/framing/Proxy.cpp new file mode 100644 index 0000000000..452fb13b01 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Proxy.cpp @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/Proxy.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace framing { + +Proxy::Proxy(FrameHandler& h) : out(&h), sync(false) {} + +Proxy::~Proxy() {} + +void Proxy::send(const AMQBody& b) { + if (sync) { + const AMQMethodBody* m = dynamic_cast<const AMQMethodBody*>(&b); + if (m) m->setSync(sync); + } + AMQFrame f(b); + out->handle(f); +} + +ProtocolVersion Proxy::getVersion() const { + return ProtocolVersion(); +} + +FrameHandler& Proxy::getHandler() { return *out; } + +void Proxy::setHandler(FrameHandler& f) { out=&f; } + +Proxy::ScopedSync::ScopedSync(Proxy& p) : proxy(p) { proxy.sync = true; } +Proxy::ScopedSync::~ScopedSync() { proxy.sync = false; } + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h new file mode 100644 index 0000000000..0884e9cbd2 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Proxy.h @@ -0,0 +1,64 @@ +#ifndef _framing_Proxy_h +#define _framing_Proxy_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ProtocolVersion.h" + +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class AMQBody; + +/** + * Base class for proxies. + */ +class Proxy +{ + public: + class ScopedSync + { + Proxy& proxy; + public: + QPID_COMMON_EXTERN ScopedSync(Proxy& p); + QPID_COMMON_EXTERN ~ScopedSync(); + }; + + QPID_COMMON_EXTERN Proxy(FrameHandler& h); + QPID_COMMON_EXTERN virtual ~Proxy(); + + QPID_COMMON_EXTERN void send(const AMQBody&); + + QPID_COMMON_EXTERN ProtocolVersion getVersion() const; + + QPID_COMMON_EXTERN FrameHandler& getHandler(); + QPID_COMMON_EXTERN void setHandler(FrameHandler&); + private: + FrameHandler* out; + bool sync; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Proxy_h*/ diff --git a/qpid/cpp/src/qpid/framing/ResizableBuffer.h b/qpid/cpp/src/qpid/framing/ResizableBuffer.h new file mode 100644 index 0000000000..0abc5ba7f4 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ResizableBuffer.h @@ -0,0 +1,60 @@ +#ifndef QPID_FRAMING_RESIZABLEBUFFER_H +#define QPID_FRAMING_RESIZABLEBUFFER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Buffer.h" +#include <vector> + +namespace qpid { +namespace framing { + +/** + * A buffer that maintains its own storage and can be resized, + * keeping any data already written to the buffer. + */ +class ResizableBuffer : public Buffer +{ + public: + ResizableBuffer(size_t initialSize) : store(initialSize) { + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + } + + void resize(size_t newSize) { + size_t oldPos = getPosition(); + store.resize(newSize); + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + setPosition(oldPos); + } + + /** Make sure at least n bytes are available */ + void makeAvailable(size_t n) { + if (n > available()) + resize(getSize() + n - available()); + } + + private: + std::vector<char> store; +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_RESIZABLEBUFFER_H*/ diff --git a/qpid/cpp/src/qpid/framing/SendContent.cpp b/qpid/cpp/src/qpid/framing/SendContent.cpp new file mode 100644 index 0000000000..04b60396da --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SendContent.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SendContent.h" + +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t mfs, uint efc) : handler(h), + maxFrameSize(mfs), + expectedFrameCount(efc), frameCount(0) {} + +void qpid::framing::SendContent::operator()(const AMQFrame& f) +{ + bool first = frameCount == 0; + bool last = ++frameCount == expectedFrameCount; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + const AMQContentBody* body(f.castBody<AMQContentBody>()); + if (body->encodedSize() > maxContentSize) { + uint32_t offset = 0; + for (int chunk = body->encodedSize() / maxContentSize; chunk > 0; chunk--) { + sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->encodedSize()); + offset += maxContentSize; + } + uint32_t remainder = body->encodedSize() % maxContentSize; + if (remainder) { + sendFragment(*body, offset, remainder, first && offset == 0, last); + } + } else { + AMQFrame copy(f); + setFlags(copy, first, last); + handler.handle(copy); + } +} + +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const +{ + AMQFrame fragment((AMQContentBody(body.getData().substr(offset, size)))); + setFlags(fragment, first, last); + handler.handle(fragment); +} + +void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const +{ + f.setBof(false); + f.setBos(first); + f.setEof(true);//content is always the last segment + f.setEos(last); +} + diff --git a/qpid/cpp/src/qpid/framing/SendContent.h b/qpid/cpp/src/qpid/framing/SendContent.h new file mode 100644 index 0000000000..1c464b9c8b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SendContent.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/CommonImportExport.h" + +#ifndef _SendContent_ +#define _SendContent_ + +namespace qpid { +namespace framing { + +/** + * Functor that sends frame to handler, refragmenting if + * necessary. Currently only works on content frames but this could be + * changed once we support multi-frame segments in general. + */ +class SendContent +{ + FrameHandler& handler; + const uint16_t maxFrameSize; + uint expectedFrameCount; + uint frameCount; + + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const; + void setFlags(AMQFrame& f, bool first, bool last) const; +public: + QPID_COMMON_EXTERN SendContent(FrameHandler& _handler, uint16_t _maxFrameSize, uint frameCount); + QPID_COMMON_EXTERN void operator()(const AMQFrame& f); +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp new file mode 100644 index 0000000000..41cb236629 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/Buffer.h" +#include <ostream> + +using qpid::framing::SequenceNumber; +using qpid::framing::Buffer; + +void SequenceNumber::encode(Buffer& buffer) const +{ + buffer.putLong(value); +} + +void SequenceNumber::decode(Buffer& buffer) +{ + value = buffer.getLong(); +} + +uint32_t SequenceNumber::encodedSize() const { + return 4; +} + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { + return o << n.getValue(); +} + +}} diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp new file mode 100644 index 0000000000..e9d78f3c17 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumberSet.h" + +using namespace qpid::framing; + +void SequenceNumberSet::encode(Buffer& buffer) const +{ + buffer.putShort(size() * 4); + for (const_iterator i = begin(); i != end(); i++) { + buffer.putLong(i->getValue()); + } +} + +void SequenceNumberSet::decode(Buffer& buffer) +{ + clear(); + uint16_t count = (buffer.getShort() / 4); + for (uint16_t i = 0; i < count; i++) { + push_back(SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceNumberSet::encodedSize() const +{ + return 2 /*count*/ + (size() * 4); +} + +SequenceNumberSet SequenceNumberSet::condense() const +{ + SequenceNumberSet result; + const_iterator last = end(); + const_iterator start = end(); + for (const_iterator i = begin(); i != end(); i++) { + if (start == end()) { + start = i; + } else if (*i - *last > 1) { + result.push_back(*start); + result.push_back(*last); + start = i; + } + last = i; + } + if (start != end()) { + result.push_back(*start); + result.push_back(*last); + } + return result; +} + +void SequenceNumberSet::addRange(const SequenceNumber& start, const SequenceNumber& end) +{ + push_back(start); + push_back(end); +} + +namespace qpid{ +namespace framing{ + +std::ostream& operator<<(std::ostream& out, const SequenceNumberSet& set) { + out << "{"; + for (SequenceNumberSet::const_iterator i = set.begin(); i != set.end(); i++) { + if (i != set.begin()) out << ", "; + out << (i->getValue()); + } + out << "}"; + return out; +} + +} +} diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.h b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h new file mode 100644 index 0000000000..c8356c8163 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceNumberSet_h +#define _framing_SequenceNumberSet_h + +#include <ostream> +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/InlineVector.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class SequenceNumberSet : public InlineVector<SequenceNumber, 2> +{ + typedef InlineVector<SequenceNumber, 2> Base; +public: + typedef Base::const_iterator const_iterator; + typedef Base::iterator iterator; + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t encodedSize() const; + QPID_COMMON_EXTERN SequenceNumberSet condense() const; + QPID_COMMON_EXTERN void addRange(const SequenceNumber& start, const SequenceNumber& end); + + template <class T> + void processRanges(T& t) const + { + if (size() % 2) { //must be even number + throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); + } + + for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { + SequenceNumber first = *(i); + SequenceNumber last = *(++i); + t(first, last); + } + } + + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp new file mode 100644 index 0000000000..72fcd8a9e2 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +using namespace qpid::framing; +using std::max; +using std::min; + +namespace qpid { +namespace framing { + +namespace { +//each range contains 2 numbers, 4 bytes each +uint16_t RANGE_SIZE = 2 * 4; +} + +void SequenceSet::encode(Buffer& buffer) const +{ + buffer.putShort(rangesSize() * RANGE_SIZE); + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) { + buffer.putLong(i->first().getValue()); + buffer.putLong(i->last().getValue()); + } +} + +void SequenceSet::decode(Buffer& buffer) +{ + clear(); + uint16_t size = buffer.getShort(); + uint16_t count = size / RANGE_SIZE;//number of ranges + if (size % RANGE_SIZE) + throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); + + for (uint16_t i = 0; i < count; i++) { + add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceSet::encodedSize() const { + return 2 /*size field*/ + (rangesSize() * RANGE_SIZE); +} + +bool SequenceSet::contains(const SequenceNumber& s) const { + return RangeSet<SequenceNumber>::contains(s); +} + +void SequenceSet::add(const SequenceNumber& s) { *this += s; } + +void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& finish) { + *this += Range<SequenceNumber>::makeClosed(std::min(start,finish), std::max(start, finish)); +} + +void SequenceSet::add(const SequenceSet& set) { *this += set; } + +void SequenceSet::remove(const SequenceSet& set) { *this -= set; } + +void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& finish) { + *this -= Range<SequenceNumber>::makeClosed(std::min(start,finish), std::max(start, finish)); +} + +void SequenceSet::remove(const SequenceNumber& s) { *this -= s; } + + +struct RangePrinter { + std::ostream& out; + RangePrinter(std::ostream& o) : out(o) {} + void operator()(SequenceNumber i, SequenceNumber j) const { + out << "[" << i.getValue() << "," << j.getValue() << "] "; + } +}; + +std::ostream& operator<<(std::ostream& o, const SequenceSet& s) { + RangePrinter print(o); + o << "{ "; + s.for_each(print); + return o << "}"; +} + +}} // namespace qpid::framing + diff --git a/qpid/cpp/src/qpid/framing/TemplateVisitor.h b/qpid/cpp/src/qpid/framing/TemplateVisitor.h new file mode 100644 index 0000000000..d6d59603f7 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TemplateVisitor.h @@ -0,0 +1,89 @@ +#ifndef QPID_FRAMING_TEMPLATEVISITOR_H +#define QPID_FRAMING_TEMPLATEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/mpl/fold.hpp> +#include <boost/utility/value_init.hpp> + +namespace qpid { +namespace framing { + +/** + * Metafunction to generate a visitor class derived from Base, with a + * visit for each type in TypeList calling functor F. TypeList may be + * any boost::mpl type collection e.g. mpl::list. + * + * Generated class is: TemplateVisitor<Base, F, TypeList>::type + * + * @see make_visitor + */ +template <class VisitTemplate, class TypeList, class F> +class TemplateVisitor +{ + struct Base : public VisitorBase { + F action; + Base(F f) : action(f) {} + using VisitorBase::visit; + }; + + template <class B, class T> struct Visit : public B { + Visit(F action) : B(action) {} + using B::visit; + void visit(const T& body) { action(body); } + }; + + typedef typename boost::mpl::fold< + TypeList, Base, Visit<boost::mpl::placeholders::_1, + boost::mpl::placeholders::_2> + >::type type; +}; + +/** + * Construct a TemplateVisitor to perform the given action, + * for example: + * @code + */ +template <class VisitorBase, class TypeList, class F> +TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) { + return TemplateVisitor<VisitorBase,TypeList,F>::type(action); +}; + +/** + * For method body classes in TypeList, invoke the corresponding function + * on Target and return true. For other body types return false. + */ +template <class TypeList, class Target> +bool invoke(const AMQBody& body, Target& target) { + typename InvokeVisitor<TypeList, Target>::type v(target); + body.accept(v); + return v.target; +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/ + +}} // namespace qpid::framing + + + +#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/ diff --git a/qpid/cpp/src/qpid/framing/TransferContent.cpp b/qpid/cpp/src/qpid/framing/TransferContent.cpp new file mode 100644 index 0000000000..837d7d346a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TransferContent.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace framing { + +TransferContent::TransferContent(const std::string& data, const std::string& key) { + setData(data); + if (!key.empty()) getDeliveryProperties().setRoutingKey(key); +} + + +AMQHeaderBody TransferContent::getHeader() const +{ + return header; +} + +const std::string& TransferContent::getData() const { + return data; +} + +std::string& TransferContent::getData() { + return data; +} + +void TransferContent::setData(const std::string& _data) +{ + data = _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +void TransferContent::appendData(const std::string& _data) +{ + data += _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +MessageProperties& TransferContent::getMessageProperties() +{ + return *header.get<MessageProperties>(true); +} + +DeliveryProperties& TransferContent::getDeliveryProperties() +{ + return *header.get<DeliveryProperties>(true); +} + +void TransferContent::populate(const FrameSet& frameset) +{ + const AMQHeaderBody* h = frameset.getHeaders(); + if (h) { + header = *h; + } + frameset.getContent(data); +} + +const MessageProperties& TransferContent::getMessageProperties() const +{ + const MessageProperties* props = header.get<MessageProperties>(); + if (!props) throw Exception("No message properties."); + return *props; +} + +const DeliveryProperties& TransferContent::getDeliveryProperties() const +{ + const DeliveryProperties* props = header.get<DeliveryProperties>(); + if (!props) throw Exception("No message properties."); + return *props; +} + +bool TransferContent::hasMessageProperties() const +{ + return header.get<MessageProperties>(); +} + +bool TransferContent::hasDeliveryProperties() const +{ + return header.get<DeliveryProperties>(); +} + + +}} diff --git a/qpid/cpp/src/qpid/framing/TransferContent.h b/qpid/cpp/src/qpid/framing/TransferContent.h new file mode 100644 index 0000000000..9a698a1823 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TransferContent.h @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransferContent_ +#define _TransferContent_ + +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/Exception.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +/** Message content */ +class QPID_COMMON_CLASS_EXTERN TransferContent : public MethodContent +{ + AMQHeaderBody header; + std::string data; +public: + QPID_COMMON_EXTERN TransferContent(const std::string& data = std::string(), const std::string& key=std::string()); + + ///@internal + QPID_COMMON_EXTERN AMQHeaderBody getHeader() const; + + QPID_COMMON_EXTERN void setData(const std::string&); + QPID_COMMON_EXTERN const std::string& getData() const; + QPID_COMMON_EXTERN std::string& getData(); + + QPID_COMMON_EXTERN void appendData(const std::string&); + + QPID_COMMON_EXTERN bool hasMessageProperties() const; + QPID_COMMON_EXTERN MessageProperties& getMessageProperties(); + QPID_COMMON_EXTERN const MessageProperties& getMessageProperties() const; + + QPID_COMMON_EXTERN bool hasDeliveryProperties() const; + QPID_COMMON_EXTERN DeliveryProperties& getDeliveryProperties(); + QPID_COMMON_EXTERN const DeliveryProperties& getDeliveryProperties() const; + + ///@internal + QPID_COMMON_EXTERN void populate(const FrameSet& frameset); +}; + +}} +#endif diff --git a/qpid/cpp/src/qpid/framing/TypeFilter.h b/qpid/cpp/src/qpid/framing/TypeFilter.h new file mode 100644 index 0000000000..d1c42de583 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TypeFilter.h @@ -0,0 +1,51 @@ +#ifndef QPID_FRAMING_TYPEFILTER_H +#define QPID_FRAMING_TYPEFILTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace framing { + +/** + * Predicate that selects frames by type + */ +template <uint8_t Type> +struct TypeFilter { + bool operator()(const AMQFrame& f) const { + return f.getBody()->type() == Type; + } +}; + +template <uint8_t T1, uint8_t T2> +struct TypeFilter2 { + bool operator()(const AMQFrame& f) const { + return f.getBody()->type() == T1 || f.getBody()->type() == T2; + } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_TYPEFILTER_H*/ diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp new file mode 100644 index 0000000000..945c0a4d24 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/Uuid.h" + +#include "qpid/sys/uuid.h" +#include "qpid/Exception.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +using namespace std; + +static const size_t UNPARSED_SIZE=36; + +Uuid::Uuid(bool unique) { + if (unique) { + generate(); + } else { + clear(); + } +} + +Uuid::Uuid(const uint8_t* data) { + assign(data); +} + +void Uuid::assign(const uint8_t* data) { + // This const cast is for Solaris which has a + // uuid_copy that takes a non const 2nd argument + uuid_copy(c_array(), const_cast<uint8_t*>(data)); +} + +void Uuid::generate() { + uuid_generate(c_array()); +} + +void Uuid::clear() { + uuid_clear(c_array()); +} + +// Force int 0/!0 to false/true; avoids compile warnings. +bool Uuid::isNull() const { + return !!uuid_is_null(data()); +} + +void Uuid::encode(Buffer& buf) const { + buf.putRawData(data(), size()); +} + +void Uuid::decode(Buffer& buf) { + if (buf.available() < size()) + throw IllegalArgumentException(QPID_MSG("Not enough data for UUID.")); + buf.getRawData(c_array(), size()); +} + +ostream& operator<<(ostream& out, Uuid uuid) { + char unparsed[UNPARSED_SIZE + 1]; + uuid_unparse(uuid.data(), unparsed); + return out << unparsed; +} + +istream& operator>>(istream& in, Uuid& uuid) { + char unparsed[UNPARSED_SIZE + 1] = {0}; + in.get(unparsed, sizeof(unparsed)); + if (!in.fail()) { + if (uuid_parse(unparsed, uuid.c_array()) != 0) + in.setstate(ios::failbit); + } + return in; +} + +std::string Uuid::str() const { + std::ostringstream os; + os << *this; + return os.str(); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Visitor.h b/qpid/cpp/src/qpid/framing/Visitor.h new file mode 100644 index 0000000000..759ee65914 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Visitor.h @@ -0,0 +1,92 @@ +#ifndef QPID_FRAMING_VISITOR_H +#define QPID_FRAMING_VISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/mpl/vector.hpp> +#include <boost/type_traits/remove_reference.hpp> +#include <boost/preprocessor/seq/for_each.hpp> + +namespace qpid { +namespace framing { + +/** @file Generic visitor pattern. */ + +/** visit() interface for type T (optional return type R, default is void.) + * To create a visitor for a set of types T1, T2 ... do this: + * struct MyVisitor : public Visit<T1>, public Visit<T2> ... {}; + *@param T Type to visit. This must be forward declared, and need not be defined. + */ +template <class T, class R=void> struct Visit { + typedef R ReturnType; + typedef T VisitType; + + virtual ~Visit() {} + virtual R visit(T&) = 0; +}; + + +#define QPID_VISITOR_DECL(_1,_2,T) class T; + +#define QPID_VISITOR_BASE(_1,_2,T) , public ::qpid::framing::Visit<T> + +/** Convenience macro to generate a visitor interface. + * QPID_VISITOR(MyVisitor,(A)(B)(C)); is equivalent to: + * @code + * class A; class B; class C; + * class MyVisitor : public Visit<A> , public Visit<B> , public Visit<C> {}; + * @endcode + * @param visitor name of the generated visitor class. + * @param bases a sequence of visitable types in the form (T1)(T2)... + * Any parenthesized notations are due to quirks of the preprocesser. + */ +#define QPID_VISITOR(visitor,types) \ + BOOST_PP_SEQ_FOR_EACH(QPID_VISITOR_DECL, _, types) \ + class visitor : public ::qpid::framing::Visit<BOOST_PP_SEQ_HEAD(types)> \ + BOOST_PP_SEQ_FOR_EACH(QPID_VISITOR_BASE, _, BOOST_PP_SEQ_TAIL(types)) \ + {} + +/** The root class for the hierarchy of objects visitable by Visitor V. + * Defines virtual accept(). + */ +template <class V, class R=void> +struct VisitableRoot { + typedef V VisitorType; + typedef R ReturnType; + virtual ~VisitableRoot() {} + virtual R accept(V& v) = 0; +}; + +/** The base class for concrete visitable classes. + * Implements accept(). + * @param T type of visitable class (CRTP). + * @param Base base class to inherit from. + */ +template <class T, class Base> +struct Visitable : public Base { + void accept(typename Base::VisitorType& v) { + static_cast<Visit<T>& >(v).visit(static_cast<T&>(*this)); + } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_VISITOR_H*/ diff --git a/qpid/cpp/src/qpid/framing/amqp_framing.h b/qpid/cpp/src/qpid/framing/amqp_framing.h new file mode 100644 index 0000000000..3a8b39afb5 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/amqp_framing.h @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/BodyHandler.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ProtocolVersion.h" diff --git a/qpid/cpp/src/qpid/framing/frame_functors.h b/qpid/cpp/src/qpid/framing/frame_functors.h new file mode 100644 index 0000000000..d2064d6a57 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/frame_functors.h @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <ostream> +#include <iostream> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" + +#ifndef _frame_functors_ +#define _frame_functors_ + +namespace qpid { +namespace framing { + +class SumFrameSize +{ + uint64_t size; +public: + SumFrameSize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.encodedSize(); } + uint64_t getSize() { return size; } +}; + +class SumBodySize +{ + uint64_t size; +public: + SumBodySize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.getBody()->encodedSize(); } + uint64_t getSize() { return size; } +}; + +class Count +{ + uint count; +public: + Count() : count(0) {} + void operator()(const AMQFrame&) { count++; } + uint getCount() { return count; } +}; + +class EncodeFrame +{ + Buffer& buffer; +public: + EncodeFrame(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.encode(buffer); } +}; + +class EncodeBody +{ + Buffer& buffer; +public: + EncodeBody(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); } +}; + +/** + * Sends to the specified handler a copy of the frame it is applied to. + */ +class Relay +{ + FrameHandler& handler; + +public: + Relay(FrameHandler& h) : handler(h) {} + + void operator()(const AMQFrame& f) + { + AMQFrame copy(f); + handler.handle(copy); + } +}; + +class Print +{ + std::ostream& out; +public: + Print(std::ostream& o) : out(o) {} + + void operator()(const AMQFrame& f) + { + out << f << std::endl; + } +}; + +class MarkLastSegment +{ +public: + void operator()(AMQFrame& f) const { f.setEof(true); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/variant.h b/qpid/cpp/src/qpid/framing/variant.h new file mode 100644 index 0000000000..8e13063385 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/variant.h @@ -0,0 +1,91 @@ +#ifndef QPID_FRAMING_VARIANT_H +#define QPID_FRAMING_VARIANT_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tools for using boost::variant. */ + + +#include <boost/variant.hpp> + +namespace qpid { +namespace framing { +class Buffer; + +/** boost::static_visitor that throws an exception if variant contains a blank. + * Subclasses need to have a using() declaration, which can be generated + * with QPID_USING_NOBLANK(R) + */ +template <class R=void> +struct NoBlankVisitor : public boost::static_visitor<R> { + R foundBlank() const { + assert(0); + throw Exception(QPID_MSG("Invalid variant value.")); + } + R operator()(const boost::blank&) const { return foundBlank(); } + R operator()(boost::blank&) const { return foundBlank(); } +}; + + +}} // qpid::framing + + +/** Generate a using statement, needed in visitors inheriting NoBlankVisitor + * @param R return type. + */ +#define QPID_USING_NOBLANK(R) using ::qpid::framing::NoBlankVisitor<R>::operator() + +namespace qpid { +namespace framing { + +/** Convert the variant value to type R. */ +template <class R> struct ConvertVisitor : public NoBlankVisitor<R> { + QPID_USING_NOBLANK(R); + template <class T> R operator()(T& t) const { return t; } +}; + +/** Convert the address of variant value to type R. */ +template <class R> struct AddressVisitor : public NoBlankVisitor<R> { + QPID_USING_NOBLANK(R); + template <class T> R operator()(T& t) const { return &t; } +}; + +/** Apply a visitor to the nested variant.*/ +template<class V> +struct ApplyVisitor : public NoBlankVisitor<typename V::result_type> { + QPID_USING_NOBLANK(typename V::result_type); + const V& visitor; + ApplyVisitor(const V& v) : visitor(v) {} + template <class T> typename V::result_type operator()(T& t) const { + return boost::apply_visitor(visitor, t); + } +}; + +/** Convenience function to construct and apply an ApplyVisitor */ +template <class Visitor, class Visitable> +typename Visitor::result_type applyApplyVisitor(const Visitor& visitor, Visitable& visitable) { + return boost::apply_visitor(ApplyVisitor<Visitor>(visitor), visitable); +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_VARIANT_H*/ |