summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp7
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp4
-rw-r--r--cpp/lib/broker/Content.h1
-rw-r--r--cpp/lib/broker/InMemoryContent.cpp4
-rw-r--r--cpp/lib/broker/InMemoryContent.h1
-rw-r--r--cpp/lib/broker/LazyLoadedContent.cpp5
-rw-r--r--cpp/lib/broker/LazyLoadedContent.h1
-rw-r--r--cpp/lib/broker/MessageBuilder.cpp2
-rw-r--r--cpp/lib/broker/MessageStore.h12
-rw-r--r--cpp/lib/broker/MessageStoreModule.cpp8
-rw-r--r--cpp/lib/broker/MessageStoreModule.h8
-rw-r--r--cpp/lib/broker/NullMessageStore.cpp8
-rw-r--r--cpp/lib/broker/NullMessageStore.h8
13 files changed, 44 insertions, 25 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index d30cd12bc3..598de2d590 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -53,7 +53,9 @@ Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
-Message::~Message(){}
+Message::~Message(){
+ if (content.get()) content->destroy();
+}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
this->header = _header;
@@ -205,6 +207,9 @@ u_int64_t Message::expectedContentSize()
void Message::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
+ if (!isPersistent() && persistenceId == 0) {
+ store->stage(this);
+ }
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored content):
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index b0e1f20b01..a8c5343ca3 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -189,14 +189,14 @@ bool Queue::canAutoDelete() const{
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
{
if (msg->isPersistent() && store) {
- store->enqueue(ctxt, msg, *this, xid);
+ store->enqueue(ctxt, msg.get(), *this, xid);
}
}
void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
{
if (msg->isPersistent() && store) {
- store->dequeue(ctxt, msg, *this, xid);
+ store->dequeue(ctxt, msg.get(), *this, xid);
}
}
diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h
index ed425c6735..b5712c35ed 100644
--- a/cpp/lib/broker/Content.h
+++ b/cpp/lib/broker/Content.h
@@ -33,6 +33,7 @@ namespace qpid {
virtual u_int32_t size() = 0;
virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual void destroy() = 0;
virtual ~Content(){}
};
}
diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp
index 9d40877c86..7205eb3de0 100644
--- a/cpp/lib/broker/InMemoryContent.cpp
+++ b/cpp/lib/broker/InMemoryContent.cpp
@@ -67,3 +67,7 @@ void InMemoryContent::encode(Buffer& buffer)
(*i)->encode(buffer);
}
}
+
+void InMemoryContent::destroy()
+{
+}
diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h
index c54d15447d..79c7cf670b 100644
--- a/cpp/lib/broker/InMemoryContent.h
+++ b/cpp/lib/broker/InMemoryContent.h
@@ -36,6 +36,7 @@ namespace qpid {
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
+ void destroy();
~InMemoryContent(){}
};
}
diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp
index c0da48efda..51aa6c590b 100644
--- a/cpp/lib/broker/LazyLoadedContent.cpp
+++ b/cpp/lib/broker/LazyLoadedContent.cpp
@@ -56,3 +56,8 @@ void LazyLoadedContent::encode(Buffer&)
{
//do nothing as all content is written as soon as it is added
}
+
+void LazyLoadedContent::destroy()
+{
+ store->destroy(msg);
+}
diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h
index fdb752f117..68e08c7c3f 100644
--- a/cpp/lib/broker/LazyLoadedContent.h
+++ b/cpp/lib/broker/LazyLoadedContent.h
@@ -36,6 +36,7 @@ namespace qpid {
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
+ void destroy();
~LazyLoadedContent(){}
};
}
diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp
index 7f009d5cdf..41bf812d2d 100644
--- a/cpp/lib/broker/MessageBuilder.cpp
+++ b/cpp/lib/broker/MessageBuilder.cpp
@@ -53,7 +53,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
}
message->setHeader(header);
if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(message);
+ store->stage(message.get());
message->releaseContent(store);
} else {
auto_ptr<Content> content(new InMemoryContent());
diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h
index be9172e383..acbff82c35 100644
--- a/cpp/lib/broker/MessageStore.h
+++ b/cpp/lib/broker/MessageStore.h
@@ -39,7 +39,9 @@ namespace qpid {
u_int64_t stagingThreshold;
};
/**
- * An abstraction of the persistent storage for messages.
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
*/
class MessageStore : public TransactionalStore{
public:
@@ -66,7 +68,7 @@ namespace qpid {
* persistence id will be set on the message which can be
* used to load the content or to append to it.
*/
- virtual void stage(Message::shared_ptr& msg) = 0;
+ virtual void stage(Message* const msg) = 0;
/**
* Destroys a previously staged message. This only needs
@@ -74,7 +76,7 @@ namespace qpid {
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
- virtual void destroy(Message::shared_ptr& msg) = 0;
+ virtual void destroy(Message* const msg) = 0;
/**
* Appends content to a previously staged message
@@ -102,7 +104,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -114,7 +116,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as being committed.
*/
diff --git a/cpp/lib/broker/MessageStoreModule.cpp b/cpp/lib/broker/MessageStoreModule.cpp
index b3f5d6e63c..1952786014 100644
--- a/cpp/lib/broker/MessageStoreModule.cpp
+++ b/cpp/lib/broker/MessageStoreModule.cpp
@@ -43,12 +43,12 @@ void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSe
store->recover(registry, settings);
}
-void MessageStoreModule::stage(Message::shared_ptr& msg)
+void MessageStoreModule::stage(Message* const msg)
{
store->stage(msg);
}
-void MessageStoreModule::destroy(Message::shared_ptr& msg)
+void MessageStoreModule::destroy(Message* const msg)
{
store->destroy(msg);
}
@@ -63,12 +63,12 @@ void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t
store->loadContent(msg, data, offset, length);
}
-void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
{
store->enqueue(ctxt, msg, queue, xid);
}
-void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
{
store->dequeue(ctxt, msg, queue, xid);
}
diff --git a/cpp/lib/broker/MessageStoreModule.h b/cpp/lib/broker/MessageStoreModule.h
index d70aab6d13..fcff71bd35 100644
--- a/cpp/lib/broker/MessageStoreModule.h
+++ b/cpp/lib/broker/MessageStoreModule.h
@@ -39,12 +39,12 @@ namespace qpid {
void create(const Queue& queue, const qpid::framing::FieldTable& settings);
void destroy(const Queue& queue);
void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
- void stage(Message::shared_ptr& msg);
- void destroy(Message::shared_ptr& msg);
+ void stage(Message* const msg);
+ void destroy(Message* const msg);
void appendContent(Message* const msg, const std::string& data);
void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
void aborted(const string * const xid);
std::auto_ptr<TransactionContext> begin();
diff --git a/cpp/lib/broker/NullMessageStore.cpp b/cpp/lib/broker/NullMessageStore.cpp
index 3c29994aac..a8318a4bf7 100644
--- a/cpp/lib/broker/NullMessageStore.cpp
+++ b/cpp/lib/broker/NullMessageStore.cpp
@@ -45,12 +45,12 @@ void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* con
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
-void NullMessageStore::stage(Message::shared_ptr&)
+void NullMessageStore::stage(Message* const)
{
if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::destroy(Message::shared_ptr&)
+void NullMessageStore::destroy(Message* const)
{
if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
}
@@ -65,12 +65,12 @@ void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t
if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
}
-void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
+void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
+void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h
index 61afe36281..913d6b0a8d 100644
--- a/cpp/lib/broker/NullMessageStore.h
+++ b/cpp/lib/broker/NullMessageStore.h
@@ -38,12 +38,12 @@ namespace qpid {
virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
- virtual void stage(Message::shared_ptr& msg);
- virtual void destroy(Message::shared_ptr& msg);
+ virtual void stage(Message* const msg);
+ virtual void destroy(Message* const msg);
virtual void appendContent(Message* const msg, const std::string& data);
virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
- virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
virtual void committed(const string * const xid);
virtual void aborted(const string * const xid);
virtual std::auto_ptr<TransactionContext> begin();