diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.h')
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 97 |
1 files changed, 80 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index ecd84901f9..f1829a17bc 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -35,6 +35,7 @@ #include <string> #include <vector> #include <boost/intrusive_ptr.hpp> +#include <boost/scoped_ptr.hpp> namespace qpid { namespace amqp { @@ -80,7 +81,46 @@ public: virtual uint64_t getTimestamp() const = 0; }; - QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>); + class SharedState : public Encoding + { + public: + virtual ~SharedState() {} + virtual const Connection* getPublisher() const = 0; + virtual void setPublisher(const Connection* p) = 0; + + virtual void setExpiration(sys::AbsTime e) = 0; + virtual sys::AbsTime getExpiration() const = 0; + virtual sys::Duration getTimeToExpiration() const = 0; + virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0; + virtual bool hasExpired(const Message& m) const = 0; + virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0; + + virtual bool getIsManagementMessage() const = 0; + virtual void setIsManagementMessage(bool b) = 0; + }; + + class SharedStateImpl : public SharedState + { + const Connection* publisher; + qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + bool isManagementMessage; + public: + SharedStateImpl(); + virtual ~SharedStateImpl() {} + QPID_BROKER_EXTERN const Connection* getPublisher() const; + QPID_BROKER_EXTERN void setPublisher(const Connection* p); + QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e); + QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; + QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const; + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); + QPID_BROKER_EXTERN bool hasExpired(const Message& m) const; + QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + QPID_BROKER_EXTERN bool getIsManagementMessage() const; + QPID_BROKER_EXTERN void setIsManagementMessage(bool b); + }; + + QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>); QPID_BROKER_EXTERN Message(); QPID_BROKER_EXTERN ~Message(); @@ -91,20 +131,14 @@ public: int getDeliveryCount() const { return deliveryCount; } void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; } - QPID_BROKER_EXTERN void setPublisher(const Connection& p); const Connection* getPublisher() const; bool isLocalTo(const OwnershipToken*) const; QPID_BROKER_EXTERN std::string getRoutingKey() const; QPID_BROKER_EXTERN bool isPersistent() const; - /** determine msg expiration time using the TTL value if present */ - QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); - void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); - bool hasExpired() const; - sys::AbsTime getExpiration() const { return expiration; } - void setExpiration(sys::AbsTime exp) { expiration = exp; } + QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; uint64_t getTtl() const; QPID_BROKER_EXTERN bool getTtl(uint64_t&) const; @@ -121,12 +155,11 @@ public: QPID_BROKER_EXTERN uint64_t getMessageSize() const; - QPID_BROKER_EXTERN Encoding& getEncoding(); QPID_BROKER_EXTERN const Encoding& getEncoding() const; QPID_BROKER_EXTERN operator bool() const; + QPID_BROKER_EXTERN SharedState& getSharedState(); bool getIsManagementMessage() const; - void setIsManagementMessage(bool b); QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const; QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&); @@ -146,21 +179,51 @@ public: QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id); private: - boost::intrusive_ptr<Encoding> encoding; + /** + * Template for optional members that are only constructed when + * if/when needed, to conserve memory. (Boost::optional doesn't + * help here). + */ + template <typename T> class Optional + { + boost::scoped_ptr<T> value; + public: + Optional() : value(0) {} + Optional(const Optional<T>& o) : value(o.value ? new T(*o.value) : 0) {} + T& get() + { + if (!value) value.reset(new T); + return *value; + } + const T& operator*() const + { + return *value; + } + Optional<T>& operator=(const Optional<T>& o) + { + if (o.value) { + if (!value) value.reset(new T(*o.value)); + } + return *this; + } + operator bool() const + { + return value; + } + }; + + + boost::intrusive_ptr<SharedState> sharedState; boost::intrusive_ptr<PersistableMessage> persistentContext; int deliveryCount; bool alreadyAcquired; - const Connection* publisher; - qpid::sys::AbsTime expiration; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - uint64_t timestamp; - qpid::types::Variant::Map annotations; - bool isManagementMessage; + Optional<qpid::types::Variant::Map> annotations; MessageState state; qpid::framing::SequenceNumber sequence; framing::SequenceNumber replicationId; void annotationsChanged(); + bool getTtl(uint64_t&, uint64_t expiredValue) const; }; }} |