summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-11-26 21:48:37 +0000
committerKim van der Riet <kpvdr@apache.org>2007-11-26 21:48:37 +0000
commitd971d79e02498ad5fa72ebb5aaf94fa497cc531b (patch)
tree2b8ca78567ef210039b708fe40acd73ac8acac99
parentb934813ffc9926f4defe0ed6513fda54f10e15c9 (diff)
downloadqpid-python-d971d79e02498ad5fa72ebb5aaf94fa497cc531b.tar.gz
Switched all regular PersistentMessage* and PersistentMessage& to intrusive_ptr<PersistentMessage>, so as to hook into the refcount for a message while it is in the store.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@598440 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStore.h17
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.h15
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.h18
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/tests/MessageBuilderTest.cpp12
-rw-r--r--qpid/cpp/src/tests/TxAckTest.cpp8
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp19
11 files changed, 76 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 6aa0f4c30b..0fa8380a32 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -149,7 +149,8 @@ void Message::releaseContent(MessageStore* _store)
store = _store;
}
if (!getPersistenceId()) {
- store->stage(*this);
+ intrusive_ptr<PersistableMessage> pmsg(this);
+ store->stage(pmsg);
}
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
@@ -162,13 +163,14 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max
//load content from store in chunks of maxContentSize
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
uint64_t expectedSize(frames.getHeaders()->getContentLength());
+ intrusive_ptr<const PersistableMessage> pmsg(this);
for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
{
uint64_t remaining = expectedSize - offset;
AMQFrame frame(in_place<AMQContentBody>());
string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, *this, data, offset,
+ store->loadContent(queue, pmsg, data, offset,
remaining > maxContentSize ? maxContentSize : remaining);
frame.setBof(false);
frame.setEof(true);
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index a56c65333c..376a321d2d 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -49,7 +49,8 @@ void MessageBuilder::handle(AMQFrame& frame)
throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
}
if (staging) {
- store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
+ intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message);
+ store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData());
} else {
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h
index 04dbb22376..432fe30bb3 100644
--- a/qpid/cpp/src/qpid/broker/MessageStore.h
+++ b/qpid/cpp/src/qpid/broker/MessageStore.h
@@ -21,6 +21,7 @@
#ifndef _MessageStore_
#define _MessageStore_
+#include <boost/shared_ptr.hpp>
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
@@ -94,7 +95,7 @@ public:
* for that queue and avoid searching based on id. Set queue = 0 for
* large message staging when the queue is not known.
*/
- virtual void stage( PersistableMessage& msg) = 0;
+ virtual void stage(intrusive_ptr<PersistableMessage>& msg) = 0;
/**
* Destroys a previously staged message. This only needs
@@ -102,12 +103,13 @@ public:
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
- virtual void destroy(PersistableMessage& msg) = 0;
+ virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0;
/**
* Appends content to a previously staged message
*/
- virtual void appendContent(const PersistableMessage& msg, const std::string& data) = 0;
+ virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data) = 0;
/**
* Loads (a section) of content data for the specified
@@ -118,7 +120,8 @@ public:
* meta-data).
*/
virtual void loadContent(const qpid::broker::PersistableQueue& queue,
- const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
+ intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data, uint64_t offset, uint32_t length) = 0;
/**
* Enqueues a message, storing the message if it has not
@@ -134,7 +137,8 @@ public:
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+ virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
/**
* Dequeues a message, recording that the given message is
@@ -150,7 +154,8 @@ public:
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+ virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue) = 0;
/**
* Flushes all async messages to disk for the specified queue
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
index 797ac1f617..a1979e2f43 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -73,33 +73,33 @@ void MessageStoreModule::recover(RecoveryManager& registry)
TRANSFER_EXCEPTION(store->recover(registry));
}
-void MessageStoreModule::stage( PersistableMessage& msg)
+void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
{
TRANSFER_EXCEPTION(store->stage(msg));
}
-void MessageStoreModule::destroy(PersistableMessage& msg)
+void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
{
TRANSFER_EXCEPTION(store->destroy(msg));
}
-void MessageStoreModule::appendContent(const PersistableMessage& msg, const std::string& data)
+void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
TRANSFER_EXCEPTION(store->appendContent(msg, data));
}
void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue,
- const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
+ intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length)
{
TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length));
}
-void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue));
}
-void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
}
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
index 6738f0e539..e7404487b0 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
@@ -55,14 +55,17 @@ public:
void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
void recover(RecoveryManager& queues);
- void stage(PersistableMessage& msg);
- void destroy(PersistableMessage& msg);
- void appendContent(const PersistableMessage& msg, const std::string& data);
+ void stage(intrusive_ptr<PersistableMessage>& msg);
+ void destroy(intrusive_ptr<PersistableMessage>& msg);
+ void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
- const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+ intrusive_ptr<const PersistableMessage>& msg, std::string& data,
+ uint64_t offset, uint32_t length);
- void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
- void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+ void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
void flush(const qpid::broker::PersistableQueue& queue);
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
index eb20ab6936..c0dbd9a315 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -79,34 +79,34 @@ void NullMessageStore::recover(RecoveryManager&)
QPID_LOG(info, "Persistence not enabled, no recovery attempted.");
}
-void NullMessageStore::stage(PersistableMessage&)
+void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&)
{
QPID_LOG(info, "Can't stage message. Persistence not enabled.");
}
-void NullMessageStore::destroy(PersistableMessage&)
+void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&)
{
}
-void NullMessageStore::appendContent(const PersistableMessage&, const string&)
+void NullMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&, const string&)
{
QPID_LOG(info, "Can't append content. Persistence not enabled.");
}
-void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const PersistableMessage&, string&, uint64_t, uint32_t)
+void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, intrusive_ptr<const PersistableMessage>&, string&, uint64_t, uint32_t)
{
QPID_LOG(info, "Can't load content. Persistence not enabled.");
}
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
+void NullMessageStore::enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- msg.enqueueComplete();
+ msg->enqueueComplete();
QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&)
+void NullMessageStore::dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&)
{
- msg.dequeueComplete();
+ msg->dequeueComplete();
}
void NullMessageStore::flush(const qpid::broker::PersistableQueue&)
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h
index caf018655c..6a2e960b0f 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.h
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h
@@ -56,13 +56,17 @@ public:
virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args);
virtual void recover(RecoveryManager& queues);
- virtual void stage(PersistableMessage& msg);
- virtual void destroy(PersistableMessage& msg);
- virtual void appendContent(const PersistableMessage& msg, const std::string& data);
- virtual void loadContent(const qpid::broker::PersistableQueue& queue,
- const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
- virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
- virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ virtual void stage(intrusive_ptr<PersistableMessage>& msg);
+ virtual void destroy(intrusive_ptr<PersistableMessage>& msg);
+ virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ intrusive_ptr<const PersistableMessage>& msg, std::string& data,
+ uint64_t offset, uint32_t length);
+ virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+ virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
virtual void flush(const qpid::broker::PersistableQueue& queue);
~NullMessageStore(){}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 376b9367d0..c43ab8c231 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -431,9 +431,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
- store->enqueue(ctxt, *msg.get(), *this);
+ intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+ store->enqueue(ctxt, pmsg, *this);
return true;
}
+ //msg->enqueueAsync(); // increments intrusive ptr cnt
return false;
}
@@ -442,9 +444,11 @@ bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
- store->dequeue(ctxt, *msg.get(), *this);
+ intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+ store->dequeue(ctxt, pmsg, *this);
return true;
}
+ //msg->dequeueAsync(); // decrements intrusive ptr cnt
return false;
}
diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp
index 023aefc3fa..7335867140 100644
--- a/qpid/cpp/src/tests/MessageBuilderTest.cpp
+++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp
@@ -38,7 +38,7 @@ class MessageBuilderTest : public CppUnit::TestCase
enum Op {STAGE=1, APPEND=2};
uint64_t id;
- PersistableMessage* expectedMsg;
+ intrusive_ptr<PersistableMessage> expectedMsg;
string expectedData;
std::list<Op> ops;
@@ -64,17 +64,17 @@ class MessageBuilderTest : public CppUnit::TestCase
ops.push_back(APPEND);
}
- void stage(PersistableMessage& msg)
+ void stage(intrusive_ptr<PersistableMessage>& msg)
{
checkExpectation(STAGE);
- CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
- msg.setPersistenceId(++id);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, msg);
+ msg->setPersistenceId(++id);
}
- void appendContent(const PersistableMessage& msg, const string& data)
+ void appendContent(intrusive_ptr<const PersistableMessage>& msg, const string& data)
{
checkExpectation(APPEND);
- CPPUNIT_ASSERT_EQUAL((const PersistableMessage*) expectedMsg, &msg);
+ CPPUNIT_ASSERT_EQUAL(static_pointer_cast<const PersistableMessage>(expectedMsg), msg);
CPPUNIT_ASSERT_EQUAL(expectedData, data);
}
diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp
index bcf422e706..1451fb65b6 100644
--- a/qpid/cpp/src/tests/TxAckTest.cpp
+++ b/qpid/cpp/src/tests/TxAckTest.cpp
@@ -39,11 +39,11 @@ class TxAckTest : public CppUnit::TestCase
class TestMessageStore : public NullMessageStore
{
public:
- vector<PersistableMessage*> dequeued;
+ vector<intrusive_ptr<PersistableMessage> > dequeued;
- void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/)
+ void dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& /*queue*/)
{
- dequeued.push_back(&msg);
+ dequeued.push_back(msg);
}
TestMessageStore() : NullMessageStore() {}
@@ -97,7 +97,7 @@ public:
CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
int dequeued[] = {0, 1, 2, 3, 4, 6, 8};
for (int i = 0; i < 7; i++) {
- CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]);
+ CPPUNIT_ASSERT_EQUAL(static_pointer_cast<PersistableMessage>(messages[dequeued[i]]), store.dequeued[i]);
}
}
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
index b969598f1d..c9da9762ec 100644
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ b/qpid/cpp/src/tests/TxPublishTest.cpp
@@ -36,17 +36,17 @@ using namespace qpid::framing;
class TxPublishTest : public CppUnit::TestCase
{
- typedef std::pair<string, PersistableMessage*> msg_queue_pair;
+ typedef std::pair<string, intrusive_ptr<PersistableMessage> > msg_queue_pair;
class TestMessageStore : public NullMessageStore
{
public:
vector<msg_queue_pair> enqueued;
- void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
+ void enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- msg.enqueueComplete();
- enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
+ msg->enqueueComplete();
+ enqueued.push_back(msg_queue_pair(queue.getName(), msg));
}
//dont care about any of the other methods:
@@ -81,16 +81,15 @@ public:
void testPrepare()
{
+ intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
//ensure messages are enqueued in store
op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
- CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second);
+ CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[0].second);
CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
- CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second);
- CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg.get())->isEnqueueComplete());
-
-
+ CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[1].second);
+ CPPUNIT_ASSERT_EQUAL( true, ( static_pointer_cast<PersistableMessage>(msg))->isEnqueueComplete());
}
void testCommit()
@@ -101,7 +100,7 @@ public:
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
intrusive_ptr<Message> msg_dequeue = queue1->dequeue().payload;
- CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete());
+ CPPUNIT_ASSERT_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());