diff options
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 6 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp | 4 |
10 files changed, 48 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp index eb7536dde3..895df46abe 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -23,12 +23,12 @@ using namespace qpid::broker; using namespace qpid::framing; -LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) : - store(_store), msgId(_msgId), expectedSize(_expectedSize) {} +LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, u_int64_t _expectedSize) : + store(_store), msg(_msg), expectedSize(_expectedSize) {} void LazyLoadedContent::add(AMQContentBody::shared_ptr data) { - store->appendContent(msgId, data->getData()); + store->appendContent(msg, data->getData()); } u_int32_t LazyLoadedContent::size() @@ -42,12 +42,12 @@ void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesiz for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { u_int64_t remaining = expectedSize - offset; string data; - store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining); + store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining); out->send(new AMQFrame(channel, new AMQContentBody(data))); } } else { string data; - store->loadContent(msgId, data, 0, expectedSize); + store->loadContent(msg, data, 0, expectedSize); out->send(new AMQFrame(channel, new AMQContentBody(data))); } } diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h index 5a406e3131..4ed639df1a 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.h +++ b/cpp/src/qpid/broker/LazyLoadedContent.h @@ -28,10 +28,10 @@ namespace qpid { namespace broker { class LazyLoadedContent : public Content{ MessageStore* const store; - const u_int64_t msgId; + Message* const msg; const u_int64_t expectedSize; public: - LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize); + LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize); void add(qpid::framing::AMQContentBody::shared_ptr data); u_int32_t size(); void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 64e66c4a30..6478383cb2 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -201,7 +201,11 @@ void Message::releaseContent(MessageStore* store) { if (!content.get() || content->size() > 0) { //set content to lazy loading mode (but only if there is stored content): - content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize())); + + //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is + // then set as a member of that message so its lifetime is guaranteed to be no longer than + // that of the message itself + content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize())); } } diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 1a58523c08..56b4b3b4d8 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -54,8 +54,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ message->setHeader(header); if (stagingThreshold && header->getContentSize() >= stagingThreshold) { store->stage(message); - auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize())); - message->setContent(content); + message->releaseContent(store); } else { auto_ptr<Content> content(new InMemoryContent()); message->setContent(content); diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 1c5a16c50d..3de7a70a70 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -27,6 +27,16 @@ namespace qpid { namespace broker { + struct MessageStoreSettings + { + /** + * Messages whose content length is larger than this value + * will be staged (i.e. will have thier data written to + * disk as it arrives) and will load their data lazily. On + * recovery therefore, only the headers should be loaded. + */ + u_int64_t stagingThreshold; + }; /** * An abstraction of the persistent storage for messages. */ @@ -44,7 +54,7 @@ namespace qpid { /** * Request recovery of queue and message state from store */ - virtual void recover(RecoveryManager& queues) = 0; + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; /** * Stores a messages before it has been enqueued @@ -68,17 +78,17 @@ namespace qpid { /** * Appends content to a previously staged message */ - virtual void appendContent(u_int64_t msgId, const std::string& data) = 0; + virtual void appendContent(Message* const msg, const std::string& data) = 0; /** * Loads (a section) of content data for the specified - * message id (previously set on the message through a - * call to stage or enqueue) into data. The offset refers - * to the content only (i.e. an offset of 0 implies that - * the start of the content should be loaded, not the - * headers or related meta-data). + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). */ - virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0; + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0; /** * Enqueues a message, storing the message if it has not diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 168cb3d5bb..c0f33cb44c 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -38,9 +38,9 @@ void MessageStoreModule::destroy(const Queue& queue) store->destroy(queue); } -void MessageStoreModule::recover(RecoveryManager& registry) +void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) { - store->recover(registry); + store->recover(registry, settings); } void MessageStoreModule::stage(Message::shared_ptr& msg) @@ -53,14 +53,14 @@ void MessageStoreModule::destroy(Message::shared_ptr& msg) store->destroy(msg); } -void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data) +void MessageStoreModule::appendContent(Message* const msg, const std::string& data) { - store->appendContent(msgId, data); + store->appendContent(msg, data); } -void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length) +void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length) { - store->loadContent(msgId, data, offset, length); + store->loadContent(msg, data, offset, length); } void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 306e1aa3ea..fcd493b384 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -38,11 +38,11 @@ namespace qpid { MessageStoreModule(const std::string& name); void create(const Queue& queue); void destroy(const Queue& queue); - void recover(RecoveryManager& queues); + void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); void stage(Message::shared_ptr& msg); void destroy(Message::shared_ptr& msg); - void appendContent(u_int64_t msgId, const std::string& data); - void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); + void appendContent(Message* const msg, const std::string& data); + void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void committed(const string * const xid); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 5a2837509d..75e044ee04 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -40,7 +40,7 @@ void NullMessageStore::destroy(const Queue& queue) if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::recover(RecoveryManager&) +void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) { if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } @@ -55,12 +55,12 @@ void NullMessageStore::destroy(Message::shared_ptr&) if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; } -void NullMessageStore::appendContent(u_int64_t, const string&) +void NullMessageStore::appendContent(Message* const, const string&) { if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; } -void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t) +void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t) { if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; } diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index c13a6c9f72..bd8c674e19 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -37,11 +37,11 @@ namespace qpid { NullMessageStore(bool warn = true); virtual void create(const Queue& queue); virtual void destroy(const Queue& queue); - virtual void recover(RecoveryManager& queues); + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); virtual void stage(Message::shared_ptr& msg); virtual void destroy(Message::shared_ptr& msg); - virtual void appendContent(u_int64_t msgId, const std::string& data); - virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); + virtual void appendContent(Message* const msg, const std::string& data); + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); virtual void committed(const string * const xid); diff --git a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp index 4d267887ba..ecb907400d 100644 --- a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp +++ b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp @@ -55,7 +55,7 @@ class LazyLoadedContentTest : public CppUnit::TestCase public: TestMessageStore(const string& _content) : content(_content) {} - void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length) + void loadContent(Message* const, string& data, u_int64_t offset, u_int32_t length) { if (offset + length <= content.size()) { data = content.substr(offset, length); @@ -96,7 +96,7 @@ public: void load(string& in, size_t outCount, string* out, u_int32_t framesize) { TestMessageStore store(in); - LazyLoadedContent content(&store, 1, in.size()); + LazyLoadedContent content(&store, 0, in.size()); DummyHandler handler; u_int16_t channel = 3; content.send(&handler, channel, framesize); |