diff options
author | Gordon Sim <gsim@apache.org> | 2006-12-06 17:51:42 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-12-06 17:51:42 +0000 |
commit | f2d0f0cfbfebb082e92f28b8d1b2987a0f20acf6 (patch) | |
tree | d71e0a7854dcfbef75040ee426b53f6369e0cc7a | |
parent | f8c692d37104a95ad245402ffe0d7d6b7c4ea816 (diff) | |
download | qpid-python-f2d0f0cfbfebb082e92f28b8d1b2987a0f20acf6.tar.gz |
Allow non-durable messages to be lazy-loaded. Cleanup of lazy-loaded messages
that are never enqueued.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@483165 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 7 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerQueue.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/Content.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/InMemoryContent.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/InMemoryContent.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/LazyLoadedContent.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/broker/LazyLoadedContent.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/MessageBuilder.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/broker/MessageStore.h | 12 | ||||
-rw-r--r-- | cpp/lib/broker/MessageStoreModule.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/broker/MessageStoreModule.h | 8 | ||||
-rw-r--r-- | cpp/lib/broker/NullMessageStore.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/broker/NullMessageStore.h | 8 | ||||
-rw-r--r-- | cpp/tests/MessageBuilderTest.cpp | 72 | ||||
-rw-r--r-- | cpp/tests/TxAckTest.cpp | 18 | ||||
-rw-r--r-- | cpp/tests/TxPublishTest.cpp | 12 |
16 files changed, 100 insertions, 71 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index d30cd12bc3..598de2d590 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -53,7 +53,9 @@ Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} -Message::~Message(){} +Message::~Message(){ + if (content.get()) content->destroy(); +} void Message::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; @@ -205,6 +207,9 @@ u_int64_t Message::expectedContentSize() void Message::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); + if (!isPersistent() && persistenceId == 0) { + store->stage(this); + } if (!content.get() || content->size() > 0) { //set content to lazy loading mode (but only if there is stored content): diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index b0e1f20b01..a8c5343ca3 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -189,14 +189,14 @@ bool Queue::canAutoDelete() const{ void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) { if (msg->isPersistent() && store) { - store->enqueue(ctxt, msg, *this, xid); + store->enqueue(ctxt, msg.get(), *this, xid); } } void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) { if (msg->isPersistent() && store) { - store->dequeue(ctxt, msg, *this, xid); + store->dequeue(ctxt, msg.get(), *this, xid); } } diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h index ed425c6735..b5712c35ed 100644 --- a/cpp/lib/broker/Content.h +++ b/cpp/lib/broker/Content.h @@ -33,6 +33,7 @@ namespace qpid { virtual u_int32_t size() = 0; virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual void destroy() = 0; virtual ~Content(){} }; } diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp index 9d40877c86..7205eb3de0 100644 --- a/cpp/lib/broker/InMemoryContent.cpp +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -67,3 +67,7 @@ void InMemoryContent::encode(Buffer& buffer) (*i)->encode(buffer); } } + +void InMemoryContent::destroy() +{ +} diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h index c54d15447d..79c7cf670b 100644 --- a/cpp/lib/broker/InMemoryContent.h +++ b/cpp/lib/broker/InMemoryContent.h @@ -36,6 +36,7 @@ namespace qpid { u_int32_t size(); void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); + void destroy(); ~InMemoryContent(){} }; } diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp index c0da48efda..51aa6c590b 100644 --- a/cpp/lib/broker/LazyLoadedContent.cpp +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -56,3 +56,8 @@ void LazyLoadedContent::encode(Buffer&) { //do nothing as all content is written as soon as it is added } + +void LazyLoadedContent::destroy() +{ + store->destroy(msg); +} diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h index fdb752f117..68e08c7c3f 100644 --- a/cpp/lib/broker/LazyLoadedContent.h +++ b/cpp/lib/broker/LazyLoadedContent.h @@ -36,6 +36,7 @@ namespace qpid { u_int32_t size(); void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); + void destroy(); ~LazyLoadedContent(){} }; } diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp index 7f009d5cdf..41bf812d2d 100644 --- a/cpp/lib/broker/MessageBuilder.cpp +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -53,7 +53,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ } message->setHeader(header); if (stagingThreshold && header->getContentSize() >= stagingThreshold) { - store->stage(message); + store->stage(message.get()); message->releaseContent(store); } else { auto_ptr<Content> content(new InMemoryContent()); diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h index be9172e383..acbff82c35 100644 --- a/cpp/lib/broker/MessageStore.h +++ b/cpp/lib/broker/MessageStore.h @@ -39,7 +39,9 @@ namespace qpid { u_int64_t stagingThreshold; }; /** - * An abstraction of the persistent storage for messages. + * An abstraction of the persistent storage for messages. (In + * all methods, any pointers/references to queues or messages + * are valid only for the duration of the call). */ class MessageStore : public TransactionalStore{ public: @@ -66,7 +68,7 @@ namespace qpid { * persistence id will be set on the message which can be * used to load the content or to append to it. */ - virtual void stage(Message::shared_ptr& msg) = 0; + virtual void stage(Message* const msg) = 0; /** * Destroys a previously staged message. This only needs @@ -74,7 +76,7 @@ namespace qpid { * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ - virtual void destroy(Message::shared_ptr& msg) = 0; + virtual void destroy(Message* const msg) = 0; /** * Appends content to a previously staged message @@ -102,7 +104,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0; + virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message @@ -114,7 +116,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0; + virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; /** * Treat all enqueue/dequeues where this xid was specified as being committed. */ diff --git a/cpp/lib/broker/MessageStoreModule.cpp b/cpp/lib/broker/MessageStoreModule.cpp index b3f5d6e63c..1952786014 100644 --- a/cpp/lib/broker/MessageStoreModule.cpp +++ b/cpp/lib/broker/MessageStoreModule.cpp @@ -43,12 +43,12 @@ void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSe store->recover(registry, settings); } -void MessageStoreModule::stage(Message::shared_ptr& msg) +void MessageStoreModule::stage(Message* const msg) { store->stage(msg); } -void MessageStoreModule::destroy(Message::shared_ptr& msg) +void MessageStoreModule::destroy(Message* const msg) { store->destroy(msg); } @@ -63,12 +63,12 @@ void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t store->loadContent(msg, data, offset, length); } -void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) { store->enqueue(ctxt, msg, queue, xid); } -void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) { store->dequeue(ctxt, msg, queue, xid); } diff --git a/cpp/lib/broker/MessageStoreModule.h b/cpp/lib/broker/MessageStoreModule.h index d70aab6d13..fcff71bd35 100644 --- a/cpp/lib/broker/MessageStoreModule.h +++ b/cpp/lib/broker/MessageStoreModule.h @@ -39,12 +39,12 @@ namespace qpid { void create(const Queue& queue, const qpid::framing::FieldTable& settings); void destroy(const Queue& queue); void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - void stage(Message::shared_ptr& msg); - void destroy(Message::shared_ptr& msg); + void stage(Message* const msg); + void destroy(Message* const msg); void appendContent(Message* const msg, const std::string& data); void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); - void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); void committed(const string * const xid); void aborted(const string * const xid); std::auto_ptr<TransactionContext> begin(); diff --git a/cpp/lib/broker/NullMessageStore.cpp b/cpp/lib/broker/NullMessageStore.cpp index 3c29994aac..a8318a4bf7 100644 --- a/cpp/lib/broker/NullMessageStore.cpp +++ b/cpp/lib/broker/NullMessageStore.cpp @@ -45,12 +45,12 @@ void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* con if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } -void NullMessageStore::stage(Message::shared_ptr&) +void NullMessageStore::stage(Message* const) { if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; } -void NullMessageStore::destroy(Message::shared_ptr&) +void NullMessageStore::destroy(Message* const) { if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; } @@ -65,12 +65,12 @@ void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; } -void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) +void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) +void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; } diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h index 61afe36281..913d6b0a8d 100644 --- a/cpp/lib/broker/NullMessageStore.h +++ b/cpp/lib/broker/NullMessageStore.h @@ -38,12 +38,12 @@ namespace qpid { virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings); virtual void destroy(const Queue& queue); virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - virtual void stage(Message::shared_ptr& msg); - virtual void destroy(Message::shared_ptr& msg); + virtual void stage(Message* const msg); + virtual void destroy(Message* const msg); virtual void appendContent(Message* const msg, const std::string& data); virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); - virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); virtual void committed(const string * const xid); virtual void aborted(const string * const xid); virtual std::auto_ptr<TransactionContext> begin(); diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index 3f84142461..88e8318832 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -50,7 +50,7 @@ class MessageBuilderTest : public CppUnit::TestCase public: - void stage(Message::shared_ptr& msg) + void stage(Message* const msg) { if (msg->getPersistenceId() == 0) { header = new Buffer(msg->encodedHeaderSize()); @@ -71,6 +71,11 @@ class MessageBuilderTest : public CppUnit::TestCase } } + void destroy(Message* msg) + { + CPPUNIT_ASSERT(msg->getPersistenceId()); + } + Message::shared_ptr getRestoredMessage() { Message::shared_ptr msg(new Message()); @@ -164,37 +169,42 @@ class MessageBuilderTest : public CppUnit::TestCase } void testStaging(){ - DummyHandler handler; + //store must be the last thing to be destroyed or destructor + //of Message fails (it uses the store to call destroy if lazy + //loaded content is in use) TestMessageStore store(14); - MessageBuilder builder(&handler, &store, 5); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); - properties->setMessageId("MyMessage"); - properties->getHeaders().setString("abc", "xyz"); - - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - - builder.initialise(message); - builder.setHeader(header); - builder.addContent(part1); - builder.addContent(part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - - Message::shared_ptr restored = store.getRestoredMessage(); - CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); - CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), - restored->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize()); + { + DummyHandler handler; + MessageBuilder builder(&handler, &store, 5); + + string data1("abcdefg"); + string data2("hijklmn"); + + Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); + properties->setMessageId("MyMessage"); + properties->getHeaders().setString("abc", "xyz"); + + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + builder.initialise(message); + builder.setHeader(header); + builder.addContent(part1); + builder.addContent(part2); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + + Message::shared_ptr restored = store.getRestoredMessage(); + CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); + CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), + restored->getHeaderProperties()->getHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize()); + } } }; diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 709d45c1ad..6832c5995b 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -37,9 +37,9 @@ class TxAckTest : public CppUnit::TestCase class TestMessageStore : public NullMessageStore { public: - vector<Message::shared_ptr> dequeued; + vector<Message*> dequeued; - void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/) + void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const /*xid*/) { dequeued.push_back(msg); } @@ -86,13 +86,13 @@ public: op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); - CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 - CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2 - CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3 - CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4 - CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5 - CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7 - CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9 + CPPUNIT_ASSERT_EQUAL(messages[0].get(), store.dequeued[0]);//msg 1 + CPPUNIT_ASSERT_EQUAL(messages[1].get(), store.dequeued[1]);//msg 2 + CPPUNIT_ASSERT_EQUAL(messages[2].get(), store.dequeued[2]);//msg 3 + CPPUNIT_ASSERT_EQUAL(messages[3].get(), store.dequeued[3]);//msg 4 + CPPUNIT_ASSERT_EQUAL(messages[4].get(), store.dequeued[4]);//msg 5 + CPPUNIT_ASSERT_EQUAL(messages[6].get(), store.dequeued[5]);//msg 7 + CPPUNIT_ASSERT_EQUAL(messages[8].get(), store.dequeued[6]);//msg 9 } void testCommit() diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 6324e5fb01..d33d84ec6e 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -38,11 +38,11 @@ class TxPublishTest : public CppUnit::TestCase class TestMessageStore : public NullMessageStore { public: - vector< pair<string, Message::shared_ptr> > enqueued; + vector< pair<string, Message*> > enqueued; - void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/) + void enqueue(TransactionContext*, Message* const msg, const Queue& queue, const string * const /*xid*/) { - enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg)); + enqueued.push_back(pair<string, Message*>(queue.getName(),msg)); } //dont care about any of the other methods: @@ -59,7 +59,7 @@ class TxPublishTest : public CppUnit::TestCase TestMessageStore store; Queue::shared_ptr queue1; Queue::shared_ptr queue2; - Message::shared_ptr msg; + Message::shared_ptr const msg; TxPublish op; @@ -82,9 +82,9 @@ public: op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); - CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[0].second); CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); - CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second); + CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[1].second); } void testCommit() |