diff options
Diffstat (limited to 'qpid/cpp/src/qpid/framing')
74 files changed, 6497 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/framing/AMQBody.cpp b/qpid/cpp/src/qpid/framing/AMQBody.cpp new file mode 100644 index 0000000000..b3eeae0615 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQBody.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include <iostream> + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQBody& body) +{ + body.print(out); + return out; +} + +AMQBody::~AMQBody() {} + +namespace { +struct MatchBodies : public AMQBodyConstVisitor { + const AMQBody& body; + bool match; + + MatchBodies(const AMQBody& b) : body(b), match(false) {} + virtual ~MatchBodies() {} + + virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); } + virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); } + virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); } + virtual void visit(const AMQMethodBody& x) { + const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body); + match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId()); + } +}; + +} +bool AMQBody::match(const AMQBody& a, const AMQBody& b) { + MatchBodies matcher(a); + b.accept(matcher); + return matcher.match; +} + +}} // namespace diff --git a/qpid/cpp/src/qpid/framing/AMQBody.h b/qpid/cpp/src/qpid/framing/AMQBody.h new file mode 100644 index 0000000000..56d1d250c1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQBody.h @@ -0,0 +1,86 @@ +#ifndef QPID_FRAMING_AMQBODY_H +#define QPID_FRAMING_AMQBODY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/RefCounted.h" +#include "qpid/framing/BodyFactory.h" +#include <boost/intrusive_ptr.hpp> +#include <ostream> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class Buffer; + +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +struct AMQBodyConstVisitor { + virtual ~AMQBodyConstVisitor() {} + virtual void visit(const AMQHeaderBody&) = 0; + virtual void visit(const AMQContentBody&) = 0; + virtual void visit(const AMQHeartbeatBody&) = 0; + virtual void visit(const AMQMethodBody&) = 0; +}; + +class QPID_COMMON_CLASS_EXTERN AMQBody : public RefCounted { + public: + AMQBody() {} + QPID_COMMON_EXTERN virtual ~AMQBody(); + + // Make AMQBody copyable even though RefCounted. + AMQBody(const AMQBody&) : RefCounted() {} + AMQBody& operator=(const AMQBody&) { return *this; } + + virtual uint8_t type() const = 0; + + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t=0) = 0; + virtual uint32_t encodedSize() const = 0; + + virtual void print(std::ostream& out) const = 0; + virtual void accept(AMQBodyConstVisitor&) const = 0; + + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ); + virtual boost::intrusive_ptr<AMQBody> clone() const = 0; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; + +enum BodyTypes { + METHOD_BODY = 1, + HEADER_BODY = 2, + CONTENT_BODY = 3, + HEARTBEAT_BODY = 8 +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_AMQBODY_H*/ diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.cpp b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp new file mode 100644 index 0000000000..18f6994f8f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQContentBody.h" +#include <iostream> + +qpid::framing::AMQContentBody::AMQContentBody(){ +} + +qpid::framing::AMQContentBody::AMQContentBody(const std::string& _data) : data(_data){ +} + +uint32_t qpid::framing::AMQContentBody::encodedSize() const{ + return data.size(); +} +void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{ + buffer.putRawData(data); +} +void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ + buffer.getRawData(data, _size); +} + +void qpid::framing::AMQContentBody::print(std::ostream& out) const +{ + out << "content (" << encodedSize() << " bytes)"; + const size_t max = 32; + out << " " << data.substr(0, max); + if (data.size() > max) out << "..."; +} diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.h b/qpid/cpp/src/qpid/framing/AMQContentBody.h new file mode 100644 index 0000000000..148b293a2f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/CommonImportExport.h" + +#ifndef _AMQContentBody_ +#define _AMQContentBody_ + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQContentBody : public AMQBody +{ + std::string data; + +public: + QPID_COMMON_EXTERN AMQContentBody(); + QPID_COMMON_EXTERN AMQContentBody(const std::string& data); + inline virtual ~AMQContentBody(){} + inline uint8_t type() const { return CONTENT_BODY; }; + inline const std::string& getData() const { return data; } + inline std::string& getData() { return data; } + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size); + QPID_COMMON_EXTERN void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQDataBlock.h b/qpid/cpp/src/qpid/framing/AMQDataBlock.h new file mode 100644 index 0000000000..7f0d0dc2b5 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQDataBlock.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Buffer.h" + +#ifndef _AMQDataBlock_ +#define _AMQDataBlock_ + +namespace qpid { +namespace framing { + +class AMQDataBlock +{ +public: + virtual ~AMQDataBlock() {} + virtual void encode(Buffer& buffer) const = 0; + virtual bool decode(Buffer& buffer) = 0; + virtual uint32_t encodedSize() const = 0; +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp new file mode 100644 index 0000000000..5e065d598c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQFrame.h" + +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/BodyFactory.h" +#include "qpid/framing/MethodBodyFactory.h" +#include "qpid/Msg.h" + +#include <boost/format.hpp> +#include <iostream> + +namespace qpid { +namespace framing { + +void AMQFrame::init() { bof = eof = bos = eos = true; subchannel=0; channel=0; } + +AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); } + +AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); } + +AMQFrame::~AMQFrame() {} + +void AMQFrame::setMethod(ClassId c, MethodId m) { body = MethodBodyFactory::create(c,m); } + +uint32_t AMQFrame::encodedSize() const { + uint32_t size = frameOverhead() + body->encodedSize(); + if (body->getMethod()) + size += sizeof(ClassId)+sizeof(MethodId); + return size; +} + +uint32_t AMQFrame::frameOverhead() { + return 12 /*frame header*/; +} + +uint16_t AMQFrame::DECODE_SIZE_MIN=4; + +uint16_t AMQFrame::decodeSize(char* data) { + Buffer buf(data+2, DECODE_SIZE_MIN); + return buf.getShort(); +} + +void AMQFrame::encode(Buffer& buffer) const +{ + //set track first (controls on track 0, everything else on 1): + uint8_t track = getBody()->type() ? 1 : 0; + + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); + buffer.putOctet(flags); + buffer.putOctet(getBody()->type()); + buffer.putShort(encodedSize()); + buffer.putOctet(0); + buffer.putOctet(0x0f & track); + buffer.putShort(channel); + buffer.putLong(0); + const AMQMethodBody* method=getMethod(); + if (method) { + buffer.putOctet(method->amqpClassId()); + buffer.putOctet(method->amqpMethodId()); + } + body->encode(buffer); +} + +bool AMQFrame::decode(Buffer& buffer) +{ + if(buffer.available() < frameOverhead()) + return false; + uint32_t pos = buffer.getPosition(); + + uint8_t flags = buffer.getOctet(); + uint8_t framing_version = (flags & 0xc0) >> 6; + if (framing_version != 0) + throw FramingErrorException(QPID_MSG("Framing version unsupported")); + bof = flags & 0x08; + eof = flags & 0x04; + bos = flags & 0x02; + eos = flags & 0x01; + uint8_t type = buffer.getOctet(); + uint16_t frame_size = buffer.getShort(); + if (frame_size < frameOverhead()) + throw FramingErrorException(QPID_MSG("Frame size too small " << frame_size)); + uint8_t reserved1 = buffer.getOctet(); + uint8_t field1 = buffer.getOctet(); + subchannel = field1 & 0x0f; + channel = buffer.getShort(); + (void) buffer.getLong(); // reserved2 + + // Verify that the protocol header meets current spec + // TODO: should we check reserved2 against zero as well? - the + // spec isn't clear + if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) + throw FramingErrorException(QPID_MSG("Reserved bits not zero")); + + // TODO: should no longer care about body size and only pass up + // B,E,b,e flags + uint16_t body_size = frame_size - frameOverhead(); + if (buffer.available() < body_size){ + buffer.setPosition(pos); + return false; + } + + switch(type) + { + case 0://CONTROL + case METHOD_BODY: { + ClassId c = buffer.getOctet(); + MethodId m = buffer.getOctet(); + body = MethodBodyFactory::create(c, m); + break; + } + case HEADER_BODY: body = BodyFactory::create<AMQHeaderBody>(); break; + case CONTENT_BODY: body = BodyFactory::create<AMQContentBody>(); break; + case HEARTBEAT_BODY: body = BodyFactory::create<AMQHeartbeatBody>(); break; + default: + throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type)); + } + body->decode(buffer, body_size); + + return true; +} + +void AMQFrame::cloneBody() +{ + body = body->clone(); +} + +std::ostream& operator<<(std::ostream& out, const AMQFrame& f) +{ + return + out << "Frame[" + << (f.getBof() ? "B" : "") << (f.getEof() ? "E" : "") + << (f.getBos() ? "b" : "") << (f.getEos() ? "e" : "") << "; " + << "channel=" << f.getChannel() << "; " << *f.getBody() + << "]"; +} + + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h new file mode 100644 index 0000000000..19675ce6ff --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -0,0 +1,116 @@ +#ifndef _AMQFrame_ +#define _AMQFrame_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQDataBlock.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include "qpid/framing/ProtocolVersion.h" +#include <boost/intrusive_ptr.hpp> +#include <boost/cast.hpp> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock +{ + public: + QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0); + QPID_COMMON_EXTERN AMQFrame(const AMQBody& b); + QPID_COMMON_EXTERN ~AMQFrame(); + + ChannelId getChannel() const { return channel; } + void setChannel(ChannelId c) { channel = c; } + + AMQBody* getBody() const { return body.get(); } + + AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; } + const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; } + + void setMethod(ClassId c, MethodId m); + + template <class T> T* castBody() { + return boost::polymorphic_downcast<T*>(getBody()); + } + + template <class T> const T* castBody() const { + return boost::polymorphic_downcast<const T*>(getBody()); + } + + /** + * Take a deep copy of the body currently referenced + */ + QPID_COMMON_EXTERN void cloneBody(); + + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN bool decode(Buffer& buffer); + QPID_COMMON_EXTERN uint32_t encodedSize() const; + + // 0-10 terminology: first/last frame (in segment) first/last segment (in assembly) + + bool isFirstSegment() const { return bof; } + bool isLastSegment() const { return eof; } + bool isFirstFrame() const { return bos; } + bool isLastFrame() const { return eos; } + + void setFirstSegment(bool set=true) { bof = set; } + void setLastSegment(bool set=true) { eof = set; } + void setFirstFrame(bool set=true) { bos = set; } + void setLastFrame(bool set=true) { eos = set; } + + // 0-9 terminology: beginning/end of frameset, beginning/end of segment. + + bool getBof() const { return bof; } + void setBof(bool isBof) { bof = isBof; } + bool getEof() const { return eof; } + void setEof(bool isEof) { eof = isEof; } + + bool getBos() const { return bos; } + void setBos(bool isBos) { bos = isBos; } + bool getEos() const { return eos; } + void setEos(bool isEos) { eos = isEos; } + + static uint16_t DECODE_SIZE_MIN; + QPID_COMMON_EXTERN static uint32_t frameOverhead(); + /** Must point to at least DECODE_SIZE_MIN bytes of data */ + static uint16_t decodeSize(char* data); + + private: + void init(); + + boost::intrusive_ptr<AMQBody> body; + uint16_t channel : 16; + uint8_t subchannel : 8; + bool bof : 1; + bool eof : 1; + bool bos : 1; + bool eos : 1; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AMQFrame&); + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp new file mode 100644 index 0000000000..14218f1b45 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +uint32_t qpid::framing::AMQHeaderBody::encodedSize() const { + return properties.encodedSize(); +} + +void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { + properties.encode(buffer); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size) { + uint32_t limit = buffer.available() - size; + while (buffer.available() > limit + 2) { + uint32_t len = buffer.getLong(); + uint16_t type = buffer.getShort(); + if (!properties.decode(buffer, len, type)) { + // TODO: should just skip & keep for later dispatch. + throw Exception(QPID_MSG("Unexpected property type: " << type)); + } + } +} + +uint64_t qpid::framing::AMQHeaderBody::getContentLength() const +{ + const MessageProperties* mProps = get<MessageProperties>(); + if (mProps) + return mProps->getContentLength(); + return 0; +} + +void qpid::framing::AMQHeaderBody::print(std::ostream& out) const +{ + out << "header (" << encodedSize() << " bytes)"; + out << "; properties={"; + properties.print(out); + out << "}"; +} + +void qpid::framing::AMQHeaderBody::accept(AMQBodyConstVisitor& v) const { + v.visit(*this); +} diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h new file mode 100644 index 0000000000..452154eb5c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -0,0 +1,113 @@ +#ifndef QPID_FRAMING_AMQHEADERBODY_H +#define QPID_FRAMING_AMQHEADERBODY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/CommonImportExport.h" +#include <iostream> + +#include <boost/optional.hpp> + + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody +{ + template <class T> struct OptProps { boost::optional<T> props; }; + template <class Base, class T> + struct PropSet : public Base, public OptProps<T> { + uint32_t encodedSize() const { + const boost::optional<T>& p=this->OptProps<T>::props; + return (p ? p->encodedSize() : 0) + Base::encodedSize(); + } + void encode(Buffer& buffer) const { + const boost::optional<T>& p=this->OptProps<T>::props; + if (p) p->encode(buffer); + Base::encode(buffer); + } + bool decode(Buffer& buffer, uint32_t size, uint16_t type) { + boost::optional<T>& p=this->OptProps<T>::props; + if (type == T::TYPE) { + p=T(); + p->decodeStructBody(buffer, size); + return true; + } + else + return Base::decode(buffer, size, type); + } + void print(std::ostream& out) const { + const boost::optional<T>& p=this->OptProps<T>::props; + if (p) out << *p; + Base::print(out); + } + }; + + struct Empty { + uint32_t encodedSize() const { return 0; } + void encode(Buffer&) const {}; + bool decode(Buffer&, uint32_t, uint16_t) const { return false; }; + void print(std::ostream&) const {} + }; + + // Could use boost::mpl::fold to construct a larger set. + typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties; + + Properties properties; + +public: + + inline uint8_t type() const { return HEADER_BODY; } + + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size); + QPID_COMMON_EXTERN uint64_t getContentLength() const; + QPID_COMMON_EXTERN void print(std::ostream& out) const; + QPID_COMMON_EXTERN void accept(AMQBodyConstVisitor&) const; + + template <class T> T* get(bool create) { + boost::optional<T>& p=properties.OptProps<T>::props; + if (create && !p) p=T(); + return p.get_ptr(); + } + + template <class T> const T* get() const { + return properties.OptProps<T>::props.get_ptr(); + } + + template <class T> void erase() { + properties.OptProps<T>::props.reset(); + } + + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +}} + + + +#endif /*!QPID_FRAMING_AMQHEADERBODY_H*/ diff --git a/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp new file mode 100644 index 0000000000..477616221c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQHeartbeatBody.h" +#include <iostream> + +qpid::framing::AMQHeartbeatBody::~AMQHeartbeatBody() {} + +void qpid::framing::AMQHeartbeatBody::print(std::ostream& out) const { + out << "heartbeat"; +} diff --git a/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h new file mode 100644 index 0000000000..19ac2be013 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQHeartbeatBody.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/CommonImportExport.h" + +#ifndef _AMQHeartbeatBody_ +#define _AMQHeartbeatBody_ + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN AMQHeartbeatBody : public AMQBody +{ +public: + QPID_COMMON_EXTERN virtual ~AMQHeartbeatBody(); + inline uint32_t encodedSize() const { return 0; } + inline uint8_t type() const { return HEARTBEAT_BODY; } + inline void encode(Buffer& ) const {} + inline void decode(Buffer& , uint32_t /*size*/) {} + QPID_COMMON_EXTERN virtual void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); } +}; + +} +} + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp new file mode 100644 index 0000000000..594af4c6dc --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" + +namespace qpid { +namespace framing { + +AMQMethodBody::~AMQMethodBody() {} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.h b/qpid/cpp/src/qpid/framing/AMQMethodBody.h new file mode 100644 index 0000000000..c634180712 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.h @@ -0,0 +1,72 @@ +#ifndef _AMQMethodBody_ +#define _AMQMethodBody_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/CommonImportExport.h" + +#include <boost/shared_ptr.hpp> +#include <ostream> +#include <assert.h> + +namespace qpid { +namespace framing { + +class Buffer; +class AMQP_ServerOperations; +class MethodBodyConstVisitor; + +class AMQMethodBody : public AMQBody { + public: + AMQMethodBody() {} + QPID_COMMON_EXTERN virtual ~AMQMethodBody(); + + virtual void accept(MethodBodyConstVisitor&) const = 0; + + virtual MethodId amqpMethodId() const = 0; + virtual ClassId amqpClassId() const = 0; + virtual bool isContentBearing() const = 0; + virtual bool resultExpected() const = 0; + virtual bool responseExpected() const = 0; + + template <class T> bool isA() const { + return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; + } + + virtual uint32_t encodedSize() const = 0; + virtual uint8_t type() const { return METHOD_BODY; } + + virtual bool isSync() const { return false; /*only ModelMethods can have the sync flag set*/ } + virtual void setSync(bool) const { /*only ModelMethods can have the sync flag set*/ } + + AMQMethodBody* getMethod() { return this; } + const AMQMethodBody* getMethod() const { return this; } + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h new file mode 100644 index 0000000000..42139c7937 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file used to be auto-generated by Qpid Gentools v.0.1 + * its here temporarily until we get a full solution to multi-version support + */ +#ifndef qpid_framing_highestProtocolVersion__ +#define qpid_framing_highestProtocolVersion__ + +#include "qpid/framing/ProtocolVersion.h" + + +namespace qpid { +namespace framing { + +static ProtocolVersion highestProtocolVersion(0, 10); + +} /* namespace framing */ +} /* namespace qpid */ + +#endif diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp new file mode 100644 index 0000000000..2e6433a82f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AccumulatedAck.h" + +#include <assert.h> +#include <iostream> +#include <boost/bind.hpp> + +using std::list; +using std::max; +using std::min; +using namespace qpid::framing; + +AccumulatedAck::AccumulatedAck(SequenceNumber r) : mark(r) {} + +void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ + assert(first <= last); + if (last < mark) return; + + + Range r(first, last); + bool handled = false; + bool markMerged = false; + list<Range>::iterator merged = ranges.end(); + if (r.mergeable(mark)) { + mark = r.end; + markMerged = true; + handled = true; + } else { + for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { + if (i->merge(r)) { + merged = i; + handled = true; + } else if (r.start < i->start) { + ranges.insert(i, r); + handled = true; + } + } + } + if (!handled) { + ranges.push_back(r); + } else { + while (!ranges.empty() && ranges.front().end <= mark) { + ranges.pop_front(); + } + if (markMerged) { + //new range is incorporated, but may be possible to consolidate + merged = ranges.begin(); + while (merged != ranges.end() && merged->mergeable(mark)) { + mark = merged->end; + merged = ranges.erase(merged); + } + } + if (merged != ranges.end()) { + //consolidate ranges + list<Range>::iterator i = merged; + list<Range>::iterator j = i++; + while (i != ranges.end() && j->merge(*i)) { + j = i++; + } + } + } +} + +void AccumulatedAck::consolidate(){} + +void AccumulatedAck::clear(){ + mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility + ranges.clear(); +} + +bool AccumulatedAck::covers(SequenceNumber tag) const{ + if (tag <= mark) return true; + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(tag)) return true; + } + return false; +} + +void AccumulatedAck::collectRanges(SequenceNumberSet& set) const +{ + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + set.push_back(i->start); + set.push_back(i->end); + } +} + +void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range) +{ + update(mark, cumulative); + range.processRanges(*this); +} + + +bool Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool Range::intersect(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool Range::merge(const Range& r) +{ + if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + + +namespace qpid{ +namespace framing{ + std::ostream& operator<<(std::ostream& out, const Range& r) + { + out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; + return out; + } + + std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) + { + out << "{mark: " << a.mark.getValue() << ", ranges: ("; + for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { + if (i != a.ranges.begin()) out << ", "; + out << *i; + } + out << ")]"; + return out; + } +}} diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.h b/qpid/cpp/src/qpid/framing/AccumulatedAck.h new file mode 100644 index 0000000000..8e241b4ba1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.h @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include <ostream> +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceNumberSet.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { + namespace framing { + + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersect(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + }; + /** + * Keeps an accumulated record of acknowledged messages (by delivery + * tag). + */ + class AccumulatedAck { + public: + /** + * Everything up to this value has been acknowledged. + */ + SequenceNumber mark; + /** + * List of individually acknowledged messages greater than the + * 'mark'. + */ + std::list<Range> ranges; + + QPID_COMMON_EXTERN explicit AccumulatedAck(SequenceNumber r = SequenceNumber()); + QPID_COMMON_EXTERN void update(SequenceNumber firstTag, SequenceNumber lastTag); + QPID_COMMON_EXTERN void consolidate(); + QPID_COMMON_EXTERN void clear(); + QPID_COMMON_EXTERN bool covers(SequenceNumber tag) const; + void collectRanges(SequenceNumberSet& set) const; + QPID_COMMON_EXTERN void update(const SequenceNumber cumulative, const SequenceNumberSet& range); + void operator()(SequenceNumber first, SequenceNumber last) { update(first, last); } + }; + QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const Range&); + QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AccumulatedAck&); + } +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp new file mode 100644 index 0000000000..4b4338f931 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Array.cpp @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" +#include <assert.h> + +namespace qpid { +namespace framing { + +Array::Array() : type(TYPE_CODE_VOID) {} + +Array::Array(TypeCode t) : type(t) {} + +Array::Array(uint8_t t) : type(typeCode(t)) {} + +Array::Array(const std::vector<std::string>& in) +{ + type = TYPE_CODE_STR16; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + ValuePtr value(new Str16Value(*i)); + values.push_back(value); + } +} + +uint32_t Array::encodedSize() const { + //note: size is only included when used as a 'top level' type + uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/); + for(ValueVector::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->getData().encodedSize(); + } + return len; +} + +int Array::count() const { + return values.size(); +} + +std::ostream& operator<<(std::ostream& out, const Array& a) { + out << typeName(a.getType()) << "{"; + for(Array::ValueVector::const_iterator i = a.values.begin(); i != a.values.end(); ++i) { + if (i != a.values.begin()) out << ", "; + (*i)->print(out); + } + return out << "}"; +} + +void Array::encode(Buffer& buffer) const{ + buffer.putLong(encodedSize() - 4);//size added only when array is a top-level type + buffer.putOctet(type); + buffer.putLong(count()); + for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->getData().encode(buffer); + } +} + +void Array::decode(Buffer& buffer){ + values.clear(); + uint32_t size = buffer.getLong();//size added only when array is a top-level type + uint32_t available = buffer.available(); + if (available < size) { + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " + << size << " bytes but only " << available << " available")); + } + if (size) { + type = TypeCode(buffer.getOctet()); + uint32_t count = buffer.getLong(); + + FieldValue dummy; + dummy.setType(type); + available = buffer.available(); + uint32_t elementSize = dummy.getData().encodedSize(); + if (available < count * elementSize) { + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " + << count << " items of " << elementSize + << " bytes each but only " << available << " bytes available")); + } + // Special check to avoid ridiculously long arrays of zero length elements (they must all be the same + // value, but consume broker resources without consuming any on the wire) + if (elementSize == 0 && count > 256) { + throw IllegalArgumentException(QPID_MSG("Too many zero length elements in array: " << count)); + } + + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->setType(type); + value->getData().decode(buffer); + values.push_back(ValuePtr(value)); + } + } +} + + +bool Array::operator==(const Array& x) const { + if (type != x.type) return false; + if (values.size() != x.values.size()) return false; + + for (ValueVector::const_iterator i = values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) { + if (*(i->get()) != *(j->get())) return false; + } + + return true; +} + +void Array::insert(iterator i, ValuePtr value) { + if (type != value->getType()) { + // FIXME aconway 2008-10-31: put meaningful strings in this message. + throw Exception(QPID_MSG("Wrong type of value in Array, expected " << type + << " but found " << TypeCode(value->getType()))); + } + values.insert(i, value); +} + + +} +} diff --git a/qpid/cpp/src/qpid/framing/Array.h b/qpid/cpp/src/qpid/framing/Array.h new file mode 100644 index 0000000000..6254f6271a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Array.h @@ -0,0 +1,99 @@ +#ifndef QPID_FRAMING_ARRAY_H +#define QPID_FRAMING_ARRAY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/TypeCode.h" + +#include <boost/shared_ptr.hpp> + +#include <iostream> +#include <vector> + +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class Buffer; +class FieldValue; + +class QPID_COMMON_CLASS_EXTERN Array +{ + public: + typedef boost::shared_ptr<FieldValue> ValuePtr; + typedef std::vector<ValuePtr> ValueVector; + typedef ValueVector::const_iterator const_iterator; + typedef ValueVector::iterator iterator; + + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer); + + QPID_COMMON_EXTERN int count() const; + QPID_COMMON_EXTERN bool operator==(const Array& other) const; + + QPID_COMMON_EXTERN Array(); + QPID_COMMON_EXTERN Array(TypeCode type); + QPID_COMMON_EXTERN Array(uint8_t type); + //creates a longstr array + QPID_COMMON_EXTERN Array(const std::vector<std::string>& in); + + QPID_COMMON_INLINE_EXTERN TypeCode getType() const { return type; } + + // std collection interface. + QPID_COMMON_INLINE_EXTERN const_iterator begin() const { return values.begin(); } + QPID_COMMON_INLINE_EXTERN const_iterator end() const { return values.end(); } + QPID_COMMON_INLINE_EXTERN iterator begin() { return values.begin(); } + QPID_COMMON_INLINE_EXTERN iterator end(){ return values.end(); } + + QPID_COMMON_INLINE_EXTERN ValuePtr front() const { return values.front(); } + QPID_COMMON_INLINE_EXTERN ValuePtr back() const { return values.back(); } + QPID_COMMON_INLINE_EXTERN size_t size() const { return values.size(); } + + QPID_COMMON_EXTERN void insert(iterator i, ValuePtr value); + QPID_COMMON_INLINE_EXTERN void erase(iterator i) { values.erase(i); } + QPID_COMMON_INLINE_EXTERN void push_back(ValuePtr value) { values.insert(end(), value); } + QPID_COMMON_INLINE_EXTERN void pop_back() { values.pop_back(); } + + // Non-std interface + QPID_COMMON_INLINE_EXTERN void add(ValuePtr value) { push_back(value); } + + // For use in standard algorithms + template <typename R, typename V> + static R get(const V& v) { + return v->template get<R>(); + } + + private: + TypeCode type; + ValueVector values; + + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& out, const Array& body); +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Blob.cpp b/qpid/cpp/src/qpid/framing/Blob.cpp new file mode 100644 index 0000000000..0c8316f3d2 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Blob.cpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Blob.h" + + +namespace qpid { +namespace framing { + +void BlobHelper<void>::destroy(void*) {} + +void BlobHelper<void>::copy(void*, const void*) {} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Blob.h b/qpid/cpp/src/qpid/framing/Blob.h new file mode 100644 index 0000000000..9878d92fe4 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Blob.h @@ -0,0 +1,21 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + diff --git a/qpid/cpp/src/qpid/framing/BodyFactory.h b/qpid/cpp/src/qpid/framing/BodyFactory.h new file mode 100644 index 0000000000..6a8d9b1988 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/BodyFactory.h @@ -0,0 +1,47 @@ +#ifndef QPID_FRAMING_BODYFACTORY_H +#define QPID_FRAMING_BODYFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace framing { + +/** + * Indirect creation of body types to allow centralized changes to + * memory management strategy. + */ +class BodyFactory { + public: + template <class BodyType> static boost::intrusive_ptr<BodyType> create() { + return new BodyType; + } + + template <class BodyType> static boost::intrusive_ptr<BodyType> copy(const BodyType& body) { + return new BodyType(body); + } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_BODYFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp new file mode 100644 index 0000000000..1c4caef046 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -0,0 +1,342 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/Msg.h" +#include <string.h> +#include <boost/format.hpp> + +namespace qpid { + +namespace framing { + +using std::string; + +Buffer::Buffer(char* _data, uint32_t _size) + : size(_size), data(_data), position(0) { +} + +void Buffer::reset(){ + position = 0; +} + +/////////////////////////////////////////////////// + +void Buffer::putOctet(uint8_t i){ + checkAvailable(1); + data[position++] = i; +} + +void Buffer::putShort(uint16_t i){ + checkAvailable(2); + uint16_t b = i; + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); +} + +void Buffer::putLong(uint32_t i){ + checkAvailable(4); + uint32_t b = i; + data[position++] = (uint8_t) (0xFF & (b >> 24)); + data[position++] = (uint8_t) (0xFF & (b >> 16)); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); +} + +void Buffer::putLongLong(uint64_t i){ + uint32_t hi = i >> 32; + uint32_t lo = i; + putLong(hi); + putLong(lo); +} + +void Buffer::putInt8(int8_t i){ + checkAvailable(1); + data[position++] = (uint8_t) i; +} + +void Buffer::putInt16(int16_t i){ + putShort((uint16_t) i); +} + +void Buffer::putInt32(int32_t i){ + putLong((uint32_t) i); +} + +void Buffer::putInt64(int64_t i){ + putLongLong((uint64_t) i); +} + +void Buffer::putFloat(float f){ + union { + uint32_t i; + float f; + } val; + + val.f = f; + putLong (val.i); +} + +void Buffer::putDouble(double f){ + union { + uint64_t i; + double f; + } val; + + val.f = f; + putLongLong (val.i); +} + +void Buffer::putBin128(const uint8_t* b){ + checkAvailable(16); + memcpy (data + position, b, 16); + position += 16; +} + +uint8_t Buffer::getOctet(){ + checkAvailable(1); + uint8_t octet = static_cast<uint8_t>(data[position++]); + return octet; +} + +uint16_t Buffer::getShort(){ + checkAvailable(2); + uint16_t hi = (unsigned char) data[position++]; + hi = hi << 8; + hi |= (unsigned char) data[position++]; + return hi; +} + +uint32_t Buffer::getLong(){ + checkAvailable(4); + uint32_t a = (unsigned char) data[position++]; + uint32_t b = (unsigned char) data[position++]; + uint32_t c = (unsigned char) data[position++]; + uint32_t d = (unsigned char) data[position++]; + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +uint64_t Buffer::getLongLong(){ + uint64_t hi = getLong(); + uint64_t lo = getLong(); + hi = hi << 32; + return hi | lo; +} + +int8_t Buffer::getInt8(){ + checkAvailable(1); + int8_t i = static_cast<int8_t>(data[position++]); + return i; +} + +int16_t Buffer::getInt16(){ + return (int16_t) getShort(); +} + +int32_t Buffer::getInt32(){ + return (int32_t) getLong(); +} + +int64_t Buffer::getInt64(){ + return (int64_t) getLongLong(); +} + +float Buffer::getFloat(){ + union { + uint32_t i; + float f; + } val; + val.i = getLong(); + return val.f; +} + +double Buffer::getDouble(){ + union { + uint64_t i; + double f; + } val; + val.i = getLongLong(); + return val.f; +} + +template <> +QPID_COMMON_EXTERN uint64_t Buffer::getUInt<1>() { + return getOctet(); +} + +template <> +QPID_COMMON_EXTERN uint64_t Buffer::getUInt<2>() { + return getShort(); +} + +template <> +QPID_COMMON_EXTERN uint64_t Buffer::getUInt<4>() { + return getLong(); +} + +template <> +QPID_COMMON_EXTERN uint64_t Buffer::getUInt<8>() { + return getLongLong(); +} + +template <> +QPID_COMMON_EXTERN void Buffer::putUInt<1>(uint64_t i) { + if (std::numeric_limits<uint8_t>::min() <= i && i <= std::numeric_limits<uint8_t>::max()) { + putOctet(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint8_t.")); +} + +template <> +QPID_COMMON_EXTERN void Buffer::putUInt<2>(uint64_t i) { + if (std::numeric_limits<uint16_t>::min() <= i && i <= std::numeric_limits<uint16_t>::max()) { + putShort(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint16_t.")); +} + +template <> +QPID_COMMON_EXTERN void Buffer::putUInt<4>(uint64_t i) { + if (std::numeric_limits<uint32_t>::min() <= i && i <= std::numeric_limits<uint32_t>::max()) { + putLong(i); + return; + } + throw Exception(QPID_MSG("Could not encode (" << i << ") as uint32_t.")); +} + +template <> +QPID_COMMON_EXTERN void Buffer::putUInt<8>(uint64_t i) { + putLongLong(i); +} + +void Buffer::putShortString(const string& s){ + size_t slen = s.length(); + if (slen <= std::numeric_limits<uint8_t>::max()) { + uint8_t len = (uint8_t) slen; + putOctet(len); + checkAvailable(slen); + s.copy(data + position, len); + position += len; + return; + } + throw Exception(QPID_MSG("Could not encode string of " << slen << " bytes as uint8_t string.")); +} + +void Buffer::putMediumString(const string& s){ + size_t slen = s.length(); + if (slen <= std::numeric_limits<uint16_t>::max()) { + uint16_t len = (uint16_t) slen; + putShort(len); + checkAvailable(slen); + s.copy(data + position, len); + position += len; + return; + } + throw Exception(QPID_MSG("Could not encode string of " << slen << " bytes as uint16_t string.")); +} + +void Buffer::putLongString(const string& s){ + size_t slen = s.length(); + if (slen <= std::numeric_limits<uint32_t>::max()) { + uint32_t len = (uint32_t) slen; + putLong(len); + checkAvailable(slen); + s.copy(data + position, len); + position += len; + return; + } + throw Exception(QPID_MSG("Could not encode string of " << slen << " bytes as uint32_t string.")); +} + +void Buffer::getShortString(string& s){ + uint8_t len = getOctet(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getMediumString(string& s){ + uint16_t len = getShort(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getLongString(string& s){ + uint32_t len = getLong(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::getBin128(uint8_t* b){ + checkAvailable(16); + memcpy (b, data + position, 16); + position += 16; +} + +void Buffer::putRawData(const string& s){ + size_t len = s.length(); + checkAvailable(len); + s.copy(data + position, len); + position += len; +} + +void Buffer::getRawData(string& s, uint32_t len){ + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + +void Buffer::putRawData(const uint8_t* s, size_t len){ + checkAvailable(len); + memcpy(data + position, s, len); + position += len; +} + +void Buffer::getRawData(uint8_t* s, size_t len){ + checkAvailable(len); + memcpy(s, data + position, len); + position += len; +} + +void Buffer::dump(std::ostream& out) const { + for (uint32_t i = position; i < size; i++) + { + if (i != position) + out << " "; + out << boost::format("%02x") % ((unsigned) (uint8_t) data[i]); + } +} + +std::ostream& operator<<(std::ostream& out, const Buffer& b){ + out << "Buffer["; + b.dump(out); + return out << "]"; +} + +}} diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h new file mode 100644 index 0000000000..166b524e3c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Buffer.h @@ -0,0 +1,115 @@ +#ifndef QPID_FRAMING_BUFFER_H +#define QPID_FRAMING_BUFFER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Exception.h" +#include "qpid/CommonImportExport.h" +#include "qpid/sys/IntegerTypes.h" + +#include <string> + +namespace qpid { +namespace framing { + +struct QPID_COMMON_CLASS_EXTERN OutOfBounds : qpid::Exception { + OutOfBounds() : qpid::Exception(std::string("Out of Bounds")) {} +}; + +class Content; +class FieldTable; + +class QPID_COMMON_CLASS_EXTERN Buffer +{ + uint32_t size; + char* data; + uint32_t position; + + public: + void checkAvailable(size_t count) { if (count > size - position) throw OutOfBounds(); } + + QPID_COMMON_EXTERN Buffer(char* data=0, uint32_t size=0); + + QPID_COMMON_EXTERN void reset(); + + QPID_COMMON_INLINE_EXTERN uint32_t available() const{ return size - position; } + QPID_COMMON_INLINE_EXTERN uint32_t getSize() const { return size; } + QPID_COMMON_INLINE_EXTERN uint32_t getPosition() const { return position; } + QPID_COMMON_INLINE_EXTERN void setPosition(uint32_t p) { position = p; } + QPID_COMMON_INLINE_EXTERN const char * getPointer() const { return data; } + QPID_COMMON_INLINE_EXTERN char* getPointer() { return data; } + + QPID_COMMON_EXTERN void putOctet(uint8_t i); + QPID_COMMON_EXTERN void putShort(uint16_t i); + QPID_COMMON_EXTERN void putLong(uint32_t i); + QPID_COMMON_EXTERN void putLongLong(uint64_t i); + QPID_COMMON_EXTERN void putInt8(int8_t i); + QPID_COMMON_EXTERN void putInt16(int16_t i); + QPID_COMMON_EXTERN void putInt32(int32_t i); + QPID_COMMON_EXTERN void putInt64(int64_t i); + QPID_COMMON_EXTERN void putFloat(float f); + QPID_COMMON_EXTERN void putDouble(double f); + QPID_COMMON_EXTERN void putBin128(const uint8_t* b); + + QPID_COMMON_EXTERN uint8_t getOctet(); + QPID_COMMON_EXTERN uint16_t getShort(); + QPID_COMMON_EXTERN uint32_t getLong(); + QPID_COMMON_EXTERN uint64_t getLongLong(); + QPID_COMMON_EXTERN int8_t getInt8(); + QPID_COMMON_EXTERN int16_t getInt16(); + QPID_COMMON_EXTERN int32_t getInt32(); + QPID_COMMON_EXTERN int64_t getInt64(); + QPID_COMMON_EXTERN float getFloat(); + QPID_COMMON_EXTERN double getDouble(); + + template <int n> + QPID_COMMON_EXTERN uint64_t getUInt(); + + template <int n> + QPID_COMMON_EXTERN void putUInt(uint64_t); + + QPID_COMMON_EXTERN void putShortString(const std::string& s); + QPID_COMMON_EXTERN void putMediumString(const std::string& s); + QPID_COMMON_EXTERN void putLongString(const std::string& s); + QPID_COMMON_EXTERN void getShortString(std::string& s); + QPID_COMMON_EXTERN void getMediumString(std::string& s); + QPID_COMMON_EXTERN void getLongString(std::string& s); + QPID_COMMON_EXTERN void getBin128(uint8_t* b); + + QPID_COMMON_EXTERN void putRawData(const std::string& s); + QPID_COMMON_EXTERN void getRawData(std::string& s, uint32_t size); + + QPID_COMMON_EXTERN void putRawData(const uint8_t* data, size_t size); + QPID_COMMON_EXTERN void getRawData(uint8_t* data, size_t size); + + template <class T> void put(const T& data) { data.encode(*this); } + template <class T> void get(T& data) { data.decode(*this); } + + QPID_COMMON_EXTERN void dump(std::ostream&) const; +}; + +std::ostream& operator<<(std::ostream&, const Buffer&); + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/BufferTypes.h b/qpid/cpp/src/qpid/framing/BufferTypes.h new file mode 100644 index 0000000000..4199755852 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/BufferTypes.h @@ -0,0 +1,106 @@ +#ifndef QPID_FRAMING_BUFFERTYPES_H +#define QPID_FRAMING_BUFFERTYPES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file + * Using templates with framing::Buffer is difficultg becase of the many + * different put/get function names. Here we define a set of types + * corresponding the basic types of Buffer but presenting a uniform + * encode/decode/encodedSize interface. + * + * It also provides some convenience templates for the common case of + * encoding a single encodable value as a string, e.g. + * + * LongString ls("hello"); + * std::string encoded = encodeStr(ls); + * LongString ls2 = decodeStr<LongString>(encoded); + * LongString ls3; + * decodeStr(encoded, ls3); + */ + +namespace qpid { +namespace framing { + +// Templates to help define types +template <class ValueType> struct BufferTypeTraits { + typedef void (Buffer::*Put)(const ValueType&); + typedef void (Buffer::*Get)(ValueType&); +}; + +template <class ValueType, + typename BufferTypeTraits<ValueType>::Put PutFn, + typename BufferTypeTraits<ValueType>::Get GetFn> +struct EncodeDecodeTemplate { + EncodeDecodeTemplate(const ValueType& s) : value(s) {} + operator ValueType() const { return value; } + + ValueType value; + void encode(framing::Buffer& buf) const { (buf.*PutFn)(value); } + void decode(framing::Buffer& buf) { (buf.*GetFn)(value); } +}; + +template <size_t Size, + typename BufferTypeTraits<std::string>::Put PutFn, + typename BufferTypeTraits<std::string>::Get GetFn + > +struct StringTypeTemplate : public EncodeDecodeTemplate<std::string, PutFn, GetFn> { + typedef EncodeDecodeTemplate<std::string, PutFn, GetFn> Base; + StringTypeTemplate(const std::string& s) : Base(s) {} + size_t encodedSize() const { return Size + Base::value.size(); } +}; + + +// Convenience tempates for encoding/decoding values to/from a std::string. + +/** Encode value as a string. */ +template <class T> std::string encodeStr(const T& value) { + std::string encoded(value.encodedSize(), '\0'); + framing::Buffer buffer(&encoded[0], encoded.size()); + value.encode(buffer); + return encoded; +} + +/** Decode value from a string. */ +template <class T> void decodeStr(const std::string& encoded, T& value) { + framing::Buffer b(const_cast<char*>(&encoded[0]), encoded.size()); + value.decode(b); +} + +/** Decode value from a string. */ +template <class T> T decodeStr(const std::string& encoded) { + T value; + decodeStr(encoded, value); + return value; +} + +// The types + +typedef StringTypeTemplate<4, &Buffer::putLongString, &Buffer::getLongString> LongString; +typedef StringTypeTemplate<2, &Buffer::putMediumString, &Buffer::getMediumString> MediumString; +typedef StringTypeTemplate<1, &Buffer::putShortString, &Buffer::getShortString> ShortString; + +// TODO aconway 2013-07-26: Add integer types. + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_BUFFERTYPES_H*/ diff --git a/qpid/cpp/src/qpid/framing/ChannelHandler.h b/qpid/cpp/src/qpid/framing/ChannelHandler.h new file mode 100644 index 0000000000..ddab204578 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ChannelHandler.h @@ -0,0 +1,53 @@ +#ifndef QPID_FRAMING_CHANNELHANDLER_H +#define QPID_FRAMING_CHANNELHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AMQFrame.h" + +namespace qpid { +namespace framing { + +/** + * Sets the channel number on outgoing frames. + */ +class ChannelHandler : public FrameHandler +{ + public: + ChannelHandler(uint16_t channelId=0, FrameHandler* next=0) + : FrameHandler(next), channel(channelId) {} + void handle(AMQFrame& frame) { + frame.setChannel(channel); + next->handle(frame); + } + uint16_t get() const { return channel; } + ChannelHandler& set(uint16_t ch) { channel=ch; return *this; } + operator uint16_t() const { return get(); } + ChannelHandler& operator=(uint16_t ch) { return set(ch); } + + private: + uint16_t channel; +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/framing/Endian.h b/qpid/cpp/src/qpid/framing/Endian.h new file mode 100644 index 0000000000..faf553fed5 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Endian.h @@ -0,0 +1,78 @@ +#ifndef QPID_FRAMING_ENDIAN_H +#define QPID_FRAMING_ENDIAN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace framing { +namespace endian { + +/** Decode integer from network byte order buffer to type T, buffer must be at least sizeof(T). */ +template <class T> T decodeInt(const uint8_t* buffer) { + T v = buffer[0]; + for (size_t i = 1; i < sizeof(T); ++i) { + v <<= 8; + v |= buffer[i]; + } + return v; +} + +/** Encode integer value to network byte order in buffer, buffer must be at least sizeof(T). */ +template <class T> void encodeInt(uint8_t* buffer, T value) { + for (size_t i = sizeof(T); i > 0; --i) { + buffer[i-1] = value & 0XFF; + value >>= 8; + } +} + +// Compute the int type that can hold a float type. +template <class T> struct IntBox { typedef T Type; }; +template <> struct IntBox<float> { typedef uint32_t Type; }; +template <> struct IntBox<double> { typedef uint64_t Type; }; + +/** Decode floating from network byte order buffer to type T, buffer must be at least sizeof(T). */ +template <class T> T decodeFloat(const uint8_t* buffer) { + typedef typename IntBox<T>::Type Box; + union { T f; Box i; } u; + u.i = decodeInt<Box>(buffer); + return u.f; +} + +/** Encode floating value to network byte order in buffer, buffer must be at least sizeof(T). */ +template <class T> void encodeFloat(uint8_t* buffer, T value) { + typedef typename IntBox<T>::Type Box; + union { T f; Box i; } u; + u.f = value; + encodeInt(buffer, u.i); +} + +}}} // namespace qpid::framing::endian + +#endif /*!QPID_FRAMING_ENDIAN_H*/ + + + + + + diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp new file mode 100644 index 0000000000..0b677a6ccb --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -0,0 +1,433 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" +#include <assert.h> + +// The locking rationale in the FieldTable seems a little odd, but it +// maintains the concurrent guarantees and requirements that were in +// place before the cachedBytes/cachedSize were added: +// +// The FieldTable client code needs to make sure that they call no write +// operation in parallel with any other operation on the FieldTable. +// However multiple parallel read operations are safe. +// +// To this end the only code that is locked is code that can transparently +// change the state of the FieldTable during a read only operation. +// (In other words the code that required the mutable members in the class +// definition!) +// +namespace qpid { + +using sys::Mutex; +using sys::ScopedLock; + +namespace framing { + +FieldTable::FieldTable() : + cachedSize(0), + newBytes(false) +{ +} + +FieldTable::FieldTable(const FieldTable& ft) +{ + ScopedLock<Mutex> l(ft.lock); // lock _source_ FieldTable + + cachedBytes = ft.cachedBytes; + cachedSize = ft.cachedSize; + newBytes = ft.newBytes; + + // Only copy the values if we have no raw data + // - copying the map is expensive and we can + // reconstruct it if necessary from the raw data + if (cachedBytes) { + newBytes = true; + return; + } + // In practice Encoding the source field table and only copying + // the encoded bytes is faster than copying the whole value map. + // (Because we nearly always copy a field table internally before + // encoding it to send, but don't change it after the copy) + if (!ft.values.empty()) { + // Side effect of getting encoded size will cache it in ft.cachedSize + ft.cachedBytes = boost::shared_array<uint8_t>(new uint8_t[ft.encodedSize()]); + + Buffer buffer((char*)&ft.cachedBytes[0], ft.cachedSize); + + // Cut and paste ahead... + buffer.putLong(ft.encodedSize() - 4); + buffer.putLong(ft.values.size()); + for (ValueMap::const_iterator i = ft.values.begin(); i!=ft.values.end(); ++i) { + buffer.putShortString(i->first); + i->second->encode(buffer); + } + + cachedBytes = ft.cachedBytes; + cachedSize = ft.cachedSize; + newBytes = true; + } +} + +FieldTable& FieldTable::operator=(const FieldTable& ft) +{ + FieldTable nft(ft); + values.swap(nft.values); + cachedBytes.swap(nft.cachedBytes); + cachedSize = nft.cachedSize; + newBytes = nft.newBytes; + return (*this); +} + +uint32_t FieldTable::encodedSize() const { + ScopedLock<Mutex> l(lock); + + if (cachedSize != 0) { + return cachedSize; + } + uint32_t len(4/*size field*/ + 4/*count field*/); + for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + // shortstr_len_byte + key size + value size + len += 1 + (i->first).size() + (i->second)->encodedSize(); + } + cachedSize = len; + return len; +} + +int FieldTable::count() const { + return values.size(); +} + +namespace +{ +std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) { + return out << i.first << ":" << *i.second; +} +} + +std::ostream& operator<<(std::ostream& out, const FieldTable& t) { + t.realDecode(); + out << "{"; + FieldTable::ValueMap::const_iterator i = t.begin(); + if (i != t.end()) out << *i++; + while (i != t.end()) + { + out << "," << *i++; + } + return out << "}"; +} + +void FieldTable::set(const std::string& name, const ValuePtr& value){ + realDecode(); + values[name] = value; + flushRawCache(); +} + +void FieldTable::setString(const std::string& name, const std::string& value){ + realDecode(); + values[name] = ValuePtr(new Str16Value(value)); + flushRawCache(); +} + +void FieldTable::setInt(const std::string& name, const int value){ + realDecode(); + values[name] = ValuePtr(new IntegerValue(value)); + flushRawCache(); +} + +void FieldTable::setInt64(const std::string& name, const int64_t value){ + realDecode(); + values[name] = ValuePtr(new Integer64Value(value)); + flushRawCache(); +} + +void FieldTable::setTimestamp(const std::string& name, const uint64_t value){ + realDecode(); + values[name] = ValuePtr(new TimeValue(value)); + flushRawCache(); +} + +void FieldTable::setUInt64(const std::string& name, const uint64_t value){ + realDecode(); + values[name] = ValuePtr(new Unsigned64Value(value)); + flushRawCache(); +} + +void FieldTable::setTable(const std::string& name, const FieldTable& value) +{ + realDecode(); + values[name] = ValuePtr(new FieldTableValue(value)); + flushRawCache(); +} +void FieldTable::setArray(const std::string& name, const Array& value) +{ + realDecode(); + values[name] = ValuePtr(new ArrayValue(value)); + flushRawCache(); +} + +void FieldTable::setFloat(const std::string& name, const float value){ + realDecode(); + values[name] = ValuePtr(new FloatValue(value)); + flushRawCache(); +} + +void FieldTable::setDouble(const std::string& name, const double value){ + realDecode(); + values[name] = ValuePtr(new DoubleValue(value)); + flushRawCache(); +} + +FieldTable::ValuePtr FieldTable::get(const std::string& name) const +{ + // Ensure we have any values we're trying to read + realDecode(); + ValuePtr value; + ValueMap::const_iterator i = values.find(name); + if ( i!=values.end() ) + value = i->second; + return value; +} + +namespace { + template <class T> T default_value() { return T(); } + template <> int default_value<int>() { return 0; } + //template <> uint64_t default_value<uint64_t>() { return 0; } +} + +template <class T> +T getValue(const FieldTable::ValuePtr value) +{ + if (!value || !value->convertsTo<T>()) + return default_value<T>(); + + return value->get<T>(); +} + +std::string FieldTable::getAsString(const std::string& name) const { + return getValue<std::string>(get(name)); +} + +int FieldTable::getAsInt(const std::string& name) const { + return getValue<int>(get(name)); +} + +uint64_t FieldTable::getAsUInt64(const std::string& name) const { + return static_cast<uint64_t>( getValue<int64_t>(get(name))); +} + +int64_t FieldTable::getAsInt64(const std::string& name) const { + return getValue<int64_t>(get(name)); +} + +bool FieldTable::getTable(const std::string& name, FieldTable& value) const { + return getEncodedValue<FieldTable>(get(name), value); +} + +bool FieldTable::getArray(const std::string& name, Array& value) const { + return getEncodedValue<Array>(get(name), value); +} + +template <class T, int width, uint8_t typecode> +bool getRawFixedWidthValue(FieldTable::ValuePtr vptr, T& value) +{ + if (vptr && vptr->getType() == typecode) { + value = vptr->get<T>(); + return true; + } + return false; +} + +bool FieldTable::getFloat(const std::string& name, float& value) const { + return getRawFixedWidthValue<float, 4, 0x23>(get(name), value); +} + +bool FieldTable::getDouble(const std::string& name, double& value) const { + return getRawFixedWidthValue<double, 8, 0x33>(get(name), value); +} + +//uint64_t FieldTable::getTimestamp(const std::string& name) const { +// return getValue<uint64_t>(name); +//} + +void FieldTable::encode(Buffer& buffer) const { + // If we've still got the input field table + // we can just copy it directly to the output + if (cachedBytes) { + ScopedLock<Mutex> l(lock); + buffer.putRawData(&cachedBytes[0], cachedSize); + } else { + buffer.putLong(encodedSize() - 4); + buffer.putLong(values.size()); + for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { + buffer.putShortString(i->first); + i->second->encode(buffer); + } + } +} + +// Decode lazily - just record the raw bytes until we need them +void FieldTable::decode(Buffer& buffer){ + if (buffer.available() < 4) + throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); + uint32_t p = buffer.getPosition(); + uint32_t len = buffer.getLong(); + if (len) { + uint32_t available = buffer.available(); + if ((available < len) || (available < 4)) + throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); + } + ScopedLock<Mutex> l(lock); + // Throw away previous stored values + values.clear(); + // Copy data into our buffer + cachedBytes = boost::shared_array<uint8_t>(new uint8_t[len + 4]); + cachedSize = len + 4; + newBytes = true; + buffer.setPosition(p); + buffer.getRawData(&cachedBytes[0], cachedSize); +} + +void FieldTable::realDecode() const +{ + ScopedLock<Mutex> l(lock); + + // If we've got no raw data stored up then nothing to do + if (!newBytes) + return; + + Buffer buffer((char*)&cachedBytes[0], cachedSize); + uint32_t len = buffer.getLong(); + if (len) { + uint32_t available = buffer.available(); + uint32_t count = buffer.getLong(); + uint32_t leftover = available - len; + while(buffer.available() > leftover && count--){ + std::string name; + ValuePtr value(new FieldValue); + + buffer.getShortString(name); + value->decode(buffer); + values[name] = ValuePtr(value); + } + } + newBytes = false; +} + +void FieldTable::flushRawCache() +{ + ScopedLock<Mutex> l(lock); + // We can only flush the cache if there are no cached bytes to decode + assert(newBytes==false); + // Avoid recreating shared array unless we actually have one. + if (cachedBytes) cachedBytes.reset(); + cachedSize = 0; +} + +bool FieldTable::operator==(const FieldTable& x) const { + realDecode(); + x.realDecode(); + if (values.size() != x.values.size()) return false; + for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + ValueMap::const_iterator j = x.values.find(i->first); + if (j == x.values.end()) return false; + if (*(i->second) != *(j->second)) return false; + } + return true; +} + +void FieldTable::erase(const std::string& name) +{ + realDecode(); + if (values.find(name) != values.end()) { + values.erase(name); + flushRawCache(); + } +} + +void FieldTable::clear() +{ + values.clear(); + newBytes = false; + flushRawCache(); +} + +// Map-like interface. +FieldTable::ValueMap::const_iterator FieldTable::begin() const +{ + realDecode(); + return values.begin(); +} + +FieldTable::ValueMap::const_iterator FieldTable::end() const +{ + realDecode(); + return values.end(); +} + +FieldTable::ValueMap::const_iterator FieldTable::find(const std::string& s) const +{ + realDecode(); + return values.find(s); +} + +FieldTable::ValueMap::iterator FieldTable::begin() +{ + realDecode(); + flushRawCache(); + return values.begin(); +} + +FieldTable::ValueMap::iterator FieldTable::end() +{ + realDecode(); + flushRawCache(); + return values.end(); +} + +FieldTable::ValueMap::iterator FieldTable::find(const std::string& s) +{ + realDecode(); + flushRawCache(); + return values.find(s); +} + +std::pair<FieldTable::ValueMap::iterator, bool> FieldTable::insert(const ValueMap::value_type& value) +{ + realDecode(); + flushRawCache(); + return values.insert(value); +} + +FieldTable::ValueMap::iterator FieldTable::insert(ValueMap::iterator position, const ValueMap::value_type& value) +{ + realDecode(); + flushRawCache(); + return values.insert(position, value); +} + +} +} diff --git a/qpid/cpp/src/qpid/framing/FieldTable.h b/qpid/cpp/src/qpid/framing/FieldTable.h new file mode 100644 index 0000000000..1986a72d10 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldTable.h @@ -0,0 +1,139 @@ +#ifndef _FieldTable_ +#define _FieldTable_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <boost/shared_array.hpp> + +#include <iosfwd> +#include <map> + +#include "qpid/CommonImportExport.h" + +namespace qpid { + /** + * The framing namespace contains classes that are used to create, + * send and receive the basic packets from which AMQP is built. + */ +namespace framing { + +class Array; +class FieldValue; +class Buffer; + +/** + * A set of name-value pairs. (See the AMQP spec for more details on + * AMQP field tables). + * + * \ingroup clientapi + */ +class FieldTable +{ + public: + typedef boost::shared_ptr<FieldValue> ValuePtr; + typedef std::map<std::string, ValuePtr> ValueMap; + typedef ValueMap::iterator iterator; + typedef ValueMap::const_iterator const_iterator; + typedef ValueMap::const_reference const_reference; + typedef ValueMap::reference reference; + typedef ValueMap::value_type value_type; + + QPID_COMMON_EXTERN FieldTable(); + QPID_COMMON_EXTERN FieldTable(const FieldTable&); + QPID_COMMON_EXTERN FieldTable& operator=(const FieldTable&); + // Compiler default destructor fine + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer); + + QPID_COMMON_EXTERN int count() const; + QPID_COMMON_INLINE_EXTERN size_t size() const { return values.size(); } + QPID_COMMON_INLINE_EXTERN bool empty() { return size() == 0; } + QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value); + QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const; + QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; } + + QPID_COMMON_EXTERN void setString(const std::string& name, const std::string& value); + QPID_COMMON_EXTERN void setInt(const std::string& name, const int value); + QPID_COMMON_EXTERN void setInt64(const std::string& name, const int64_t value); + QPID_COMMON_EXTERN void setTimestamp(const std::string& name, const uint64_t value); + QPID_COMMON_EXTERN void setUInt64(const std::string& name, const uint64_t value); + QPID_COMMON_EXTERN void setTable(const std::string& name, const FieldTable& value); + QPID_COMMON_EXTERN void setArray(const std::string& name, const Array& value); + QPID_COMMON_EXTERN void setFloat(const std::string& name, const float value); + QPID_COMMON_EXTERN void setDouble(const std::string& name, const double value); + //void setDecimal(string& name, xxx& value); + + QPID_COMMON_EXTERN int getAsInt(const std::string& name) const; + QPID_COMMON_EXTERN uint64_t getAsUInt64(const std::string& name) const; + QPID_COMMON_EXTERN int64_t getAsInt64(const std::string& name) const; + QPID_COMMON_EXTERN std::string getAsString(const std::string& name) const; + + QPID_COMMON_EXTERN bool getTable(const std::string& name, FieldTable& value) const; + QPID_COMMON_EXTERN bool getArray(const std::string& name, Array& value) const; + QPID_COMMON_EXTERN bool getFloat(const std::string& name, float& value) const; + QPID_COMMON_EXTERN bool getDouble(const std::string& name, double& value) const; + //QPID_COMMON_EXTERN bool getTimestamp(const std::string& name, uint64_t& value) const; + //QPID_COMMON_EXTERN bool getDecimal(string& name, xxx& value); + QPID_COMMON_EXTERN void erase(const std::string& name); + + + QPID_COMMON_EXTERN bool operator==(const FieldTable& other) const; + + // Map-like interface. + QPID_COMMON_EXTERN ValueMap::const_iterator begin() const; + QPID_COMMON_EXTERN ValueMap::const_iterator end() const; + QPID_COMMON_EXTERN ValueMap::const_iterator find(const std::string& s) const; + + QPID_COMMON_EXTERN ValueMap::iterator begin(); + QPID_COMMON_EXTERN ValueMap::iterator end(); + QPID_COMMON_EXTERN ValueMap::iterator find(const std::string& s); + + QPID_COMMON_EXTERN std::pair <ValueMap::iterator, bool> insert(const ValueMap::value_type&); + QPID_COMMON_EXTERN ValueMap::iterator insert(ValueMap::iterator, const ValueMap::value_type&); + QPID_COMMON_EXTERN void clear(); + + private: + void realDecode() const; + void flushRawCache(); + + mutable qpid::sys::Mutex lock; + mutable ValueMap values; + mutable boost::shared_array<uint8_t> cachedBytes; + mutable uint32_t cachedSize; // if = 0 then non cached size as 0 is not a legal size + mutable bool newBytes; + + QPID_COMMON_EXTERN friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); +}; + +//class FieldNotFoundException{}; +//class UnknownFieldName : public FieldNotFoundException{}; +//class IncorrectFieldType : public FieldNotFoundException{}; +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp new file mode 100644 index 0000000000..227c12e44e --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/List.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Endian.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +// Some template magic for computing types from sizes. +template<int W> struct IntType{}; +template<> struct IntType<1> { typedef int8_t Type; }; +template<> struct IntType<2> { typedef int16_t Type; }; +template<> struct IntType<4> { typedef int32_t Type; }; +template<> struct IntType<8> { typedef int64_t Type; }; + +template<int W> struct UintType{}; +template<> struct UintType<1> { typedef uint8_t Type; }; +template<> struct UintType<2> { typedef uint16_t Type; }; +template<> struct UintType<4> { typedef uint32_t Type; }; +template<> struct UintType<8> { typedef uint64_t Type; }; + +template<int W> struct FloatType{}; +template<> struct FloatType<1> { typedef int8_t Type; }; // Dummy, never used. +template<> struct FloatType<2> { typedef int16_t Type; }; // Dummy, never used. +template<> struct FloatType<4> { typedef float Type; }; +template<> struct FloatType<8> { typedef double Type; }; + +// Construct the right subclass of FixedWidthValue for numeric types using width and kind. +// Kind 1=int, 2=unsigned int, 3=float +template<int W> FixedWidthValue<W>* numericFixedWidthValue(uint8_t kind) { + switch (kind) { + case 1: return new FixedWidthIntValue<typename IntType<W>::Type>(); + case 2: return new FixedWidthIntValue<typename UintType<W>::Type>(); + case 3: return new FixedWidthFloatValue<typename FloatType<W>::Type>(); + default: return new FixedWidthValue<W>(); + } +} + +uint8_t FieldValue::getType() const +{ + return typeOctet; +} + +void FieldValue::setType(uint8_t type) +{ + typeOctet = type; + if (typeOctet == 0xA8) { + data.reset(new EncodedValue<FieldTable>()); + } else if (typeOctet == 0xA9) { + data.reset(new EncodedValue<List>()); + } else if (typeOctet == 0xAA) { + data.reset(new EncodedValue<Array>()); + } else if (typeOctet == 0x48) { + data.reset(new UuidData()); + } else { + uint8_t kind = typeOctet & 0xF; + uint8_t lenType = typeOctet >> 4; + switch(lenType){ + case 0: + data.reset(numericFixedWidthValue<1>(kind)); + break; + case 1: + data.reset(numericFixedWidthValue<2>(kind)); + break; + case 2: + data.reset(numericFixedWidthValue<4>(kind)); + break; + case 3: + data.reset(numericFixedWidthValue<8>(kind)); + break; + // None of the remaining widths can be numeric types so just use new FixedWidthValue + case 4: + data.reset(new FixedWidthValue<16>()); + break; + case 5: + data.reset(new FixedWidthValue<32>()); + break; + case 6: + data.reset(new FixedWidthValue<64>()); + break; + case 7: + data.reset(new FixedWidthValue<128>()); + break; + case 8: + data.reset(new VariableWidthValue<1>()); + break; + case 9: + data.reset(new VariableWidthValue<2>()); + break; + case 0xA: + data.reset(new VariableWidthValue<4>()); + break; + case 0xC: + data.reset(new FixedWidthValue<5>()); + break; + case 0xD: + data.reset(new FixedWidthValue<9>()); + break; + case 0xF: + data.reset(new FixedWidthValue<0>()); + break; + default: + throw IllegalArgumentException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); + } + } +} + +void FieldValue::decode(Buffer& buffer) +{ + setType(buffer.getOctet()); + data->decode(buffer); +} + +void FieldValue::encode(Buffer& buffer) +{ + buffer.putOctet(typeOctet); + data->encode(buffer); +} + +bool FieldValue::operator==(const FieldValue& v) const +{ + return + typeOctet == v.typeOctet && + *data == *v.data; +} + +Str8Value::Str8Value(const std::string& v) : + FieldValue( + TYPE_CODE_STR8, + new VariableWidthValue<1>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{ +} + +Str16Value::Str16Value(const std::string& v) : + FieldValue( + 0x95, + new VariableWidthValue<2>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +Var16Value::Var16Value(const std::string& v, uint8_t code) : + FieldValue( + code, + new VariableWidthValue<2>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} +Var32Value::Var32Value(const std::string& v, uint8_t code) : + FieldValue( + code, + new VariableWidthValue<4>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +Struct32Value::Struct32Value(const std::string& v) : + FieldValue( + 0xAB, + new VariableWidthValue<4>( + reinterpret_cast<const uint8_t*>(v.data()), + reinterpret_cast<const uint8_t*>(v.data()+v.size()))) +{} + +IntegerValue::IntegerValue(int v) : + FieldValue(0x21, new FixedWidthIntValue<int32_t>(v)) +{} + +FloatValue::FloatValue(float v) : + FieldValue(0x23, new FixedWidthFloatValue<float>(v)) +{} + +DoubleValue::DoubleValue(double v) : + FieldValue(0x33, new FixedWidthFloatValue<double>(v)) +{} + +Integer64Value::Integer64Value(int64_t v) : + FieldValue(0x31, new FixedWidthIntValue<int64_t>(v)) +{} + +Unsigned64Value::Unsigned64Value(uint64_t v) : + FieldValue(0x32, new FixedWidthIntValue<uint64_t>(v)) +{} + + +TimeValue::TimeValue(uint64_t v) : + FieldValue(0x38, new FixedWidthIntValue<uint64_t>(v)) +{ +} + +FieldTableValue::FieldTableValue(const FieldTable& f) : FieldValue(0xa8, new EncodedValue<FieldTable>(f)) +{ +} + +ListValue::ListValue(const List& l) : FieldValue(0xa9, new EncodedValue<List>(l)) +{ +} + +ArrayValue::ArrayValue(const Array& a) : FieldValue(0xaa, new EncodedValue<Array>(a)) +{ +} + +VoidValue::VoidValue() : FieldValue(0xf0, new FixedWidthValue<0>()) {} + +BoolValue::BoolValue(bool b) : + FieldValue(0x08, new FixedWidthIntValue<bool>(b)) +{} + +Unsigned8Value::Unsigned8Value(uint8_t v) : + FieldValue(0x02, new FixedWidthIntValue<uint8_t>(v)) +{} +Unsigned16Value::Unsigned16Value(uint16_t v) : + FieldValue(0x12, new FixedWidthIntValue<uint16_t>(v)) +{} +Unsigned32Value::Unsigned32Value(uint32_t v) : + FieldValue(0x22, new FixedWidthIntValue<uint32_t>(v)) +{} + +Integer8Value::Integer8Value(int8_t v) : + FieldValue(0x01, new FixedWidthIntValue<int8_t>(v)) +{} +Integer16Value::Integer16Value(int16_t v) : + FieldValue(0x11, new FixedWidthIntValue<int16_t>(v)) +{} + +UuidData::UuidData() {} +UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {} +bool UuidData::convertsToString() const { return true; } +std::string UuidData::getString() const { return Uuid(rawOctets()).str(); } +UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {} + +void FieldValue::print(std::ostream& out) const { + data->print(out); + out << TypeCode(typeOctet) << '('; + if (data->convertsToString()) out << data->getString(); + else if (data->convertsToInt()) out << data->getInt(); + else data->print(out); + out << ')'; +} + +}} diff --git a/qpid/cpp/src/qpid/framing/FieldValue.h b/qpid/cpp/src/qpid/framing/FieldValue.h new file mode 100644 index 0000000000..e20244e7c9 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FieldValue.h @@ -0,0 +1,482 @@ +#ifndef _framing_FieldValue_h +#define _framing_FieldValue_h +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Exception.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Endian.h" +#include "qpid/CommonImportExport.h" + +#include <iostream> +#include <memory> +#include <vector> + +#include <assert.h> + +namespace qpid { +namespace framing { + +/** + * Exception that is the base exception for all field table errors. + * + * \ingroup clientapi + */ +class QPID_COMMON_CLASS_EXTERN FieldValueException : public qpid::Exception {}; + +/** + * Exception thrown when we can't perform requested conversion + * + * \ingroup clientapi + */ +struct QPID_COMMON_CLASS_EXTERN InvalidConversionException : public FieldValueException { + InvalidConversionException() {} +}; + +class List; + +/** + * Value that can appear in an AMQP field table + * + * \ingroup clientapi + */ +class QPID_COMMON_CLASS_EXTERN FieldValue { + public: + /* + * Abstract type for content of different types + */ + class Data { + public: + virtual ~Data() {} + virtual uint32_t encodedSize() const = 0; + virtual void encode(Buffer& buffer) = 0; + virtual void decode(Buffer& buffer) = 0; + virtual bool operator==(const Data&) const = 0; + + virtual bool convertsToInt() const { return false; } + virtual bool convertsToFloat() const { return false; } + virtual bool convertsToString() const { return false; } + virtual int64_t getInt() const { throw InvalidConversionException();} + virtual double getFloat() const { throw InvalidConversionException();} + virtual std::string getString() const { throw InvalidConversionException(); } + + virtual void print(std::ostream& out) const = 0; + }; + + FieldValue(): data(0) {}; + // Default assignment operator is fine + void setType(uint8_t type); + QPID_COMMON_EXTERN uint8_t getType() const; + Data& getData() { return *data; } + uint32_t encodedSize() const { return 1 + data->encodedSize(); }; + bool empty() const { return data.get() == 0; } + void encode(Buffer& buffer); + void decode(Buffer& buffer); + QPID_COMMON_EXTERN bool operator==(const FieldValue&) const; + QPID_COMMON_INLINE_EXTERN bool operator!=(const FieldValue& v) const { return !(*this == v); } + + QPID_COMMON_EXTERN void print(std::ostream& out) const; + + template <typename T> bool convertsTo() const { return false; } + template <typename T> T get() const { throw InvalidConversionException(); } + + template <class T, int W> T getIntegerValue() const; + template <class T> T getIntegerValue() const; + template <class T, int W> T getFloatingPointValue() const; + template <int W> void getFixedWidthValue(unsigned char*) const; + template <class T> bool get(T&) const; + + protected: + FieldValue(uint8_t t, Data* d): typeOctet(t), data(d) {} + + private: + uint8_t typeOctet; + std::auto_ptr<Data> data; +}; + +template <> +inline bool FieldValue::convertsTo<int>() const { return data->convertsToInt(); } + +template <> +inline bool FieldValue::convertsTo<int64_t>() const { return data->convertsToInt(); } + +template <> +inline bool FieldValue::convertsTo<std::string>() const { return data->convertsToString(); } + +template <> +inline bool FieldValue::convertsTo<float>() const { return data->convertsToFloat(); } + +template <> +inline bool FieldValue::convertsTo<double>() const { return data->convertsToFloat(); } + +template <> +inline int FieldValue::get<int>() const { return static_cast<int>(data->getInt()); } + +template <> +inline int64_t FieldValue::get<int64_t>() const { return data->getInt(); } + +template <> +inline float FieldValue::get<float>() const { return data->getFloat(); } + +template <> +inline double FieldValue::get<double>() const { return data->getFloat(); } + +template <> +inline std::string FieldValue::get<std::string>() const { return data->getString(); } + +inline std::ostream& operator<<(std::ostream& out, const FieldValue& v) { + v.print(out); + return out; +} + +template <int width> +class FixedWidthValue : public FieldValue::Data { + protected: + uint8_t octets[width]; + + public: + FixedWidthValue() {} + FixedWidthValue(const uint8_t (&data)[width]) : octets(data) {} + FixedWidthValue(const uint8_t* const data) { std::copy(data, data + width, octets); } + + uint32_t encodedSize() const { return width; } + void encode(Buffer& buffer) { buffer.putRawData(octets, width); } + void decode(Buffer& buffer) { buffer.getRawData(octets, width); } + bool operator==(const Data& d) const { + const FixedWidthValue<width>* rhs = dynamic_cast< const FixedWidthValue<width>* >(&d); + if (rhs == 0) return false; + else return std::equal(&octets[0], &octets[width], &rhs->octets[0]); + } + uint8_t* rawOctets() { return octets; } + const uint8_t* rawOctets() const { return octets; } + + void print(std::ostream& o) const { o << "F" << width << ":"; }; +}; + +template <class T> class FixedWidthIntValue : public FixedWidthValue<sizeof(T)> { + public: + FixedWidthIntValue(T v = 0) { endian::encodeInt(this->octets, v); } + bool convertsToInt() const { return true; } + int64_t getInt() const { return endian::decodeInt<T>(this->octets); } + bool convertsToFloat() const { return true; } + double getFloat() const { return getInt(); } +}; + +template <class T> class FixedWidthFloatValue : public FixedWidthValue<sizeof(T)> { + public: + FixedWidthFloatValue(T v = 0) { endian::encodeFloat(this->octets, v); } + bool convertsToFloat() const { return true; } + double getFloat() const { return endian::decodeFloat<T>(this->octets); } +}; + +// Dummy implementations that are never used but needed to avoid compile errors. +template <> class FixedWidthFloatValue<uint8_t> : public FixedWidthValue<1> { + FixedWidthFloatValue() { assert(0); } +}; +template <> class FixedWidthFloatValue<uint16_t> : public FixedWidthValue<2> { + FixedWidthFloatValue() { assert(0); } +}; + + +class UuidData : public FixedWidthValue<16> { + public: + UuidData(); + UuidData(const unsigned char* bytes); + bool convertsToString() const; + std::string getString() const; +}; + +template <class T, int W> +inline T FieldValue::getIntegerValue() const +{ + FixedWidthValue<W>* const fwv = dynamic_cast< FixedWidthValue<W>* const>(data.get()); + if (fwv) { + return endian::decodeInt<T>(fwv->rawOctets()); + } else { + throw InvalidConversionException(); + } +} + +template <class T> +inline T FieldValue::getIntegerValue() const +{ + FixedWidthValue<1>* const fwv = dynamic_cast< FixedWidthValue<1>* const>(data.get()); + if (fwv) { + uint8_t* octets = fwv->rawOctets(); + return octets[0]; + } else { + throw InvalidConversionException(); + } +} + +template <class T, int W> +inline T FieldValue::getFloatingPointValue() const { + const FixedWidthFloatValue<T>* fv = dynamic_cast<FixedWidthFloatValue<T>*>(data.get()); + if (fv) { + return endian::decodeFloat<T>(fv->rawOctets()); + } else { + throw InvalidConversionException(); + } +} + +template <int W> void FieldValue::getFixedWidthValue(unsigned char* value) const +{ + FixedWidthValue<W>* const fwv = dynamic_cast< FixedWidthValue<W>* const>(data.get()); + if (fwv) { + std::copy(fwv->rawOctets(), fwv->rawOctets() + W, value); + } else { + throw InvalidConversionException(); + } +} + +template <> +class FixedWidthValue<0> : public FieldValue::Data { + public: + // Implicit default constructor is fine + uint32_t encodedSize() const { return 0; } + void encode(Buffer&) {}; + void decode(Buffer&) {}; + bool operator==(const Data& d) const { + const FixedWidthValue<0>* rhs = dynamic_cast< const FixedWidthValue<0>* >(&d); + return rhs != 0; + } + void print(std::ostream& o) const { o << "F0"; }; +}; + +template <int lenwidth> +class VariableWidthValue : public FieldValue::Data { + std::vector<uint8_t> octets; + + public: + VariableWidthValue() {} + VariableWidthValue(const std::vector<uint8_t>& data) : octets(data) {} + VariableWidthValue(const uint8_t* start, const uint8_t* end) : octets(start, end) {} + uint32_t encodedSize() const { return lenwidth + octets.size(); } + void encode(Buffer& buffer) { + buffer.putUInt<lenwidth>(octets.size()); + if (octets.size() > 0) + buffer.putRawData(&octets[0], octets.size()); + }; + void decode(Buffer& buffer) { + uint32_t len = buffer.getUInt<lenwidth>(); + buffer.checkAvailable(len); + octets.resize(len); + if (len > 0) + buffer.getRawData(&octets[0], len); + } + bool operator==(const Data& d) const { + const VariableWidthValue<lenwidth>* rhs = dynamic_cast< const VariableWidthValue<lenwidth>* >(&d); + if (rhs == 0) return false; + else return octets==rhs->octets; + } + + bool convertsToString() const { return true; } + std::string getString() const { return std::string(octets.begin(), octets.end()); } + + void print(std::ostream& o) const { o << "V" << lenwidth << ":" << octets.size() << ":"; }; +}; + +template <class T> +class EncodedValue : public FieldValue::Data { + T value; + public: + + EncodedValue() {} + EncodedValue(const T& v) : value(v) {} + + T& getValue() { return value; } + const T& getValue() const { return value; } + + uint32_t encodedSize() const { return value.encodedSize(); } + + void encode(Buffer& buffer) { + value.encode(buffer); + }; + void decode(Buffer& buffer) { + value.decode(buffer); + } + bool operator==(const Data& d) const { + const EncodedValue<T>* rhs = dynamic_cast< const EncodedValue<T>* >(&d); + if (rhs == 0) return false; + else return value==rhs->value; + } + + void print(std::ostream& o) const { o << "[" << value << "]"; }; +}; + +/** + * Accessor that can be used to get values of type FieldTable, Array + * and List. + */ +template <class T> +inline bool FieldValue::get(T& t) const +{ + const EncodedValue<T>* v = dynamic_cast< EncodedValue<T>* >(data.get()); + if (v != 0) { + t = v->getValue(); + return true; + } else { + try { + t = get<T>(); + return true; + } catch (const InvalidConversionException&) { + return false; + } + } +} + +class Str8Value : public FieldValue { + public: + QPID_COMMON_EXTERN Str8Value(const std::string& v); +}; + +class Str16Value : public FieldValue { + public: + QPID_COMMON_EXTERN Str16Value(const std::string& v); +}; + +class Var16Value : public FieldValue { + public: + QPID_COMMON_EXTERN Var16Value(const std::string& v, uint8_t code); +}; + +class Var32Value : public FieldValue { + public: + QPID_COMMON_EXTERN Var32Value(const std::string& v, uint8_t code); + }; + +class Struct32Value : public FieldValue { + public: + QPID_COMMON_EXTERN Struct32Value(const std::string& v); +}; + +class FloatValue : public FieldValue +{ + public: + QPID_COMMON_EXTERN FloatValue(float f); +}; +class DoubleValue : public FieldValue +{ + public: + QPID_COMMON_EXTERN DoubleValue(double f); +}; + +/* + * Basic integer value encodes as signed 32 bit + */ +class IntegerValue : public FieldValue { + public: + QPID_COMMON_EXTERN IntegerValue(int v); +}; + +class TimeValue : public FieldValue { + public: + QPID_COMMON_EXTERN TimeValue(uint64_t v); +}; + +class Integer64Value : public FieldValue { + public: + QPID_COMMON_EXTERN Integer64Value(int64_t v); +}; + +class Unsigned64Value : public FieldValue { + public: + QPID_COMMON_EXTERN Unsigned64Value(uint64_t v); +}; + +class FieldTableValue : public FieldValue { + public: + typedef FieldTable ValueType; + QPID_COMMON_EXTERN FieldTableValue(const FieldTable&); +}; + +class ArrayValue : public FieldValue { + public: + QPID_COMMON_EXTERN ArrayValue(const Array&); +}; + +class VoidValue : public FieldValue { + public: + QPID_COMMON_EXTERN VoidValue(); +}; + +class BoolValue : public FieldValue { + public: + QPID_COMMON_EXTERN BoolValue(bool); +}; + +class Unsigned8Value : public FieldValue { + public: + QPID_COMMON_EXTERN Unsigned8Value(uint8_t); +}; + +class Unsigned16Value : public FieldValue { + public: + QPID_COMMON_EXTERN Unsigned16Value(uint16_t); +}; + +class Unsigned32Value : public FieldValue { + public: + QPID_COMMON_EXTERN Unsigned32Value(uint32_t); +}; + +class Integer8Value : public FieldValue { + public: + QPID_COMMON_EXTERN Integer8Value(int8_t); +}; + +class Integer16Value : public FieldValue { + public: + QPID_COMMON_EXTERN Integer16Value(int16_t); +}; + +typedef IntegerValue Integer32Value; + +class ListValue : public FieldValue { + public: + typedef List ValueType; + QPID_COMMON_EXTERN ListValue(const List&); +}; + +class UuidValue : public FieldValue { + public: + QPID_COMMON_EXTERN UuidValue(); + QPID_COMMON_EXTERN UuidValue(const unsigned char*); +}; + +template <class T> +bool getEncodedValue(FieldTable::ValuePtr vptr, T& value) +{ + if (vptr) { + const EncodedValue<T>* ev = dynamic_cast< EncodedValue<T>* >(&(vptr->getData())); + if (ev != 0) { + value = ev->getValue(); + return true; + } + } + return false; +} + +}} // qpid::framing + +#endif diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.cpp b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp new file mode 100644 index 0000000000..90cbbd84a1 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/FrameDecoder.h" +#include "qpid/framing/Buffer.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" +#include <algorithm> +#include <string.h> + +namespace qpid { +namespace framing { + +namespace { +/** Append up to n bytes from start of buf to end of bytes. */ +void append(std::vector<char>& bytes, Buffer& buffer, size_t n) { + size_t oldSize = bytes.size(); + if ((n = std::min(n, size_t(buffer.available()))) == 0) + return; + bytes.resize(oldSize+n); + char* p = &bytes[oldSize]; + buffer.getRawData(reinterpret_cast<uint8_t*>(p), n); +} +} + +bool FrameDecoder::decode(Buffer& buffer) { + if (buffer.available() == 0) return false; + if (fragment.empty()) { + if (frame.decode(buffer)) // Decode from buffer + return true; + else // Store fragment + append(fragment, buffer, buffer.available()); + } + else { // Already have a fragment + // Get enough data to decode the frame size. + if (fragment.size() < AMQFrame::DECODE_SIZE_MIN) { + append(fragment, buffer, AMQFrame::DECODE_SIZE_MIN - fragment.size()); + } + if (fragment.size() >= AMQFrame::DECODE_SIZE_MIN) { + uint16_t size = AMQFrame::decodeSize(&fragment[0]); + if (size <= fragment.size()) + throw FramingErrorException(QPID_MSG("Frame size " << size << " is too small.")); + append(fragment, buffer, size-fragment.size()); + Buffer b(&fragment[0], fragment.size()); + if (frame.decode(b)) { + assert(b.available() == 0); + fragment.clear(); + return true; + } + } + } + return false; +} + +void FrameDecoder::setFragment(const char* data, size_t size) { + fragment.resize(size); + ::memcpy(&fragment[0], data, size); +} + +std::pair<const char*, size_t> FrameDecoder::getFragment() const { + return std::pair<const char*, size_t>(&fragment[0], fragment.size()); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/FrameDecoder.h b/qpid/cpp/src/qpid/framing/FrameDecoder.h new file mode 100644 index 0000000000..26bed6c447 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDecoder.h @@ -0,0 +1,52 @@ +#ifndef QPID_FRAMING_FRAMEDECODER_H +#define QPID_FRAMING_FRAMEDECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/AMQFrame.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +/** + * Decode a frame from buffer. If buffer does not contain a complete + * frame, caches the fragment for the next call to decode. + */ +class FrameDecoder +{ + public: + QPID_COMMON_EXTERN bool decode(Buffer& buffer); + const AMQFrame& getFrame() const { return frame; } + AMQFrame& getFrame() { return frame; } + + void setFragment(const char*, size_t); + std::pair<const char*, size_t> getFragment() const; + + private: + std::vector<char> fragment; + AMQFrame frame; + +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_FRAMEDECODER_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h b/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h new file mode 100644 index 0000000000..bd676960bf --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h @@ -0,0 +1,60 @@ +#ifndef QPID_FRAMING_FRAMEVISITOR_H +#define QPID_FRAMING_FRAMEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/MethodBodyDefaultVisitor.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" + +namespace qpid { +namespace framing { +/** + * Visitor for all concrete frame body types, which combines + * AMQBodyConstVisitor and MethodBodyDefaultVisitor. + * + * Derived classes can override visit methods to specify actions. + * Derived classes must override defaultVisit(), which is called + * for any non-overridden visit functions. + * + */ +struct FrameDefaultVisitor : public AMQBodyConstVisitor, + protected MethodBodyDefaultVisitor +{ + virtual void defaultVisit(const AMQBody&) = 0; + void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); } + + void visit(const AMQHeaderBody& b) { defaultVisit(b); } + void visit(const AMQContentBody& b) { defaultVisit(b); } + void visit(const AMQHeartbeatBody& b) { defaultVisit(b); } + void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); } + + using AMQBodyConstVisitor::visit; + using MethodBodyDefaultVisitor::visit; +}; + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_FRAMEVISITOR_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameHandler.h b/qpid/cpp/src/qpid/framing/FrameHandler.h new file mode 100644 index 0000000000..fa1fb535ef --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameHandler.h @@ -0,0 +1,33 @@ +#ifndef QPID_FRAMING_FRAMEHANDLER_H +#define QPID_FRAMING_FRAMEHANDLER_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/Handler.h" + +namespace qpid { +namespace framing { + +class AMQFrame; +typedef Handler<AMQFrame&> FrameHandler; + + +}} +#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp new file mode 100644 index 0000000000..4089475c7a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::framing; +using qpid::sys::AbsTime; +using qpid::sys::TIME_MSEC; + +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true),received(AbsTime::FarFuture()) { } +FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true), received(AbsTime::FarFuture()) +{ + for (Frames::const_iterator i = original.begin(); i != original.end(); ++i) { + parts.push_back(AMQFrame(*(i->getBody()))); + parts.back().setFirstSegment(i->isFirstSegment()); + parts.back().setLastSegment(i->isLastSegment()); + parts.back().setFirstFrame(i->isFirstFrame()); + parts.back().setLastFrame(i->isLastFrame()); + } +} + +void FrameSet::append(const AMQFrame& part) +{ + parts.push_back(part); + recalculateSize = true; +} + +bool FrameSet::isComplete() const +{ + return !parts.empty() && parts.back().getEof() && parts.back().getEos(); +} + +bool FrameSet::isContentBearing() const +{ + const AMQMethodBody* method = getMethod(); + return method && method->isContentBearing(); +} + +const AMQMethodBody* FrameSet::getMethod() const +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + +AMQMethodBody* FrameSet::getMethod() +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + +const AMQHeaderBody* FrameSet::getHeaders() const +{ + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); +} + +AMQHeaderBody* FrameSet::getHeaders() +{ + return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); +} + +uint64_t FrameSet::getContentSize() const +{ + if (recalculateSize) + { + SumBodySize sum; + map_if(sum, TypeFilter<CONTENT_BODY>()); + contentSize = sum.getSize(); + recalculateSize = false; + } + return contentSize; +} + +void FrameSet::getContent(std::string& out) const { + out.clear(); + out.reserve(getContentSize()); + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) + out += i->castBody<AMQContentBody>()->getData(); + } +} + +std::string FrameSet::getContent() const { + std::string out; + getContent(out); + return out; +} + +bool FrameSet::hasContent() const { + return parts.size() >= 3; +} + +void FrameSet::setReceived() +{ + received = AbsTime::now(); +} +namespace { +uint64_t MAX_TTL = std::numeric_limits<int64_t>::max()/TIME_MSEC; +} + +bool FrameSet::hasExpired() const +{ + const DeliveryProperties* props = getHeaderProperties<DeliveryProperties>(); + if (props && props->hasTtl() && props->getTtl() < MAX_TTL) { + AbsTime expiration(received, props->getTtl()*TIME_MSEC); + return expiration < AbsTime::now(); + } + return false; +} diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h new file mode 100644 index 0000000000..e234864dfd --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -0,0 +1,130 @@ +#ifndef QPID_FRAMING_FRAMESET_H +#define QPID_FRAMING_FRAMESET_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/InlineVector.h" +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Time.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +/** + * Collects the frames representing a message. + */ +class FrameSet +{ +public: + typedef InlineVector<AMQFrame, 4> Frames; + +private: + const SequenceNumber id; + Frames parts; + mutable uint64_t contentSize; + mutable bool recalculateSize; + qpid::sys::AbsTime received; + +public: + typedef boost::shared_ptr<FrameSet> shared_ptr; + typedef Frames::iterator iterator; + typedef Frames::const_iterator const_iterator; + + QPID_COMMON_EXTERN FrameSet(const SequenceNumber& id); + QPID_COMMON_EXTERN FrameSet(const FrameSet&); + QPID_COMMON_EXTERN void append(const AMQFrame& part); + QPID_COMMON_EXTERN bool isComplete() const; + + QPID_COMMON_EXTERN uint64_t getContentSize() const; + + QPID_COMMON_EXTERN void getContent(std::string&) const; + QPID_COMMON_EXTERN std::string getContent() const; + QPID_COMMON_EXTERN bool hasContent() const; + + QPID_COMMON_EXTERN void setReceived(); + QPID_COMMON_EXTERN bool hasExpired() const; + + bool isContentBearing() const; + + QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const; + QPID_COMMON_EXTERN AMQMethodBody* getMethod(); + QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const; + QPID_COMMON_EXTERN AMQHeaderBody* getHeaders(); + + template <class T> bool isA() const { + const AMQMethodBody* method = getMethod(); + return method && method->isA<T>(); + } + + template <class T> const T* as() const { + const AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; + } + + template <class T> T* as() { + AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0; + } + + template <class T> const T* getHeaderProperties() const { + const AMQHeaderBody* header = getHeaders(); + return header ? header->get<T>() : 0; + } + + Frames::const_iterator begin() const { return parts.begin(); } + Frames::const_iterator end() const { return parts.end(); } + + const SequenceNumber& getId() const { return id; } + + template <class P> void remove(P predicate) { + parts.erase(std::remove_if(parts.begin(), parts.end(), predicate), parts.end()); + } + + template <class F> void map(F& functor) { + std::for_each(parts.begin(), parts.end(), functor); + } + + template <class F> void map(F& functor) const { + std::for_each(parts.begin(), parts.end(), functor); + } + + template <class F, class P> void map_if(F& functor, P predicate) { + for(Frames::iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } + + template <class F, class P> void map_if(F& functor, P predicate) const { + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } +}; + +} +} + + +#endif /*!QPID_FRAMING_FRAMESET_H*/ diff --git a/qpid/cpp/src/qpid/framing/Handler.h b/qpid/cpp/src/qpid/framing/Handler.h new file mode 100644 index 0000000000..f2a9fccde0 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Handler.h @@ -0,0 +1,103 @@ +#ifndef QPID_FRAMING_HANDLER_H +#define QPID_FRAMING_HANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace framing { + +template <class T> +struct Handler { + typedef T HandledType; + typedef void handleFptr(T); + typedef void result_type; // Compatible with std/boost functors. + + Handler(Handler<T>* next_=0) : next(next_) {} + virtual ~Handler() {} + virtual void handle(T) = 0; + + /** Allow functor syntax for calling handle */ + void operator()(T t) { handle(t); } + + + /** Pointer to next handler in a linked list. */ + Handler<T>* next; + + /** Adapt any void(T) functor as a Handler. + * Functor<F>(f) will copy f. + * Functor<F&>(f) will only take a reference to x. + */ + template <class F> class Functor; + + /** Adapt a member function of X as a Handler. + * Only holds a reference to its target, not a copy. + */ + template <class X, void (X::*F)(T)> class MemFunRef; + + /** Interface for a handler that implements a + * pair of in/out handle operations. + * @see InOutHandler + */ + class InOutHandlerInterface { + public: + virtual ~InOutHandlerInterface() {} + virtual void handleIn(T) = 0; + virtual void handleOut(T) = 0; + }; + + /** Support for implementing an in-out handler pair as a single class. + * Overrides handleIn, handleOut functions in a single class. + */ + struct InOutHandler : protected InOutHandlerInterface { + InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in; + MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out; + }; +}; + +template <class T> +template <class F> +class Handler<T>::Functor : public Handler<T> { + public: + Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {} + void handle(T t) { functor(t); } + private: + F functor; +}; + +template <class T> +template <class X, void (X::*F)(T)> +class Handler<T>::MemFunRef : public Handler<T> { + public: + MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} + void handle(T t) { (target->*F)(t); } + + /** Allow calling with -> syntax */ + MemFunRef* operator->() { return this; } + + private: + X* target; +}; + +}} +#endif /*!QPID_FRAMING_HANDLER_H*/ +// diff --git a/qpid/cpp/src/qpid/framing/HeaderProperties.h b/qpid/cpp/src/qpid/framing/HeaderProperties.h new file mode 100644 index 0000000000..8b1828daec --- /dev/null +++ b/qpid/cpp/src/qpid/framing/HeaderProperties.h @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" + +#ifndef _HeaderProperties_ +#define _HeaderProperties_ + +namespace qpid { +namespace framing { + + class HeaderProperties + { + + public: + inline virtual ~HeaderProperties(){} + virtual uint8_t classId() const = 0; + virtual uint32_t encodedSize() const = 0; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t size) = 0; + }; +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/InitiationHandler.cpp b/qpid/cpp/src/qpid/framing/InitiationHandler.cpp new file mode 100644 index 0000000000..7ded505a47 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InitiationHandler.cpp @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/InitiationHandler.h" + +qpid::framing::InitiationHandler::~InitiationHandler() {} diff --git a/qpid/cpp/src/qpid/framing/InitiationHandler.h b/qpid/cpp/src/qpid/framing/InitiationHandler.h new file mode 100644 index 0000000000..5dfcc6b468 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InitiationHandler.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +#ifndef _InitiationHandler_ +#define _InitiationHandler_ + +#include "qpid/framing/ProtocolInitiation.h" + +namespace qpid { +namespace framing { + + class InitiationHandler{ + public: + virtual ~InitiationHandler(); + virtual void initiated(const ProtocolInitiation&) = 0; + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/InputHandler.h b/qpid/cpp/src/qpid/framing/InputHandler.h new file mode 100644 index 0000000000..3efb23632a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/InputHandler.h @@ -0,0 +1,41 @@ +#ifndef _InputHandler_ +#define _InputHandler_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace framing { + +// TODO aconway 2007-08-29: Eliminate, replace with FrameHandler. +class InputHandler : public FrameHandler { + public: + virtual ~InputHandler() {} + virtual void received(AMQFrame&) = 0; + void handle(AMQFrame& f) { received(f); } +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/Invoker.h b/qpid/cpp/src/qpid/framing/Invoker.h new file mode 100644 index 0000000000..4f1cf7c331 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Invoker.h @@ -0,0 +1,86 @@ +#ifndef QPID_FRAMING_INVOKER_H +#define QPID_FRAMING_INVOKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/MethodBodyDefaultVisitor.h" +#include "qpid/framing/StructHelper.h" + +#include <boost/optional.hpp> + +namespace qpid { +namespace framing { + +class AMQMethodBody; + +/** + * Base class for invoker visitors. + */ +class Invoker: public MethodBodyDefaultVisitor, protected StructHelper +{ + public: + struct Result { + public: + Result() : handled(false) {} + const std::string& getResult() const { return result; } + bool hasResult() const { return !result.empty(); } + bool wasHandled() const { return handled; } + operator bool() const { return handled; } + + std::string result; + bool handled; + }; + + void defaultVisit(const AMQMethodBody&) {} + Result getResult() const { return result; } + + protected: + Result result; +}; + +/** + * Invoke an invocable object. + * Invocable classes must provide a nested type Invoker. + */ +template <class Invocable> +Invoker::Result invoke(Invocable& target, const AMQMethodBody& body) { + typename Invocable::Invoker invoker(target); + body.accept(invoker); + return invoker.getResult(); +} + +/** + * Invoke an invocable object. + * Invocable classes must provide a nested type Invoker. + */ +template <class Invocable> +Invoker::Result invoke(Invocable& target, const AMQBody& body) { + typename Invocable::Invoker invoker(target); + const AMQMethodBody* method = body.getMethod(); + if (method) + method->accept(invoker); + return invoker.getResult(); +} + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_INVOKER_H*/ diff --git a/qpid/cpp/src/qpid/framing/IsInSequenceSet.h b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h new file mode 100644 index 0000000000..fe10c1b9fa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h @@ -0,0 +1,51 @@ +#ifndef QPID_FRAMING_ISINSEQUENCESET_H +#define QPID_FRAMING_ISINSEQUENCESET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" + +namespace qpid { +namespace framing { +/** + * Functor to test whether values are in a sequence set. This is a + * stateful functor that requires the values to be supplied in order + * and takes advantage of that ordering to avoid multiple scans. + */ +class IsInSequenceSet +{ + public: + IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {} + + bool operator()(const SequenceNumber& n) { + while (i != set.rangesEnd() && i->end() <= n) ++i; + return i != set.rangesEnd() && i->begin() <= n; + } + + private: + const SequenceSet& set; + SequenceSet::RangeIterator i; +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_ISINSEQUENCESET_H*/ diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp new file mode 100644 index 0000000000..d7ea172bac --- /dev/null +++ b/qpid/cpp/src/qpid/framing/List.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/List.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +uint32_t List::encodedSize() const +{ + uint32_t len(4/*size*/ + 4/*count*/); + for(Values::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->encodedSize(); + } + return len; +} + +void List::encode(Buffer& buffer) const +{ + buffer.putLong(encodedSize() - 4); + buffer.putLong(size()); + for (Values::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->encode(buffer); + } +} + +void List::decode(Buffer& buffer) +{ + values.clear(); + if (buffer.available() < 4) + throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least " + " 4 bytes but only " << buffer.available() << " available")); + uint32_t size = buffer.getLong(); + uint32_t available = buffer.available(); + if (available < size) { + throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected " + << size << " bytes but only " << available << " available")); + } + if (size) { + if (buffer.available() < 4) + throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least " + " 4 bytes but only " << buffer.available() << " available")); + uint32_t count = buffer.getLong(); + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->decode(buffer); + values.push_back(value); + } + } +} + + +bool List::operator==(const List& other) const { + return values.size() == other.values.size() && + std::equal(values.begin(), values.end(), other.values.begin()); +} + +std::ostream& operator<<(std::ostream& out, const List& l) +{ + out << "{"; + for(List::Values::const_iterator i = l.values.begin(); i != l.values.end(); ++i) { + if (i != l.values.begin()) out << ", "; + (*i)->print(out); + } + return out << "}"; +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/List.h b/qpid/cpp/src/qpid/framing/List.h new file mode 100644 index 0000000000..681445947c --- /dev/null +++ b/qpid/cpp/src/qpid/framing/List.h @@ -0,0 +1,78 @@ +#ifndef QPID_FRAMING_LIST_H +#define QPID_FRAMING_LIST_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/CommonImportExport.h" +#include "qpid/framing/amqp_types.h" +#include <iostream> +#include <list> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace framing { + +class Buffer; +class FieldValue; + +/** + * Representation of an AMQP 0-10 list + */ +class QPID_COMMON_CLASS_EXTERN List +{ + public: + typedef boost::shared_ptr<FieldValue> ValuePtr; + typedef ValuePtr value_type; + typedef std::list<ValuePtr> Values; + typedef Values::const_iterator const_iterator; + typedef Values::iterator iterator; + typedef Values::const_reference const_reference; + typedef Values::reference reference; + + QPID_COMMON_EXTERN uint32_t encodedSize() const; + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer); + + QPID_COMMON_EXTERN bool operator==(const List& other) const; + + // std collection interface. + QPID_COMMON_INLINE_EXTERN const_iterator begin() const { return values.begin(); } + QPID_COMMON_INLINE_EXTERN const_iterator end() const { return values.end(); } + QPID_COMMON_INLINE_EXTERN iterator begin() { return values.begin(); } + QPID_COMMON_INLINE_EXTERN iterator end(){ return values.end(); } + + QPID_COMMON_INLINE_EXTERN ValuePtr front() const { return values.front(); } + QPID_COMMON_INLINE_EXTERN ValuePtr back() const { return values.back(); } + QPID_COMMON_INLINE_EXTERN size_t size() const { return values.size(); } + + QPID_COMMON_INLINE_EXTERN iterator insert(iterator i, ValuePtr value) { return values.insert(i, value); } + QPID_COMMON_INLINE_EXTERN void erase(iterator i) { values.erase(i); } + QPID_COMMON_INLINE_EXTERN void push_back(ValuePtr value) { values.insert(end(), value); } + QPID_COMMON_INLINE_EXTERN void pop_back() { values.pop_back(); } + + private: + Values values; + + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& out, const List& list); +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_LIST_H*/ diff --git a/qpid/cpp/src/qpid/framing/MethodBodyFactory.h b/qpid/cpp/src/qpid/framing/MethodBodyFactory.h new file mode 100644 index 0000000000..88bc444795 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/MethodBodyFactory.h @@ -0,0 +1,45 @@ +#ifndef QPID_FRAMING_METHODBODYFACTORY_H +#define QPID_FRAMING_METHODBODYFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQBody.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace framing { + +class AMQMethodBody; + +/** + * Functions to create instances of AMQMethodBody sub-classes. + * Note: MethodBodyFactory.cpp file is generated by rubygen. + */ +class MethodBodyFactory +{ + public: + static boost::intrusive_ptr<AMQMethodBody> create(ClassId c, MethodId m); +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_METHODBODYFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/framing/MethodContent.h b/qpid/cpp/src/qpid/framing/MethodContent.h new file mode 100644 index 0000000000..58c9143cfa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/MethodContent.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MethodContent_ +#define _MethodContent_ + +#include <string> +#include "qpid/framing/AMQHeaderBody.h" + +namespace qpid { +namespace framing { + +class MethodContent +{ +public: + virtual ~MethodContent() {} + //TODO: rethink this interface + virtual const AMQHeaderBody& getHeader() const = 0; + virtual const std::string& getData() const = 0; +}; + +}} +#endif diff --git a/qpid/cpp/src/qpid/framing/ModelMethod.h b/qpid/cpp/src/qpid/framing/ModelMethod.h new file mode 100644 index 0000000000..d99bd06cfa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ModelMethod.h @@ -0,0 +1,49 @@ +#ifndef _ModelMethod_ +#define _ModelMethod_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/Header.h" + +namespace qpid { +namespace framing { + + +class ModelMethod : public AMQMethodBody +{ + mutable Header header; +public: + virtual ~ModelMethod() {} + virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); } + virtual void decodeHeader(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); } + virtual uint32_t headerSize() const { return header.encodedSize(); } + virtual bool isSync() const { return header.getSync(); } + virtual void setSync(bool on) const { header.setSync(on); } + Header& getHeader() { return header; } + const Header& getHeader() const { return header; } +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp new file mode 100644 index 0000000000..19cb3f0e3d --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolInitiation.h" + +#include <iostream> + +namespace qpid { +namespace framing { + +ProtocolInitiation::ProtocolInitiation(){} + +ProtocolInitiation::ProtocolInitiation(uint8_t _major, uint8_t _minor) : version(_major, _minor) {} + +ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} + +ProtocolInitiation::~ProtocolInitiation(){} + +void ProtocolInitiation::encode(Buffer& buffer) const { + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('Q'); + buffer.putOctet('P'); + if (version.getMajor() == 1) { + buffer.putOctet(version.getProtocol()); + buffer.putOctet(version.getMajor()); + buffer.putOctet(version.getMinor()); + buffer.putOctet(0);//revision + } else { + buffer.putOctet(1);//class + buffer.putOctet(1);//instance + buffer.putOctet(version.getMajor()); + buffer.putOctet(version.getMinor()); + } +} + +bool ProtocolInitiation::decode(Buffer& buffer){ + if(buffer.available() >= 8){ + buffer.getOctet();//A + buffer.getOctet();//M + buffer.getOctet();//Q + buffer.getOctet();//P + uint8_t protocolClass = buffer.getOctet();//class + version.setProtocol(protocolClass); + if (protocolClass == 1) { + //old (pre-1.0) style + buffer.getOctet();//instance + version.setMajor(buffer.getOctet()); + version.setMinor(buffer.getOctet()); + } else { + version.setMajor(buffer.getOctet()); + version.setMinor(buffer.getOctet()); + buffer.getOctet();//revision + } + return true; + }else{ + return false; + } +} + + +std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi) { + return o << int(pi.getMajor()) << "-" << int(pi.getMinor()); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h new file mode 100644 index 0000000000..fe6410af55 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQDataBlock.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/CommonImportExport.h" + +#ifndef _ProtocolInitiation_ +#define _ProtocolInitiation_ + +namespace qpid { +namespace framing { + +class ProtocolInitiation : public AMQDataBlock +{ +private: + ProtocolVersion version; + +public: + QPID_COMMON_EXTERN ProtocolInitiation(); + QPID_COMMON_EXTERN ProtocolInitiation(uint8_t major, uint8_t minor); + QPID_COMMON_EXTERN ProtocolInitiation(ProtocolVersion p); + QPID_COMMON_EXTERN virtual ~ProtocolInitiation(); + QPID_COMMON_EXTERN virtual void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN virtual bool decode(Buffer& buffer); + inline virtual uint32_t encodedSize() const { return 8; } + inline uint8_t getMajor() const { return version.getMajor(); } + inline uint8_t getMinor() const { return version.getMinor(); } + inline ProtocolVersion getVersion() const { return version; } + bool operator==(ProtocolVersion v) const { return v == getVersion(); } + bool matches(ProtocolVersion v) const { return v == getVersion(); } +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi); + + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp b/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp new file mode 100644 index 0000000000..269b861191 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolVersion.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolVersion.h" +#include <sstream> + +using namespace qpid::framing; + +const std::string ProtocolVersion::toString() const +{ + std::stringstream ss; + ss << unsigned(major_) << "-" << unsigned(minor_); + if (major_ == 1) { + if (protocol_ == SASL) ss << " (SASL)"; + else if (protocol_ == TLS) ss << " (TLS)"; + } + return ss.str(); +} + +ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p) +{ + major_ = p.major_; + minor_ = p.minor_; + return *this; +} + +bool ProtocolVersion::operator==(ProtocolVersion p) const +{ + return major_ == p.major_ && minor_ == p.minor_; +} + +const uint8_t ProtocolVersion::AMQP(0); +const uint8_t ProtocolVersion::LEGACY_AMQP(1); +const uint8_t ProtocolVersion::TLS(2); +const uint8_t ProtocolVersion::SASL(3); diff --git a/qpid/cpp/src/qpid/framing/ProtocolVersion.h b/qpid/cpp/src/qpid/framing/ProtocolVersion.h new file mode 100644 index 0000000000..92580baf1a --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ProtocolVersion.h @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ProtocolVersion_ +#define _ProtocolVersion_ + +#include "qpid/framing/amqp_types.h" +#include "qpid/CommonImportExport.h" + +#include <string> + +namespace qpid +{ +namespace framing +{ + +class QPID_COMMON_CLASS_EXTERN ProtocolVersion +{ +private: + uint8_t major_; + uint8_t minor_; + uint8_t protocol_; + +public: + explicit ProtocolVersion(uint8_t _major=0, uint8_t _minor=0, uint8_t _protocol=0) + : major_(_major), minor_(_minor), protocol_(_protocol) {} + + QPID_COMMON_INLINE_EXTERN uint8_t getMajor() const { return major_; } + QPID_COMMON_INLINE_EXTERN void setMajor(uint8_t major) { major_ = major; } + QPID_COMMON_INLINE_EXTERN uint8_t getMinor() const { return minor_; } + QPID_COMMON_INLINE_EXTERN void setMinor(uint8_t minor) { minor_ = minor; } + QPID_COMMON_INLINE_EXTERN uint8_t getProtocol() const { return protocol_; } + QPID_COMMON_INLINE_EXTERN void setProtocol(uint8_t protocol) { protocol_ = protocol; } + QPID_COMMON_EXTERN const std::string toString() const; + + QPID_COMMON_EXTERN ProtocolVersion& operator=(ProtocolVersion p); + + QPID_COMMON_EXTERN bool operator==(ProtocolVersion p) const; + QPID_COMMON_INLINE_EXTERN bool operator!=(ProtocolVersion p) const { return ! (*this == p); } + QPID_COMMON_EXTERN static const uint8_t AMQP; + QPID_COMMON_EXTERN static const uint8_t LEGACY_AMQP; + QPID_COMMON_EXTERN static const uint8_t TLS; + QPID_COMMON_EXTERN static const uint8_t SASL; +}; + +} // namespace framing +} // namespace qpid + + +#endif // ifndef _ProtocolVersion_ diff --git a/qpid/cpp/src/qpid/framing/Proxy.cpp b/qpid/cpp/src/qpid/framing/Proxy.cpp new file mode 100644 index 0000000000..452fb13b01 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Proxy.cpp @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/Proxy.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace framing { + +Proxy::Proxy(FrameHandler& h) : out(&h), sync(false) {} + +Proxy::~Proxy() {} + +void Proxy::send(const AMQBody& b) { + if (sync) { + const AMQMethodBody* m = dynamic_cast<const AMQMethodBody*>(&b); + if (m) m->setSync(sync); + } + AMQFrame f(b); + out->handle(f); +} + +ProtocolVersion Proxy::getVersion() const { + return ProtocolVersion(); +} + +FrameHandler& Proxy::getHandler() { return *out; } + +void Proxy::setHandler(FrameHandler& f) { out=&f; } + +Proxy::ScopedSync::ScopedSync(Proxy& p) : proxy(p) { proxy.sync = true; } +Proxy::ScopedSync::~ScopedSync() { proxy.sync = false; } + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h new file mode 100644 index 0000000000..0884e9cbd2 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Proxy.h @@ -0,0 +1,64 @@ +#ifndef _framing_Proxy_h +#define _framing_Proxy_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ProtocolVersion.h" + +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class AMQBody; + +/** + * Base class for proxies. + */ +class Proxy +{ + public: + class ScopedSync + { + Proxy& proxy; + public: + QPID_COMMON_EXTERN ScopedSync(Proxy& p); + QPID_COMMON_EXTERN ~ScopedSync(); + }; + + QPID_COMMON_EXTERN Proxy(FrameHandler& h); + QPID_COMMON_EXTERN virtual ~Proxy(); + + QPID_COMMON_EXTERN void send(const AMQBody&); + + QPID_COMMON_EXTERN ProtocolVersion getVersion() const; + + QPID_COMMON_EXTERN FrameHandler& getHandler(); + QPID_COMMON_EXTERN void setHandler(FrameHandler&); + private: + FrameHandler* out; + bool sync; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Proxy_h*/ diff --git a/qpid/cpp/src/qpid/framing/ResizableBuffer.h b/qpid/cpp/src/qpid/framing/ResizableBuffer.h new file mode 100644 index 0000000000..0abc5ba7f4 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/ResizableBuffer.h @@ -0,0 +1,60 @@ +#ifndef QPID_FRAMING_RESIZABLEBUFFER_H +#define QPID_FRAMING_RESIZABLEBUFFER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Buffer.h" +#include <vector> + +namespace qpid { +namespace framing { + +/** + * A buffer that maintains its own storage and can be resized, + * keeping any data already written to the buffer. + */ +class ResizableBuffer : public Buffer +{ + public: + ResizableBuffer(size_t initialSize) : store(initialSize) { + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + } + + void resize(size_t newSize) { + size_t oldPos = getPosition(); + store.resize(newSize); + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + setPosition(oldPos); + } + + /** Make sure at least n bytes are available */ + void makeAvailable(size_t n) { + if (n > available()) + resize(getSize() + n - available()); + } + + private: + std::vector<char> store; +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_RESIZABLEBUFFER_H*/ diff --git a/qpid/cpp/src/qpid/framing/SendContent.cpp b/qpid/cpp/src/qpid/framing/SendContent.cpp new file mode 100644 index 0000000000..04b60396da --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SendContent.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SendContent.h" + +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t mfs, uint efc) : handler(h), + maxFrameSize(mfs), + expectedFrameCount(efc), frameCount(0) {} + +void qpid::framing::SendContent::operator()(const AMQFrame& f) +{ + bool first = frameCount == 0; + bool last = ++frameCount == expectedFrameCount; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + const AMQContentBody* body(f.castBody<AMQContentBody>()); + if (body->encodedSize() > maxContentSize) { + uint32_t offset = 0; + for (int chunk = body->encodedSize() / maxContentSize; chunk > 0; chunk--) { + sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->encodedSize()); + offset += maxContentSize; + } + uint32_t remainder = body->encodedSize() % maxContentSize; + if (remainder) { + sendFragment(*body, offset, remainder, first && offset == 0, last); + } + } else { + AMQFrame copy(f); + setFlags(copy, first, last); + handler.handle(copy); + } +} + +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const +{ + AMQFrame fragment((AMQContentBody(body.getData().substr(offset, size)))); + setFlags(fragment, first, last); + handler.handle(fragment); +} + +void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const +{ + f.setBof(false); + f.setBos(first); + f.setEof(true);//content is always the last segment + f.setEos(last); +} + diff --git a/qpid/cpp/src/qpid/framing/SendContent.h b/qpid/cpp/src/qpid/framing/SendContent.h new file mode 100644 index 0000000000..1c464b9c8b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SendContent.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/CommonImportExport.h" + +#ifndef _SendContent_ +#define _SendContent_ + +namespace qpid { +namespace framing { + +/** + * Functor that sends frame to handler, refragmenting if + * necessary. Currently only works on content frames but this could be + * changed once we support multi-frame segments in general. + */ +class SendContent +{ + FrameHandler& handler; + const uint16_t maxFrameSize; + uint expectedFrameCount; + uint frameCount; + + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const; + void setFlags(AMQFrame& f, bool first, bool last) const; +public: + QPID_COMMON_EXTERN SendContent(FrameHandler& _handler, uint16_t _maxFrameSize, uint frameCount); + QPID_COMMON_EXTERN void operator()(const AMQFrame& f); +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp new file mode 100644 index 0000000000..41cb236629 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/Buffer.h" +#include <ostream> + +using qpid::framing::SequenceNumber; +using qpid::framing::Buffer; + +void SequenceNumber::encode(Buffer& buffer) const +{ + buffer.putLong(value); +} + +void SequenceNumber::decode(Buffer& buffer) +{ + value = buffer.getLong(); +} + +uint32_t SequenceNumber::encodedSize() const { + return 4; +} + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { + return o << n.getValue(); +} + +}} diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h new file mode 100644 index 0000000000..00fa2469c8 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceNumber_h +#define _framing_SequenceNumber_h + +#include "qpid/framing/amqp_types.h" +#include <boost/operators.hpp> +#include <iosfwd> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class Buffer; + +/** + * 4-byte sequence number that 'wraps around'. + */ +class QPID_COMMON_CLASS_EXTERN SequenceNumber : public +boost::equality_comparable< + SequenceNumber, boost::less_than_comparable< + SequenceNumber, boost::incrementable< + SequenceNumber, boost::decrementable<SequenceNumber> > > > +{ + int32_t value; + + public: + SequenceNumber(uint32_t v=0) : value(v) {} + + SequenceNumber& operator++() { ++value; return *this; } + SequenceNumber& operator--() { --value; return *this; } + bool operator==(const SequenceNumber& other) const { return value == other.value; } + bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; } + uint32_t getValue() const { return uint32_t(value); } + operator uint32_t() const { return uint32_t(value); } + + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer); + QPID_COMMON_EXTERN uint32_t encodedSize() const; + + template <class S> void serialize(S& s) { s(value); } +}; + +inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) { + return int32_t(a.getValue() - b.getValue()); +} + +inline SequenceNumber operator+(const SequenceNumber& a, int32_t n) { + return SequenceNumber(a.getValue() + n); +} + +inline SequenceNumber operator-(const SequenceNumber& a, int32_t n) { + return SequenceNumber(a.getValue() - n); +} + +struct Window +{ + SequenceNumber hwm; + SequenceNumber lwm; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& o, const SequenceNumber& n); + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp new file mode 100644 index 0000000000..e9d78f3c17 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumberSet.h" + +using namespace qpid::framing; + +void SequenceNumberSet::encode(Buffer& buffer) const +{ + buffer.putShort(size() * 4); + for (const_iterator i = begin(); i != end(); i++) { + buffer.putLong(i->getValue()); + } +} + +void SequenceNumberSet::decode(Buffer& buffer) +{ + clear(); + uint16_t count = (buffer.getShort() / 4); + for (uint16_t i = 0; i < count; i++) { + push_back(SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceNumberSet::encodedSize() const +{ + return 2 /*count*/ + (size() * 4); +} + +SequenceNumberSet SequenceNumberSet::condense() const +{ + SequenceNumberSet result; + const_iterator last = end(); + const_iterator start = end(); + for (const_iterator i = begin(); i != end(); i++) { + if (start == end()) { + start = i; + } else if (*i - *last > 1) { + result.push_back(*start); + result.push_back(*last); + start = i; + } + last = i; + } + if (start != end()) { + result.push_back(*start); + result.push_back(*last); + } + return result; +} + +void SequenceNumberSet::addRange(const SequenceNumber& start, const SequenceNumber& end) +{ + push_back(start); + push_back(end); +} + +namespace qpid{ +namespace framing{ + +std::ostream& operator<<(std::ostream& out, const SequenceNumberSet& set) { + out << "{"; + for (SequenceNumberSet::const_iterator i = set.begin(); i != set.end(); i++) { + if (i != set.begin()) out << ", "; + out << (i->getValue()); + } + out << "}"; + return out; +} + +} +} diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.h b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h new file mode 100644 index 0000000000..c8356c8163 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceNumberSet_h +#define _framing_SequenceNumberSet_h + +#include <ostream> +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/InlineVector.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +class SequenceNumberSet : public InlineVector<SequenceNumber, 2> +{ + typedef InlineVector<SequenceNumber, 2> Base; +public: + typedef Base::const_iterator const_iterator; + typedef Base::iterator iterator; + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t encodedSize() const; + QPID_COMMON_EXTERN SequenceNumberSet condense() const; + QPID_COMMON_EXTERN void addRange(const SequenceNumber& start, const SequenceNumber& end); + + template <class T> + void processRanges(T& t) const + { + if (size() % 2) { //must be even number + throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); + } + + for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { + SequenceNumber first = *(i); + SequenceNumber last = *(++i); + t(first, last); + } + } + + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp new file mode 100644 index 0000000000..6510842c58 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +using namespace qpid::framing; +using std::max; +using std::min; + +namespace qpid { +namespace framing { + +namespace { +//each range contains 2 numbers, 4 bytes each +uint16_t RANGE_SIZE = 2 * 4; +int32_t MAX_RANGE = 2147483647;//2^31-1 + +int32_t gap(const SequenceNumber& a, const SequenceNumber& b) +{ + return a < b ? b - a : a - b; +} + +bool is_max_range(const SequenceNumber& a, const SequenceNumber& b) +{ + return gap(a, b) == MAX_RANGE; +} +} + +void SequenceSet::encode(Buffer& buffer) const +{ + buffer.putShort(rangesSize() * RANGE_SIZE); + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) { + buffer.putLong(i->first().getValue()); + buffer.putLong(i->last().getValue()); + } +} + +void SequenceSet::decode(Buffer& buffer) +{ + clear(); + uint16_t size = buffer.getShort(); + uint16_t count = size / RANGE_SIZE;//number of ranges + if (size % RANGE_SIZE) + throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); + + for (uint16_t i = 0; i < count; i++) { + SequenceNumber a(buffer.getLong()); + SequenceNumber b(buffer.getLong()); + if (b < a) + throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b)); + if (is_max_range(a, b)) { + //RangeSet holds 'half-closed' ranges, where the end is + //one past the 'highest' value in the range. So if the + //range is already the maximum expressable with a 32bit + //sequence number, we can't represent it as a + //'half-closed' range, so we represent it as two ranges. + add(a, b-1); + add(b); + } else { + add(a, b); + } + } +} + +uint32_t SequenceSet::encodedSize() const { + return 2 /*size field*/ + (rangesSize() * RANGE_SIZE); +} + +bool SequenceSet::contains(const SequenceNumber& s) const { + return RangeSet<SequenceNumber>::contains(s); +} + +void SequenceSet::add(const SequenceNumber& s) { *this += s; } + +void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& finish) { + *this += Range<SequenceNumber>::makeClosed(std::min(start,finish), std::max(start, finish)); +} + +void SequenceSet::add(const SequenceSet& set) { *this += set; } + +void SequenceSet::remove(const SequenceSet& set) { *this -= set; } + +void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& finish) { + *this -= Range<SequenceNumber>::makeClosed(std::min(start,finish), std::max(start, finish)); +} + +void SequenceSet::remove(const SequenceNumber& s) { *this -= s; } + + +struct RangePrinter { + std::ostream& out; + RangePrinter(std::ostream& o) : out(o) {} + void operator()(SequenceNumber i, SequenceNumber j) const { + out << "[" << i.getValue() << "," << j.getValue() << "] "; + } +}; + +std::ostream& operator<<(std::ostream& o, const SequenceSet& s) { + RangePrinter print(o); + o << "{ "; + s.for_each(print); + return o << "}"; +} + +}} // namespace qpid::framing + diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h new file mode 100644 index 0000000000..827c8999b3 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.h @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceSet_h +#define _framing_SequenceSet_h + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/RangeSet.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { +class Buffer; + +class QPID_COMMON_CLASS_EXTERN SequenceSet : public RangeSet<SequenceNumber> { + public: + SequenceSet() {} + SequenceSet(const RangeSet<SequenceNumber>& r) + : RangeSet<SequenceNumber>(r) {} + SequenceSet(const SequenceNumber& s) { add(s); } + SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); } + + + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; + QPID_COMMON_EXTERN void decode(Buffer& buffer); + QPID_COMMON_EXTERN uint32_t encodedSize() const; + + QPID_COMMON_EXTERN bool contains(const SequenceNumber& s) const; + QPID_COMMON_EXTERN void add(const SequenceNumber& s); + QPID_COMMON_EXTERN void add(const SequenceNumber& start, const SequenceNumber& finish); // Closed range + QPID_COMMON_EXTERN void add(const SequenceSet& set); + QPID_COMMON_EXTERN void remove(const SequenceNumber& s); + QPID_COMMON_EXTERN void remove(const SequenceNumber& start, const SequenceNumber& finish); // Closed range + QPID_COMMON_EXTERN void remove(const SequenceSet& set); + + template <class T> void for_each(T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) + t(i->first(), i->last()); + } + + template <class T> void for_each(const T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) + t(i->first(), i->last()); + } + + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const SequenceSet&); +}; + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/StructHelper.h b/qpid/cpp/src/qpid/framing/StructHelper.h new file mode 100644 index 0000000000..fe2fa64ce7 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/StructHelper.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _StructHelper_ +#define _StructHelper_ + +#include "qpid/Exception.h" +#include "qpid/CommonImportExport.h" +#include "qpid/framing/Buffer.h" + +#include <stdlib.h> // For alloca + +namespace qpid { +namespace framing { + +class QPID_COMMON_CLASS_EXTERN StructHelper +{ +public: + + template <class T> void encode(const T& t, std::string& data) { + uint32_t size = t.bodySize() + 2/*type*/; + data.resize(size); + Buffer wbuffer(const_cast<char*>(data.data()), size); + wbuffer.putShort(T::TYPE); + t.encodeStructBody(wbuffer); + } + + template <class T> void decode(T& t, const std::string& data) { + Buffer rbuffer(const_cast<char*>(data.data()), data.length()); + uint16_t type = rbuffer.getShort(); + if (type == T::TYPE) { + t.decodeStructBody(rbuffer); + } else { + throw Exception("Type code does not match"); + } + } +}; + +}} +#endif diff --git a/qpid/cpp/src/qpid/framing/TransferContent.cpp b/qpid/cpp/src/qpid/framing/TransferContent.cpp new file mode 100644 index 0000000000..d997b24304 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TransferContent.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace framing { + +TransferContent::TransferContent(const std::string& data, const std::string& key) { + setData(data); + if (!key.empty()) getDeliveryProperties().setRoutingKey(key); +} + + +const AMQHeaderBody& TransferContent::getHeader() const +{ + return header; +} + +const std::string& TransferContent::getData() const { + return data; +} + +std::string& TransferContent::getData() { + return data; +} + +void TransferContent::setData(const std::string& _data) +{ + data = _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +void TransferContent::appendData(const std::string& _data) +{ + data += _data; + header.get<MessageProperties>(true)->setContentLength(data.size()); +} + +MessageProperties& TransferContent::getMessageProperties() +{ + return *header.get<MessageProperties>(true); +} + +DeliveryProperties& TransferContent::getDeliveryProperties() +{ + return *header.get<DeliveryProperties>(true); +} + +void TransferContent::populate(const FrameSet& frameset) +{ + const AMQHeaderBody* h = frameset.getHeaders(); + if (h) { + header = *h; + } + frameset.getContent(data); +} + +const MessageProperties& TransferContent::getMessageProperties() const +{ + const MessageProperties* props = header.get<MessageProperties>(); + if (!props) throw Exception("No message properties."); + return *props; +} + +const DeliveryProperties& TransferContent::getDeliveryProperties() const +{ + const DeliveryProperties* props = header.get<DeliveryProperties>(); + if (!props) throw Exception("No message properties."); + return *props; +} + +bool TransferContent::hasMessageProperties() const +{ + return header.get<MessageProperties>(); +} + +bool TransferContent::hasDeliveryProperties() const +{ + return header.get<DeliveryProperties>(); +} + + +}} diff --git a/qpid/cpp/src/qpid/framing/TransferContent.h b/qpid/cpp/src/qpid/framing/TransferContent.h new file mode 100644 index 0000000000..32663d7020 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TransferContent.h @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransferContent_ +#define _TransferContent_ + +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/Exception.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace framing { + +/** Message content */ +class QPID_COMMON_CLASS_EXTERN TransferContent : public MethodContent +{ + AMQHeaderBody header; + std::string data; +public: + QPID_COMMON_EXTERN TransferContent(const std::string& data = std::string(), const std::string& key=std::string()); + + ///@internal + QPID_COMMON_EXTERN const AMQHeaderBody& getHeader() const; + + QPID_COMMON_EXTERN void setData(const std::string&); + QPID_COMMON_EXTERN const std::string& getData() const; + QPID_COMMON_EXTERN std::string& getData(); + + QPID_COMMON_EXTERN void appendData(const std::string&); + + QPID_COMMON_EXTERN bool hasMessageProperties() const; + QPID_COMMON_EXTERN MessageProperties& getMessageProperties(); + QPID_COMMON_EXTERN const MessageProperties& getMessageProperties() const; + + QPID_COMMON_EXTERN bool hasDeliveryProperties() const; + QPID_COMMON_EXTERN DeliveryProperties& getDeliveryProperties(); + QPID_COMMON_EXTERN const DeliveryProperties& getDeliveryProperties() const; + + ///@internal + QPID_COMMON_EXTERN void populate(const FrameSet& frameset); +}; + +}} +#endif diff --git a/qpid/cpp/src/qpid/framing/TypeFilter.h b/qpid/cpp/src/qpid/framing/TypeFilter.h new file mode 100644 index 0000000000..d1c42de583 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/TypeFilter.h @@ -0,0 +1,51 @@ +#ifndef QPID_FRAMING_TYPEFILTER_H +#define QPID_FRAMING_TYPEFILTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace framing { + +/** + * Predicate that selects frames by type + */ +template <uint8_t Type> +struct TypeFilter { + bool operator()(const AMQFrame& f) const { + return f.getBody()->type() == Type; + } +}; + +template <uint8_t T1, uint8_t T2> +struct TypeFilter2 { + bool operator()(const AMQFrame& f) const { + return f.getBody()->type() == T1 || f.getBody()->type() == T2; + } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_TYPEFILTER_H*/ diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp new file mode 100644 index 0000000000..eb4c33be75 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/Uuid.h" + +#include "qpid/sys/uuid.h" +#include "qpid/Exception.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" + +namespace qpid { +namespace framing { + +using namespace std; + +Uuid::Uuid(bool unique): + qpid::types::Uuid(unique) +{} + +Uuid::Uuid(const uint8_t* data): + qpid::types::Uuid(data) +{} + +void Uuid::encode(Buffer& buf) const { + buf.putRawData(data(), size()); +} + +void Uuid::decode(Buffer& buf) { + if (buf.available() < size()) + throw IllegalArgumentException(QPID_MSG("Not enough data for UUID.")); + + // Break qpid::types::Uuid encapsulation - Nasty, but efficient + buf.getRawData(const_cast<uint8_t*>(data()), size()); +} + +}} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/Uuid.h b/qpid/cpp/src/qpid/framing/Uuid.h new file mode 100644 index 0000000000..906e20951f --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Uuid.h @@ -0,0 +1,57 @@ +#ifndef QPID_FRAMING_UUID_H +#define QPID_FRAMING_UUID_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/CommonImportExport.h" +#include "qpid/sys/IntegerTypes.h" + +#include "qpid/types/Uuid.h" + +#include <ostream> +#include <istream> + +namespace qpid { +namespace framing { + +class Buffer; + +/** + * Framing UUID is now a thine wrapper around qpid::types::Uuid + */ +struct Uuid : public qpid::types::Uuid { + /** If unique is true, generate a unique ID else a null ID. */ + QPID_COMMON_EXTERN Uuid(bool unique=false); + + /** Copy from 16 bytes of data. */ + QPID_COMMON_EXTERN Uuid(const uint8_t* data); + + // We get most of our operations directly from qpid::types::Uuid + QPID_COMMON_INLINE_EXTERN static size_t size() + { return SIZE; } + + QPID_COMMON_EXTERN void encode(framing::Buffer& buf) const; + QPID_COMMON_EXTERN void decode(framing::Buffer& buf); + QPID_COMMON_INLINE_EXTERN uint32_t encodedSize() const + { return size(); } +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_UUID_H*/ diff --git a/qpid/cpp/src/qpid/framing/amqp_framing.h b/qpid/cpp/src/qpid/framing/amqp_framing.h new file mode 100644 index 0000000000..bad1c08a46 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/amqp_framing.h @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ProtocolVersion.h" diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h new file mode 100644 index 0000000000..3fe8b68dcd --- /dev/null +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -0,0 +1,63 @@ +#ifndef AMQP_TYPES_H +#define AMQP_TYPES_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** \file + * Definitions and forward declarations of all types used + * in AMQP messages. + */ + +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace framing { + +typedef uint8_t FrameType; +typedef uint16_t ChannelId; +typedef uint32_t BatchOffset; +typedef uint8_t ClassId; +typedef uint8_t MethodId; +typedef uint16_t ReplyCode; + +// Types represented by classes. +class Content; +class FieldTable; +class SequenceNumberSet; +struct Uuid; + +// Useful constants + +/** Maximum channel ID used by broker. */ +const ChannelId CHANNEL_MAX=(ChannelId(~1)); + +// Forward declare class types +class FramingContent; +class FieldTable; +class SequenceNumberSet; +class SequenceSet; +struct Uuid; + +// Enum types +enum DeliveryMode { TRANSIENT = 1, PERSISTENT = 2}; + +}} // namespace qpid::framing +#endif diff --git a/qpid/cpp/src/qpid/framing/amqp_types_full.h b/qpid/cpp/src/qpid/framing/amqp_types_full.h new file mode 100644 index 0000000000..c5d84dedea --- /dev/null +++ b/qpid/cpp/src/qpid/framing/amqp_types_full.h @@ -0,0 +1,38 @@ +#ifndef _framing_amqp_types_decl_h +#define _framing_amqp_types_decl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** \file + * Definitions and full declarations of all types used + * in AMQP messages. + * + * It's better to include amqp_types.h in another header instead of this file + * unless the header actually needs the full declarations. Including + * full declarations when forward declarations would increase compile + * times. + */ + +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/Uuid.h" + +#endif /*!_framing_amqp_types_decl_h*/ diff --git a/qpid/cpp/src/qpid/framing/frame_functors.h b/qpid/cpp/src/qpid/framing/frame_functors.h new file mode 100644 index 0000000000..d2064d6a57 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/frame_functors.h @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <ostream> +#include <iostream> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" + +#ifndef _frame_functors_ +#define _frame_functors_ + +namespace qpid { +namespace framing { + +class SumFrameSize +{ + uint64_t size; +public: + SumFrameSize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.encodedSize(); } + uint64_t getSize() { return size; } +}; + +class SumBodySize +{ + uint64_t size; +public: + SumBodySize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.getBody()->encodedSize(); } + uint64_t getSize() { return size; } +}; + +class Count +{ + uint count; +public: + Count() : count(0) {} + void operator()(const AMQFrame&) { count++; } + uint getCount() { return count; } +}; + +class EncodeFrame +{ + Buffer& buffer; +public: + EncodeFrame(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.encode(buffer); } +}; + +class EncodeBody +{ + Buffer& buffer; +public: + EncodeBody(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); } +}; + +/** + * Sends to the specified handler a copy of the frame it is applied to. + */ +class Relay +{ + FrameHandler& handler; + +public: + Relay(FrameHandler& h) : handler(h) {} + + void operator()(const AMQFrame& f) + { + AMQFrame copy(f); + handler.handle(copy); + } +}; + +class Print +{ + std::ostream& out; +public: + Print(std::ostream& o) : out(o) {} + + void operator()(const AMQFrame& f) + { + out << f << std::endl; + } +}; + +class MarkLastSegment +{ +public: + void operator()(AMQFrame& f) const { f.setEof(true); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/variant.h b/qpid/cpp/src/qpid/framing/variant.h new file mode 100644 index 0000000000..8e13063385 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/variant.h @@ -0,0 +1,91 @@ +#ifndef QPID_FRAMING_VARIANT_H +#define QPID_FRAMING_VARIANT_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tools for using boost::variant. */ + + +#include <boost/variant.hpp> + +namespace qpid { +namespace framing { +class Buffer; + +/** boost::static_visitor that throws an exception if variant contains a blank. + * Subclasses need to have a using() declaration, which can be generated + * with QPID_USING_NOBLANK(R) + */ +template <class R=void> +struct NoBlankVisitor : public boost::static_visitor<R> { + R foundBlank() const { + assert(0); + throw Exception(QPID_MSG("Invalid variant value.")); + } + R operator()(const boost::blank&) const { return foundBlank(); } + R operator()(boost::blank&) const { return foundBlank(); } +}; + + +}} // qpid::framing + + +/** Generate a using statement, needed in visitors inheriting NoBlankVisitor + * @param R return type. + */ +#define QPID_USING_NOBLANK(R) using ::qpid::framing::NoBlankVisitor<R>::operator() + +namespace qpid { +namespace framing { + +/** Convert the variant value to type R. */ +template <class R> struct ConvertVisitor : public NoBlankVisitor<R> { + QPID_USING_NOBLANK(R); + template <class T> R operator()(T& t) const { return t; } +}; + +/** Convert the address of variant value to type R. */ +template <class R> struct AddressVisitor : public NoBlankVisitor<R> { + QPID_USING_NOBLANK(R); + template <class T> R operator()(T& t) const { return &t; } +}; + +/** Apply a visitor to the nested variant.*/ +template<class V> +struct ApplyVisitor : public NoBlankVisitor<typename V::result_type> { + QPID_USING_NOBLANK(typename V::result_type); + const V& visitor; + ApplyVisitor(const V& v) : visitor(v) {} + template <class T> typename V::result_type operator()(T& t) const { + return boost::apply_visitor(visitor, t); + } +}; + +/** Convenience function to construct and apply an ApplyVisitor */ +template <class Visitor, class Visitable> +typename Visitor::result_type applyApplyVisitor(const Visitor& visitor, Visitable& visitable) { + return boost::apply_visitor(ApplyVisitor<Visitor>(visitor), visitable); +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_VARIANT_H*/ |