diff options
Diffstat (limited to 'cpp/lib/broker/MessageStore.h')
-rw-r--r-- | cpp/lib/broker/MessageStore.h | 203 |
1 files changed, 96 insertions, 107 deletions
diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h index 9e38408886..1d9ee86e48 100644 --- a/cpp/lib/broker/MessageStore.h +++ b/cpp/lib/broker/MessageStore.h @@ -21,119 +21,108 @@ #ifndef _MessageStore_ #define _MessageStore_ -#include <BrokerMessage.h> -#include <FieldTable.h> -#include <RecoveryManager.h> -#include <TransactionalStore.h> +#include "PersistableExchange.h" +#include "PersistableMessage.h" +#include "PersistableQueue.h" +#include "RecoveryManager.h" +#include "TransactionalStore.h" namespace qpid { - namespace broker { - struct MessageStoreSettings - { - /** - * Messages whose content length is larger than this value - * will be staged (i.e. will have thier data written to - * disk as it arrives) and will load their data lazily. On - * recovery therefore, only the headers should be loaded. - */ - uint64_t stagingThreshold; - }; - /** - * An abstraction of the persistent storage for messages. (In - * all methods, any pointers/references to queues or messages - * are valid only for the duration of the call). - */ - class MessageStore : public TransactionalStore{ - public: - /** - * Record the existance of a durable queue - */ - virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0; - /** - * Destroy a durable queue - */ - virtual void destroy(const Queue& queue) = 0; +namespace broker { - /** - * Request recovery of queue and message state from store - */ - virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; - - /** - * Stores a messages before it has been enqueued - * (enqueueing automatically stores the message so this is - * only required if storage is required prior to that - * point). If the message has not yet been stored it will - * store the headers as well as any content passed in. A - * persistence id will be set on the message which can be - * used to load the content or to append to it. - */ - virtual void stage(Message* const msg) = 0; +/** + * An abstraction of the persistent storage for messages. (In + * all methods, any pointers/references to queues or messages + * are valid only for the duration of the call). + */ +class MessageStore : public TransactionalStore{ +public: + /** + * Record the existence of a durable queue + */ + virtual void create(const PersistableQueue& queue) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const PersistableQueue& queue) = 0; + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange) = 0; + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(RecoveryManager& queues) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(PersistableMessage& msg) = 0; - /** - * Destroys a previously staged message. This only needs - * to be called if the message is never enqueued. (Once - * enqueued, deletion will be automatic when the message - * is dequeued from all queues it was enqueued onto). - */ - virtual void destroy(Message* const msg) = 0; - - /** - * Appends content to a previously staged message - */ - virtual void appendContent(Message* const msg, const std::string& data) = 0; + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg) = 0; - /** - * Loads (a section) of content data for the specified - * message (previously stored through a call to stage or - * enqueue) into data. The offset refers to the content - * only (i.e. an offset of 0 implies that the start of the - * content should be loaded, not the headers or related - * meta-data). - */ - virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0; + /** + * Appends content to a previously staged message + */ + virtual void appendContent(PersistableMessage& msg, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param msg the message to dequeue + * @param queue the name of th queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + + virtual ~MessageStore(){} +}; - /** - * Enqueues a message, storing the message if it has not - * been previously stored and recording that the given - * message is on the given queue. - * - * @param msg the message to enqueue - * @param queue the name of the queue onto which it is to be enqueued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - /** - * Dequeues a message, recording that the given message is - * no longer on the given queue and deleting the message - * if it is no longer on any other queue. - * - * @param msg the message to dequeue - * @param queue the name of th queue from which it is to be dequeued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - - /** - * Treat all enqueue/dequeues where this xid was specified as being prepared. - */ - virtual void prepared(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being committed. - */ - virtual void committed(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being aborted. - */ - virtual void aborted(const std::string * const xid) = 0; - - virtual ~MessageStore(){} - }; - } +} } |