summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.h')
-rw-r--r--cpp/src/qpid/broker/Message.h97
1 files changed, 80 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index ecd84901f9..f1829a17bc 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -35,6 +35,7 @@
#include <string>
#include <vector>
#include <boost/intrusive_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
namespace qpid {
namespace amqp {
@@ -80,7 +81,46 @@ public:
virtual uint64_t getTimestamp() const = 0;
};
- QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
+ class SharedState : public Encoding
+ {
+ public:
+ virtual ~SharedState() {}
+ virtual const Connection* getPublisher() const = 0;
+ virtual void setPublisher(const Connection* p) = 0;
+
+ virtual void setExpiration(sys::AbsTime e) = 0;
+ virtual sys::AbsTime getExpiration() const = 0;
+ virtual sys::Duration getTimeToExpiration() const = 0;
+ virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0;
+ virtual bool hasExpired(const Message& m) const = 0;
+ virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+
+ virtual bool getIsManagementMessage() const = 0;
+ virtual void setIsManagementMessage(bool b) = 0;
+ };
+
+ class SharedStateImpl : public SharedState
+ {
+ const Connection* publisher;
+ qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ bool isManagementMessage;
+ public:
+ SharedStateImpl();
+ virtual ~SharedStateImpl() {}
+ QPID_BROKER_EXTERN const Connection* getPublisher() const;
+ QPID_BROKER_EXTERN void setPublisher(const Connection* p);
+ QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
+ QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
+ QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const;
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN bool hasExpired(const Message& m) const;
+ QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN bool getIsManagementMessage() const;
+ QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
+ };
+
+ QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>);
QPID_BROKER_EXTERN Message();
QPID_BROKER_EXTERN ~Message();
@@ -91,20 +131,14 @@ public:
int getDeliveryCount() const { return deliveryCount; }
void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; }
- QPID_BROKER_EXTERN void setPublisher(const Connection& p);
const Connection* getPublisher() const;
bool isLocalTo(const OwnershipToken*) const;
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
- /** determine msg expiration time using the TTL value if present */
- QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
- void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
-
bool hasExpired() const;
- sys::AbsTime getExpiration() const { return expiration; }
- void setExpiration(sys::AbsTime exp) { expiration = exp; }
+ QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
uint64_t getTtl() const;
QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
@@ -121,12 +155,11 @@ public:
QPID_BROKER_EXTERN uint64_t getMessageSize() const;
- QPID_BROKER_EXTERN Encoding& getEncoding();
QPID_BROKER_EXTERN const Encoding& getEncoding() const;
QPID_BROKER_EXTERN operator bool() const;
+ QPID_BROKER_EXTERN SharedState& getSharedState();
bool getIsManagementMessage() const;
- void setIsManagementMessage(bool b);
QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const;
QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&);
@@ -146,21 +179,51 @@ public:
QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
private:
- boost::intrusive_ptr<Encoding> encoding;
+ /**
+ * Template for optional members that are only constructed when
+ * if/when needed, to conserve memory. (Boost::optional doesn't
+ * help here).
+ */
+ template <typename T> class Optional
+ {
+ boost::scoped_ptr<T> value;
+ public:
+ Optional() : value(0) {}
+ Optional(const Optional<T>& o) : value(o.value ? new T(*o.value) : 0) {}
+ T& get()
+ {
+ if (!value) value.reset(new T);
+ return *value;
+ }
+ const T& operator*() const
+ {
+ return *value;
+ }
+ Optional<T>& operator=(const Optional<T>& o)
+ {
+ if (o.value) {
+ if (!value) value.reset(new T(*o.value));
+ }
+ return *this;
+ }
+ operator bool() const
+ {
+ return value;
+ }
+ };
+
+
+ boost::intrusive_ptr<SharedState> sharedState;
boost::intrusive_ptr<PersistableMessage> persistentContext;
int deliveryCount;
bool alreadyAcquired;
- const Connection* publisher;
- qpid::sys::AbsTime expiration;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- uint64_t timestamp;
- qpid::types::Variant::Map annotations;
- bool isManagementMessage;
+ Optional<qpid::types::Variant::Map> annotations;
MessageState state;
qpid::framing::SequenceNumber sequence;
framing::SequenceNumber replicationId;
void annotationsChanged();
+ bool getTtl(uint64_t&, uint64_t expiredValue) const;
};
}}