summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-30 14:48:24 +0000
committerGordon Sim <gsim@apache.org>2006-11-30 14:48:24 +0000
commit505b253590d2cf8ebfe35ccd2ff3df2443240001 (patch)
tree08896f180af647019b83d8157f578a276a9e9136
parent4d7f04636758d39a3ef7250cbcc745edb3de7685 (diff)
downloadqpid-python-505b253590d2cf8ebfe35ccd2ff3df2443240001.tar.gz
Some further tweaks to MessageStore interface.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480946 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp10
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h4
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--cpp/src/qpid/broker/MessageStore.h26
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp12
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h6
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp6
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h6
-rw-r--r--cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp4
10 files changed, 48 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index eb7536dde3..895df46abe 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -23,12 +23,12 @@
using namespace qpid::broker;
using namespace qpid::framing;
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) :
- store(_store), msgId(_msgId), expectedSize(_expectedSize) {}
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, u_int64_t _expectedSize) :
+ store(_store), msg(_msg), expectedSize(_expectedSize) {}
void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
{
- store->appendContent(msgId, data->getData());
+ store->appendContent(msg, data->getData());
}
u_int32_t LazyLoadedContent::size()
@@ -42,12 +42,12 @@ void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesiz
for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
u_int64_t remaining = expectedSize - offset;
string data;
- store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining);
+ store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining);
out->send(new AMQFrame(channel, new AMQContentBody(data)));
}
} else {
string data;
- store->loadContent(msgId, data, 0, expectedSize);
+ store->loadContent(msg, data, 0, expectedSize);
out->send(new AMQFrame(channel, new AMQContentBody(data)));
}
}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
index 5a406e3131..4ed639df1a 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ b/cpp/src/qpid/broker/LazyLoadedContent.h
@@ -28,10 +28,10 @@ namespace qpid {
namespace broker {
class LazyLoadedContent : public Content{
MessageStore* const store;
- const u_int64_t msgId;
+ Message* const msg;
const u_int64_t expectedSize;
public:
- LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize);
+ LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 64e66c4a30..6478383cb2 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -201,7 +201,11 @@ void Message::releaseContent(MessageStore* store)
{
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored content):
- content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize()));
+
+ //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
+ // then set as a member of that message so its lifetime is guaranteed to be no longer than
+ // that of the message itself
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
}
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 1a58523c08..56b4b3b4d8 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -54,8 +54,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
message->setHeader(header);
if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
store->stage(message);
- auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize()));
- message->setContent(content);
+ message->releaseContent(store);
} else {
auto_ptr<Content> content(new InMemoryContent());
message->setContent(content);
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 1c5a16c50d..3de7a70a70 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -27,6 +27,16 @@
namespace qpid {
namespace broker {
+ struct MessageStoreSettings
+ {
+ /**
+ * Messages whose content length is larger than this value
+ * will be staged (i.e. will have thier data written to
+ * disk as it arrives) and will load their data lazily. On
+ * recovery therefore, only the headers should be loaded.
+ */
+ u_int64_t stagingThreshold;
+ };
/**
* An abstraction of the persistent storage for messages.
*/
@@ -44,7 +54,7 @@ namespace qpid {
/**
* Request recovery of queue and message state from store
*/
- virtual void recover(RecoveryManager& queues) = 0;
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
/**
* Stores a messages before it has been enqueued
@@ -68,17 +78,17 @@ namespace qpid {
/**
* Appends content to a previously staged message
*/
- virtual void appendContent(u_int64_t msgId, const std::string& data) = 0;
+ virtual void appendContent(Message* const msg, const std::string& data) = 0;
/**
* Loads (a section) of content data for the specified
- * message id (previously set on the message through a
- * call to stage or enqueue) into data. The offset refers
- * to the content only (i.e. an offset of 0 implies that
- * the start of the content should be loaded, not the
- * headers or related meta-data).
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
*/
- virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0;
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0;
/**
* Enqueues a message, storing the message if it has not
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 168cb3d5bb..c0f33cb44c 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -38,9 +38,9 @@ void MessageStoreModule::destroy(const Queue& queue)
store->destroy(queue);
}
-void MessageStoreModule::recover(RecoveryManager& registry)
+void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
{
- store->recover(registry);
+ store->recover(registry, settings);
}
void MessageStoreModule::stage(Message::shared_ptr& msg)
@@ -53,14 +53,14 @@ void MessageStoreModule::destroy(Message::shared_ptr& msg)
store->destroy(msg);
}
-void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data)
+void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
{
- store->appendContent(msgId, data);
+ store->appendContent(msg, data);
}
-void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length)
+void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length)
{
- store->loadContent(msgId, data, offset, length);
+ store->loadContent(msg, data, offset, length);
}
void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 306e1aa3ea..fcd493b384 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -38,11 +38,11 @@ namespace qpid {
MessageStoreModule(const std::string& name);
void create(const Queue& queue);
void destroy(const Queue& queue);
- void recover(RecoveryManager& queues);
+ void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
void stage(Message::shared_ptr& msg);
void destroy(Message::shared_ptr& msg);
- void appendContent(u_int64_t msgId, const std::string& data);
- void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
+ void appendContent(Message* const msg, const std::string& data);
+ void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 5a2837509d..75e044ee04 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -40,7 +40,7 @@ void NullMessageStore::destroy(const Queue& queue)
if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::recover(RecoveryManager&)
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
{
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
@@ -55,12 +55,12 @@ void NullMessageStore::destroy(Message::shared_ptr&)
if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::appendContent(u_int64_t, const string&)
+void NullMessageStore::appendContent(Message* const, const string&)
{
if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
}
-void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t)
+void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t)
{
if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index c13a6c9f72..bd8c674e19 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -37,11 +37,11 @@ namespace qpid {
NullMessageStore(bool warn = true);
virtual void create(const Queue& queue);
virtual void destroy(const Queue& queue);
- virtual void recover(RecoveryManager& queues);
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
virtual void stage(Message::shared_ptr& msg);
virtual void destroy(Message::shared_ptr& msg);
- virtual void appendContent(u_int64_t msgId, const std::string& data);
- virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
+ virtual void appendContent(Message* const msg, const std::string& data);
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
virtual void committed(const string * const xid);
diff --git a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
index 4d267887ba..ecb907400d 100644
--- a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
+++ b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
@@ -55,7 +55,7 @@ class LazyLoadedContentTest : public CppUnit::TestCase
public:
TestMessageStore(const string& _content) : content(_content) {}
- void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length)
+ void loadContent(Message* const, string& data, u_int64_t offset, u_int32_t length)
{
if (offset + length <= content.size()) {
data = content.substr(offset, length);
@@ -96,7 +96,7 @@ public:
void load(string& in, size_t outCount, string* out, u_int32_t framesize)
{
TestMessageStore store(in);
- LazyLoadedContent content(&store, 1, in.size());
+ LazyLoadedContent content(&store, 0, in.size());
DummyHandler handler;
u_int16_t channel = 3;
content.send(&handler, channel, framesize);