diff options
author | Kim van der Riet <kpvdr@apache.org> | 2007-11-26 21:48:37 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2007-11-26 21:48:37 +0000 |
commit | d971d79e02498ad5fa72ebb5aaf94fa497cc531b (patch) | |
tree | 2b8ca78567ef210039b708fe40acd73ac8acac99 | |
parent | b934813ffc9926f4defe0ed6513fda54f10e15c9 (diff) | |
download | qpid-python-d971d79e02498ad5fa72ebb5aaf94fa497cc531b.tar.gz |
Switched all regular PersistentMessage* and PersistentMessage& to intrusive_ptr<PersistentMessage>, so as to hook into the refcount for a message while it is in the store.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@598440 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStore.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessageBuilderTest.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxAckTest.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxPublishTest.cpp | 19 |
11 files changed, 76 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 6aa0f4c30b..0fa8380a32 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -149,7 +149,8 @@ void Message::releaseContent(MessageStore* _store) store = _store; } if (!getPersistenceId()) { - store->stage(*this); + intrusive_ptr<PersistableMessage> pmsg(this); + store->stage(pmsg); } //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); @@ -162,13 +163,14 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); uint64_t expectedSize(frames.getHeaders()->getContentLength()); + intrusive_ptr<const PersistableMessage> pmsg(this); for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) { uint64_t remaining = expectedSize - offset; AMQFrame frame(in_place<AMQContentBody>()); string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, *this, data, offset, + store->loadContent(queue, pmsg, data, offset, remaining > maxContentSize ? maxContentSize : remaining); frame.setBof(false); frame.setEof(true); diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index a56c65333c..376a321d2d 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -49,7 +49,8 @@ void MessageBuilder::handle(AMQFrame& frame) throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } if (staging) { - store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); + intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message); + store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData()); } else { message->getFrames().append(frame); //have we reached the staging limit? if so stage message and release content diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index 04dbb22376..432fe30bb3 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -21,6 +21,7 @@ #ifndef _MessageStore_ #define _MessageStore_ +#include <boost/shared_ptr.hpp> #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" @@ -94,7 +95,7 @@ public: * for that queue and avoid searching based on id. Set queue = 0 for * large message staging when the queue is not known. */ - virtual void stage( PersistableMessage& msg) = 0; + virtual void stage(intrusive_ptr<PersistableMessage>& msg) = 0; /** * Destroys a previously staged message. This only needs @@ -102,12 +103,13 @@ public: * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ - virtual void destroy(PersistableMessage& msg) = 0; + virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0; /** * Appends content to a previously staged message */ - virtual void appendContent(const PersistableMessage& msg, const std::string& data) = 0; + virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) = 0; /** * Loads (a section) of content data for the specified @@ -118,7 +120,8 @@ public: * meta-data). */ virtual void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; + intrusive_ptr<const PersistableMessage>& msg, + std::string& data, uint64_t offset, uint32_t length) = 0; /** * Enqueues a message, storing the message if it has not @@ -134,7 +137,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; /** * Dequeues a message, recording that the given message is @@ -150,7 +154,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; /** * Flushes all async messages to disk for the specified queue diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 797ac1f617..a1979e2f43 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -73,33 +73,33 @@ void MessageStoreModule::recover(RecoveryManager& registry) TRANSFER_EXCEPTION(store->recover(registry)); } -void MessageStoreModule::stage( PersistableMessage& msg) +void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg) { TRANSFER_EXCEPTION(store->stage(msg)); } -void MessageStoreModule::destroy(PersistableMessage& msg) +void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg) { TRANSFER_EXCEPTION(store->destroy(msg)); } -void MessageStoreModule::appendContent(const PersistableMessage& msg, const std::string& data) +void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data) { TRANSFER_EXCEPTION(store->appendContent(msg, data)); } void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length) + intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length) { TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length)); } -void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) +void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue)); } -void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) +void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue)); } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 6738f0e539..e7404487b0 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -55,14 +55,17 @@ public: void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); void recover(RecoveryManager& queues); - void stage(PersistableMessage& msg); - void destroy(PersistableMessage& msg); - void appendContent(const PersistableMessage& msg, const std::string& data); + void stage(intrusive_ptr<PersistableMessage>& msg); + void destroy(intrusive_ptr<PersistableMessage>& msg); + void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data); void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + intrusive_ptr<const PersistableMessage>& msg, std::string& data, + uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); - void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); u_int32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index eb20ab6936..c0dbd9a315 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -79,34 +79,34 @@ void NullMessageStore::recover(RecoveryManager&) QPID_LOG(info, "Persistence not enabled, no recovery attempted."); } -void NullMessageStore::stage(PersistableMessage&) +void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&) { QPID_LOG(info, "Can't stage message. Persistence not enabled."); } -void NullMessageStore::destroy(PersistableMessage&) +void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&) { } -void NullMessageStore::appendContent(const PersistableMessage&, const string&) +void NullMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&, const string&) { QPID_LOG(info, "Can't append content. Persistence not enabled."); } -void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const PersistableMessage&, string&, uint64_t, uint32_t) +void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, intrusive_ptr<const PersistableMessage>&, string&, uint64_t, uint32_t) { QPID_LOG(info, "Can't load content. Persistence not enabled."); } -void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) +void NullMessageStore::enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { - msg.enqueueComplete(); + msg->enqueueComplete(); QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&) +void NullMessageStore::dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&) { - msg.dequeueComplete(); + msg->dequeueComplete(); } void NullMessageStore::flush(const qpid::broker::PersistableQueue&) diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index caf018655c..6a2e960b0f 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -56,13 +56,17 @@ public: virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); virtual void recover(RecoveryManager& queues); - virtual void stage(PersistableMessage& msg); - virtual void destroy(PersistableMessage& msg); - virtual void appendContent(const PersistableMessage& msg, const std::string& data); - virtual void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); - virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + virtual void stage(intrusive_ptr<PersistableMessage>& msg); + virtual void destroy(intrusive_ptr<PersistableMessage>& msg); + virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + intrusive_ptr<const PersistableMessage>& msg, std::string& data, + uint64_t offset, uint32_t length); + virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); virtual void flush(const qpid::broker::PersistableQueue& queue); ~NullMessageStore(){} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 376b9367d0..c43ab8c231 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -431,9 +431,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue - store->enqueue(ctxt, *msg.get(), *this); + intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + store->enqueue(ctxt, pmsg, *this); return true; } + //msg->enqueueAsync(); // increments intrusive ptr cnt return false; } @@ -442,9 +444,11 @@ bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue - store->dequeue(ctxt, *msg.get(), *this); + intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + store->dequeue(ctxt, pmsg, *this); return true; } + //msg->dequeueAsync(); // decrements intrusive ptr cnt return false; } diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp index 023aefc3fa..7335867140 100644 --- a/qpid/cpp/src/tests/MessageBuilderTest.cpp +++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp @@ -38,7 +38,7 @@ class MessageBuilderTest : public CppUnit::TestCase enum Op {STAGE=1, APPEND=2}; uint64_t id; - PersistableMessage* expectedMsg; + intrusive_ptr<PersistableMessage> expectedMsg; string expectedData; std::list<Op> ops; @@ -64,17 +64,17 @@ class MessageBuilderTest : public CppUnit::TestCase ops.push_back(APPEND); } - void stage(PersistableMessage& msg) + void stage(intrusive_ptr<PersistableMessage>& msg) { checkExpectation(STAGE); - CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); - msg.setPersistenceId(++id); + CPPUNIT_ASSERT_EQUAL(expectedMsg, msg); + msg->setPersistenceId(++id); } - void appendContent(const PersistableMessage& msg, const string& data) + void appendContent(intrusive_ptr<const PersistableMessage>& msg, const string& data) { checkExpectation(APPEND); - CPPUNIT_ASSERT_EQUAL((const PersistableMessage*) expectedMsg, &msg); + CPPUNIT_ASSERT_EQUAL(static_pointer_cast<const PersistableMessage>(expectedMsg), msg); CPPUNIT_ASSERT_EQUAL(expectedData, data); } diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp index bcf422e706..1451fb65b6 100644 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ b/qpid/cpp/src/tests/TxAckTest.cpp @@ -39,11 +39,11 @@ class TxAckTest : public CppUnit::TestCase class TestMessageStore : public NullMessageStore { public: - vector<PersistableMessage*> dequeued; + vector<intrusive_ptr<PersistableMessage> > dequeued; - void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/) + void dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& /*queue*/) { - dequeued.push_back(&msg); + dequeued.push_back(msg); } TestMessageStore() : NullMessageStore() {} @@ -97,7 +97,7 @@ public: CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; for (int i = 0; i < 7; i++) { - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]); + CPPUNIT_ASSERT_EQUAL(static_pointer_cast<PersistableMessage>(messages[dequeued[i]]), store.dequeued[i]); } } diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index b969598f1d..c9da9762ec 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -36,17 +36,17 @@ using namespace qpid::framing; class TxPublishTest : public CppUnit::TestCase { - typedef std::pair<string, PersistableMessage*> msg_queue_pair; + typedef std::pair<string, intrusive_ptr<PersistableMessage> > msg_queue_pair; class TestMessageStore : public NullMessageStore { public: vector<msg_queue_pair> enqueued; - void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) + void enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { - msg.enqueueComplete(); - enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); + msg->enqueueComplete(); + enqueued.push_back(msg_queue_pair(queue.getName(), msg)); } //dont care about any of the other methods: @@ -81,16 +81,15 @@ public: void testPrepare() { + intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); //ensure messages are enqueued in store op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[0].second); CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second); - CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg.get())->isEnqueueComplete()); - - + CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[1].second); + CPPUNIT_ASSERT_EQUAL( true, ( static_pointer_cast<PersistableMessage>(msg))->isEnqueueComplete()); } void testCommit() @@ -101,7 +100,7 @@ public: CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount()); intrusive_ptr<Message> msg_dequeue = queue1->dequeue().payload; - CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete()); + CPPUNIT_ASSERT_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete()); CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue); CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount()); |