summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/PersistableMessage.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/PersistableMessage.h')
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h84
1 files changed, 22 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index d29c2c45b4..eb6b444e4a 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -24,29 +24,30 @@
#include <string>
#include <list>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <map>
+#include <boost/intrusive_ptr.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Persistable.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/amqp_framing.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/AsyncCompletion.h"
namespace qpid {
+namespace types {
+class Variant;
+}
namespace broker {
class MessageStore;
+class Queue;
/**
* Base class for persistable messages.
*/
class PersistableMessage : public Persistable
{
- typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Mutex asyncDequeueLock;
- sys::Mutex storeLock;
-
/**
* "Ingress" messages == messages sent _to_ the broker.
* Tracks the number of outstanding asynchronous operations that must
@@ -56,85 +57,44 @@ class PersistableMessage : public Persistable
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- AsyncCompletion ingressCompletion;
-
- /**
- * Tracks the number of outstanding asynchronous dequeue
- * operations. When the message is dequeued asynchronously the
- * count is incremented; when that dequeue completes it is
- * decremented. Thus when it is 0, there are no outstanding
- * dequeues.
- */
- int asyncDequeueCounter;
-
- void dequeueAsync();
-
- syncList synclist;
- struct ContentReleaseState
- {
- bool blocked;
- bool requested;
- bool released;
-
- ContentReleaseState();
- };
- ContentReleaseState contentReleaseState;
-
- protected:
- /** Called when all dequeues are complete for this message. */
- virtual void allDequeuesComplete() = 0;
-
- void setContentReleased();
-
- MessageStore* store;
-
+ boost::intrusive_ptr<AsyncCompletion> ingressCompletion;
+ mutable uint64_t persistenceId;
public:
- typedef boost::shared_ptr<PersistableMessage> shared_ptr;
-
- /**
- * @returns the size of the headers when encoded
- */
- virtual uint32_t encodedHeaderSize() const = 0;
-
virtual ~PersistableMessage();
-
PersistableMessage();
void flush();
-
- QPID_BROKER_EXTERN bool isContentReleased() const;
QPID_BROKER_EXTERN void setStore(MessageStore*);
- void requestContentRelease();
- void blockContentRelease();
- bool checkContentReleasable();
- bool isContentReleaseBlocked();
- bool isContentReleaseRequested();
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
/** track the progress of a message received by the broker - see ingressCompletion above */
- QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
- QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
+ QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); }
+ QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; }
+ QPID_BROKER_INLINE_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { ingressCompletion = i; }
- QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
- QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
+ QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); }
+ QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
QPID_BROKER_EXTERN bool isDequeueComplete();
-
QPID_BROKER_EXTERN void dequeueComplete();
-
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
- bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
-
- void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
+ uint64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+
+ virtual void decodeHeader(framing::Buffer& buffer) = 0;
+ virtual void decodeContent(framing::Buffer& buffer) = 0;
+ virtual uint32_t encodedHeaderSize() const = 0;
+ virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0;
};
}}