diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStore.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessageBuilderTest.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TestMessageStore.h | 8 |
8 files changed, 65 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index 17fd6aefb8..1b9e6fe104 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -43,17 +43,15 @@ namespace broker { * are valid only for the duration of the call). */ class MessageStore : public TransactionalStore, public Recoverable { -public: + public: /** * init the store, call before any other call. If not called, store * is free to pick any defaults * - * @param dir the directory to create logs/db's - * @param async true, enable async, false, enable sync - * @param force true, delete data on mode change, false, error on mode change - */ - virtual bool init(const Options* options) = 0; + * @param options Options object provided by concrete store plug in. + */ + virtual bool init(const Options* options) = 0; /** * Record the existence of a durable queue @@ -105,15 +103,8 @@ public: * store the headers as well as any content passed in. A * persistence id will be set on the message which can be * used to load the content or to append to it. - * - * TODO ::If it is know - * which queue the message is to be staged/ release to in cases - * of flowing tmp messages to disk for memory conservation set - * the queue ptr. This allows the store to optimize the read/writes - * for that queue and avoid searching based on id. Set queue = 0 for - * large message staging when the queue is not known. - */ - virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg) = 0; + */ + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0; /** * Destroys a previously staged message. This only needs @@ -126,7 +117,7 @@ public: /** * Appends content to a previously staged message */ - virtual void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data) = 0; /** @@ -138,7 +129,7 @@ public: * meta-data). */ virtual void loadContent(const qpid::broker::PersistableQueue& queue, - boost::intrusive_ptr<const PersistableMessage>& msg, + const boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length) = 0; /** @@ -155,7 +146,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + virtual void enqueue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) = 0; /** @@ -172,7 +164,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + virtual void dequeue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) = 0; /** @@ -185,7 +178,7 @@ public: */ virtual void flush(const qpid::broker::PersistableQueue& queue)=0; - /** + /** * Returns the number of outstanding AIO's for a given queue * * If 0, than all the enqueue / dequeues have been stored diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 2544d5d533..21c7f6bc02 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -85,7 +85,7 @@ void MessageStoreModule::recover(RecoveryManager& registry) TRANSFER_EXCEPTION(store->recover(registry)); } -void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg) +void MessageStoreModule::stage(const intrusive_ptr<PersistableMessage>& msg) { TRANSFER_EXCEPTION(store->stage(msg)); } @@ -95,23 +95,30 @@ void MessageStoreModule::destroy(PersistableMessage& msg) TRANSFER_EXCEPTION(store->destroy(msg)); } -void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data) +void MessageStoreModule::appendContent(const intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) { TRANSFER_EXCEPTION(store->appendContent(msg, data)); } -void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, - intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length) +void MessageStoreModule::loadContent( + const qpid::broker::PersistableQueue& queue, + const intrusive_ptr<const PersistableMessage>& msg, + string& data, uint64_t offset, uint32_t length) { TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length)); } -void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) +void MessageStoreModule::enqueue(TransactionContext* ctxt, + const intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue)); } -void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) +void MessageStoreModule::dequeue(TransactionContext* ctxt, + const intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue)); } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index f4d05e3e0d..41a76e7a50 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -38,10 +38,10 @@ namespace broker { class MessageStoreModule : public MessageStore { MessageStore* store; -public: + public: MessageStoreModule(MessageStore* store); - bool init(const Options* options); + bool init(const Options* options); std::auto_ptr<TransactionContext> begin(); std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); void prepare(TPCTransactionContext& txn); @@ -60,16 +60,18 @@ public: void create(const PersistableConfig& config); void destroy(const PersistableConfig& config); void recover(RecoveryManager& queues); - void stage(boost::intrusive_ptr<PersistableMessage>& msg); + void stage(const boost::intrusive_ptr<PersistableMessage>& msg); void destroy(PersistableMessage& msg); - void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); + void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); void loadContent(const qpid::broker::PersistableQueue& queue, - boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, - uint64_t offset, uint32_t length); + const boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, + uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + void enqueue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); - void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + void dequeue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); u_int32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index 401c76f5a2..6fb92a2fc5 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -94,7 +94,7 @@ void NullMessageStore::recover(RecoveryManager&) QPID_LOG(info, "Persistence not enabled, no recovery attempted."); } -void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&) +void NullMessageStore::stage(const intrusive_ptr<PersistableMessage>&) { QPID_LOG(info, "Can't stage message. Persistence not enabled."); } @@ -103,23 +103,29 @@ void NullMessageStore::destroy(PersistableMessage&) { } -void NullMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&, const string&) +void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>&, const string&) { QPID_LOG(info, "Can't append content. Persistence not enabled."); } -void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, intrusive_ptr<const PersistableMessage>&, string&, uint64_t, uint32_t) +void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, + const intrusive_ptr<const PersistableMessage>&, + string&, uint64_t, uint32_t) { QPID_LOG(info, "Can't load content. Persistence not enabled."); } -void NullMessageStore::enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) +void NullMessageStore::enqueue(TransactionContext*, + const intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) { msg->enqueueComplete(); QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&) +void NullMessageStore::dequeue(TransactionContext*, + const intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue&) { msg->dequeueComplete(); } diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index f06e749ebb..b537c05b5d 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -38,7 +38,7 @@ class NullMessageStore : public MessageStore std::set<std::string> prepared; const bool warn; uint64_t nextPersistenceId; -public: + public: NullMessageStore(bool warn = false); virtual bool init(const Options* options); @@ -61,16 +61,18 @@ public: virtual void create(const PersistableConfig& config); virtual void destroy(const PersistableConfig& config); virtual void recover(RecoveryManager& queues); - virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg); + virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg); virtual void destroy(PersistableMessage& msg); - virtual void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, + virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); virtual void loadContent(const qpid::broker::PersistableQueue& queue, - boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, + const boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + virtual void enqueue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); - virtual void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, + virtual void dequeue(TransactionContext* ctxt, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); virtual void flush(const qpid::broker::PersistableQueue& queue); diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index d5977665fe..7ed54c0ff0 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -37,8 +37,7 @@ namespace broker { class MessageStore; /** - * The interface messages must expose to the MessageStore in order to - * be persistable. + * Base class for persistable messages. */ class PersistableMessage : public Persistable { @@ -122,7 +121,6 @@ public: PersistableQueue::shared_ptr q(i->lock()); if (q) q->notifyDurableIOComplete(); } - //synclist.clear(); } } } diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp index 55f568d41d..63c3a800de 100644 --- a/qpid/cpp/src/tests/MessageBuilderTest.cpp +++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp @@ -63,14 +63,15 @@ class MockMessageStore : public NullMessageStore ops.push_back(APPEND); } - void stage(intrusive_ptr<PersistableMessage>& msg) + void stage(const intrusive_ptr<PersistableMessage>& msg) { checkExpectation(STAGE); BOOST_CHECK_EQUAL(expectedMsg, msg); msg->setPersistenceId(++id); } - void appendContent(intrusive_ptr<const PersistableMessage>& msg, const string& data) + void appendContent(const intrusive_ptr<const PersistableMessage>& msg, + const string& data) { checkExpectation(APPEND); BOOST_CHECK_EQUAL(static_pointer_cast<const PersistableMessage>(expectedMsg), msg); diff --git a/qpid/cpp/src/tests/TestMessageStore.h b/qpid/cpp/src/tests/TestMessageStore.h index 2ee2a2da01..a6fe716e9c 100644 --- a/qpid/cpp/src/tests/TestMessageStore.h +++ b/qpid/cpp/src/tests/TestMessageStore.h @@ -36,12 +36,16 @@ class TestMessageStore : public NullMessageStore std::vector<boost::intrusive_ptr<PersistableMessage> > dequeued; std::vector<msg_queue_pair> enqueued; - void dequeue(TransactionContext*, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& /*queue*/) + void dequeue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& /*queue*/) { dequeued.push_back(msg); } - void enqueue(TransactionContext*, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) + void enqueue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) { msg->enqueueComplete(); enqueued.push_back(msg_queue_pair(queue.getName(), msg)); |