diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.h')
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 102 |
1 files changed, 75 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 0a95fedea6..375fa9ce26 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -22,33 +22,38 @@ * */ -#include <string> -#include <vector> -#include <boost/shared_ptr.hpp> -#include <boost/variant.hpp> -#include "PersistableMessage.h" -#include "MessageAdapter.h" +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/broker/PersistableMessage.h" +#include "qpid/broker/MessageAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <string> +#include <vector> namespace qpid { - + namespace framing { class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; class ExchangeRegistry; class MessageStore; class Queue; +class ExpiryPolicy; class Message : public PersistableMessage { public: - Message(const framing::SequenceNumber& id = framing::SequenceNumber()); - ~Message(); + typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; + + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); + QPID_BROKER_EXTERN ~Message(); uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -61,16 +66,22 @@ public: const framing::SequenceNumber& getCommandId() { return frames.getId(); } - uint64_t contentSize() const; + QPID_BROKER_EXTERN uint64_t contentSize() const; - std::string getRoutingKey() const; + QPID_BROKER_EXTERN std::string getRoutingKey() const; const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const; - std::string getExchangeName() const; + QPID_BROKER_EXTERN std::string getExchangeName() const; bool isImmediate() const; - const framing::FieldTable* getApplicationHeaders() const; - bool isPersistent(); + QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; + framing::FieldTable& getOrInsertHeaders(); + QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); + QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + bool hasExpired(); + sys::AbsTime getExpiration() const { return expiration; } + framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -84,15 +95,24 @@ public: return p->get<T>(true); } + template <class T> const T* hasProperties() const { + const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + return p->get<T>(); + } + template <class T> const T* getMethod() const { return frames.as<T>(); } + template <class T> T* getMethod() { + return frames.as<T>(); + } + template <class T> bool isA() const { return frames.isA<T>(); } - uint32_t getRequiredCredit() const; + uint32_t getRequiredCredit(); void encode(framing::Buffer& buffer) const; void encodeContent(framing::Buffer& buffer) const; @@ -110,26 +130,44 @@ public: uint32_t encodedHeaderSize() const; uint32_t encodedContentSize() const; - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer); + QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); + QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); + void QPID_BROKER_EXTERN tryReleaseContent(); + void releaseContent(); + void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead void destroy(); - void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; + bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; + QPID_BROKER_EXTERN void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const; - bool isContentLoaded() const; + QPID_BROKER_EXTERN bool isContentLoaded() const; bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); + + void forcePersistent(); + bool isForcedPersistent(); + + boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; + void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); + + /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ + void setEnqueueCompleteCallback(MessageCallback& cb); + void resetEnqueueCompleteCallback(); + + /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ + void setDequeueCompleteCallback(MessageCallback& cb); + void resetDequeueCompleteCallback(); private: + typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; + + MessageAdapter& getAdapter() const; + void allEnqueuesComplete(); + void allDequeuesComplete(); + mutable sys::Mutex lock; framing::FrameSet frames; mutable boost::shared_ptr<Exchange> exchange; @@ -137,12 +175,22 @@ public: bool redelivered; bool loaded; bool staged; + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; + qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; static TransferAdapter TRANSFER; - MessageAdapter& getAdapter() const; + mutable Replacement replacement; + mutable boost::intrusive_ptr<Message> empty; + + sys::Mutex callbackLock; + MessageCallback* enqueueCallback; + MessageCallback* dequeueCallback; + + uint32_t requiredCredit; }; }} |