summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-12-14 16:25:47 +0000
committerGordon Sim <gsim@apache.org>2007-12-14 16:25:47 +0000
commitbb1b322d0e69384789e6a21c5d8da49311a1a628 (patch)
tree1d80f8d128e87731309ca69e7bab463c3e61ad5e /cpp
parent968971e63c9c21f24e2a7003addfbd574ada12f0 (diff)
downloadqpid-python-bb1b322d0e69384789e6a21c5d8da49311a1a628.tar.gz
Some fixes for 'flow to disk' (i.e. dropping message content from memory, and loading it from disk for delivery)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@604215 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp15
-rw-r--r--cpp/src/qpid/broker/Message.h4
-rw-r--r--cpp/src/qpid/broker/MessageStore.h2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h2
7 files changed, 22 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e01a553207..1730c01099 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -28,6 +28,7 @@
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -35,7 +36,18 @@ using std::string;
TransferAdapter Message::TRANSFER;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+
+Message::~Message()
+{
+ if (staged) {
+ if (store) {
+ store->destroy(*this);
+ } else {
+ QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ }
+ }
+}
std::string Message::getRoutingKey() const
{
@@ -152,6 +164,7 @@ void Message::releaseContent(MessageStore* _store)
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
+ staged = true;
}
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 1581e4dd3c..8a30c5770b 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -48,7 +48,8 @@ public:
typedef boost::intrusive_ptr<Message> shared_ptr;
Message(const framing::SequenceNumber& id = framing::SequenceNumber());
-
+ ~Message();
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -127,6 +128,7 @@ public:
mutable uint64_t persistenceId;
bool redelivered;
bool loaded;
+ bool staged;
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 73ece93c72..eaf9f1688f 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -104,7 +104,7 @@ public:
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
- virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0;
+ virtual void destroy(PersistableMessage& msg) = 0;
/**
* Appends content to a previously staged message
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 4850cfb921..175055215c 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -78,7 +78,7 @@ void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
TRANSFER_EXCEPTION(store->stage(msg));
}
-void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
+void MessageStoreModule::destroy(PersistableMessage& msg)
{
TRANSFER_EXCEPTION(store->destroy(msg));
}
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index ce8e746193..d8f6ab7299 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -56,7 +56,7 @@ public:
const std::string& key, const framing::FieldTable& args);
void recover(RecoveryManager& queues);
void stage(intrusive_ptr<PersistableMessage>& msg);
- void destroy(intrusive_ptr<PersistableMessage>& msg);
+ void destroy(PersistableMessage& msg);
void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
intrusive_ptr<const PersistableMessage>& msg, std::string& data,
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 5890be8d1a..5dbd3379fa 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -84,7 +84,7 @@ void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&)
QPID_LOG(info, "Can't stage message. Persistence not enabled.");
}
-void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&)
+void NullMessageStore::destroy(PersistableMessage&)
{
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index b83e4c44c7..9c5631b75d 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -57,7 +57,7 @@ public:
const std::string& key, const framing::FieldTable& args);
virtual void recover(RecoveryManager& queues);
virtual void stage(intrusive_ptr<PersistableMessage>& msg);
- virtual void destroy(intrusive_ptr<PersistableMessage>& msg);
+ virtual void destroy(PersistableMessage& msg);
virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
const std::string& data);
virtual void loadContent(const qpid::broker::PersistableQueue& queue,