From 17716fee99670e49a1c3526a44c40d15757d94e3 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 6 Nov 2008 22:40:57 +0000 Subject: Add Message callbacks for async completion. Add unit test for async completion. Add sync parameter to generated session functions, defaults as before but allows greater control. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711998 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 18 ++---- cpp/src/qpid/broker/Broker.h | 2 +- cpp/src/qpid/broker/DtxManager.cpp | 1 - cpp/src/qpid/broker/LinkRegistry.cpp | 1 - cpp/src/qpid/broker/Message.cpp | 27 ++++++-- cpp/src/qpid/broker/Message.h | 27 ++++++-- cpp/src/qpid/broker/PersistableMessage.cpp | 15 +++-- cpp/src/qpid/broker/PersistableMessage.h | 9 ++- cpp/src/qpid/broker/QueueRegistry.cpp | 1 - cpp/src/qpid/sys/BlockingQueue.h | 7 +- cpp/src/tests/AsyncCompletion.cpp | 100 +++++++++++++++++++++++++++++ cpp/src/tests/Makefile.am | 1 + 12 files changed, 173 insertions(+), 36 deletions(-) create mode 100644 cpp/src/tests/AsyncCompletion.cpp (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index be13538ca6..3ba2e70bc2 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -194,16 +194,12 @@ Broker::Broker(const Broker::Options& conf) : (*i)->earlyInitialize(*this); // If no plugin store module registered itself, set up the null store. - if (store == 0) + if (store.get() == 0) setStore (new NullMessageStore (false)); - queues.setStore (store); - dtxManager.setStore (store); - links.setStore (store); - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if (store != 0) { + if (store.get() != 0) { RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); @@ -247,7 +243,7 @@ Broker::Broker(const Broker::Options& conf) : void Broker::declareStandardExchange(const std::string& name, const std::string& type) { - bool storeEnabled = store != NULL; + bool storeEnabled = store.get() != NULL; std::pair status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { store->create(*status.first, framing::FieldTable ()); @@ -269,9 +265,10 @@ boost::intrusive_ptr Broker::create(const Options& opts) void Broker::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); - if (store == 0 && _store != 0) - store = new MessageStoreModule (_store); + store.reset(new MessageStoreModule (_store)); + queues.setStore (store.get()); + dtxManager.setStore (store.get()); + links.setStore (store.get()); } void Broker::run() { @@ -304,7 +301,6 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); finalize(); // Finalize any plugins. - delete store; if (config.auth) SaslAuthenticator::fini(); QPID_LOG(notice, "Shut down"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index bd8cf532d1..c64bfa8a9f 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -112,7 +112,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Options config; management::ManagementAgent::Singleton managementAgentSingleton; ProtocolFactoryMap protocolFactories; - MessageStore* store; + std::auto_ptr store; AclModule* acl; DataDir dataDir; diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 33cff3075e..f4494fccc6 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -167,6 +167,5 @@ void DtxManager::DtxCleanup::fire() void DtxManager::setStore (TransactionalStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 5caf3ec801..960e9f21ba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -180,7 +180,6 @@ void LinkRegistry::destroy(const std::string& host, void LinkRegistry::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d2c5682359..89c647358a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -45,9 +45,10 @@ namespace broker { TransferAdapter Message::TRANSFER; - Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE) {} +Message::Message(const framing::SequenceNumber& id) : + frames(id), persistenceId(0), redelivered(false), loaded(false), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} Message::~Message() { @@ -268,7 +269,7 @@ bool Message::isContentLoaded() const namespace { - const std::string X_QPID_TRACE("x-qpid.trace"); +const std::string X_QPID_TRACE("x-qpid.trace"); } bool Message::isExcluded(const std::vector& excludes) const @@ -341,4 +342,22 @@ void Message::setReplacementMessage(boost::intrusive_ptr msg, const Que replacement[qfor] = msg; } +void Message::allEnqueuesComplete() { + MessageCallback* cb = 0; + { + sys::Mutex::ScopedLock l(lock); + swap(cb, enqueueCallback); + } + if (cb && *cb) (*cb)(*this); +} + +void Message::allDequeuesComplete() { + MessageCallback* cb = 0; + { + sys::Mutex::ScopedLock l(lock); + swap(cb, dequeueCallback); + } + if (cb && *cb) (*cb)(*this); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f7f49f1857..762ec68fe8 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -22,15 +22,15 @@ * */ -#include -#include -#include -#include #include "PersistableMessage.h" #include "MessageAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" +#include "qpid/shared_ptr.h" +#include +#include +#include namespace qpid { @@ -48,6 +48,8 @@ class Queue; class Message : public PersistableMessage { public: + typedef boost::function MessageCallback; + Message(const framing::SequenceNumber& id = framing::SequenceNumber()); ~Message(); @@ -142,7 +144,19 @@ public: boost::intrusive_ptr& getReplacementMessage(const Queue* qfor) const; void setReplacementMessage(boost::intrusive_ptr msg, const Queue* qfor); + /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ + void setEnqueueCompleteCallback(const MessageCallback* cb); + + /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ + void setDequeueCompleteCallback(const MessageCallback& cb); + private: + typedef std::map > Replacement; + + MessageAdapter& getAdapter() const; + void allEnqueuesComplete(); + void allDequeuesComplete(); + mutable sys::Mutex lock; framing::FrameSet frames; mutable boost::shared_ptr exchange; @@ -157,11 +171,10 @@ public: static TransferAdapter TRANSFER; - MessageAdapter& getAdapter() const; - typedef std::map > Replacement; - mutable Replacement replacement; mutable boost::intrusive_ptr empty; + MessageCallback* enqueueCallback; + MessageCallback* dequeueCallback; }; }} diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index b67a669f1d..920dfd6386 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -87,6 +87,7 @@ void PersistableMessage::enqueueComplete() { } } if (notify) { + allEnqueuesComplete(); sys::ScopedLock l(storeLock); if (store) { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { @@ -118,13 +119,17 @@ bool PersistableMessage::isDequeueComplete() { } void PersistableMessage::dequeueComplete() { - - sys::ScopedLock l(asyncDequeueLock); - if (asyncDequeueCounter > 0) { - if (--asyncDequeueCounter == 0) { - asyncDequeueLock.notify(); + bool notify = false; + { + sys::ScopedLock l(asyncDequeueLock); + if (asyncDequeueCounter > 0) { + if (--asyncDequeueCounter == 0) { + notify = true; + asyncDequeueLock.notify(); + } } } + if (notify) allDequeuesComplete(); } void PersistableMessage::waitForDequeueComplete() { diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 6b6e9a7007..59fa2e3d95 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -68,10 +68,15 @@ class PersistableMessage : public Persistable syncList synclist; protected: - MessageStore* store; - + /** Called when all enqueues are complete for this message. */ + virtual void allEnqueuesComplete() = 0; + /** Called when all dequeues are complete for this message. */ + virtual void allDequeuesComplete() = 0; + void setContentReleased(); + MessageStore* store; + public: typedef boost::shared_ptr shared_ptr; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 62d2222595..2447ce5402 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -85,7 +85,6 @@ string QueueRegistry::generateName(){ void QueueRegistry::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index d7e6449d7a..a05a10d811 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -66,10 +66,11 @@ public: return true; } - T pop() { + T pop(Duration timeout=TIME_INFINITE) { T result; - bool ok = pop(result); - assert(ok); (void) ok; // Infinite wait. + bool ok = pop(result, timeout); + if (!ok) + throw Exception("Timed out waiting on a blocking queue"); return result; } diff --git a/cpp/src/tests/AsyncCompletion.cpp b/cpp/src/tests/AsyncCompletion.cpp new file mode 100644 index 0000000000..e33b2dc35d --- /dev/null +++ b/cpp/src/tests/AsyncCompletion.cpp @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "BrokerFixture.h" +#include "qpid/broker/NullMessageStore.h" +#include "qpid/sys/BlockingQueue.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/sys/Time.h" + +using namespace std; +using namespace qpid; +using namespace client; +using namespace framing; + +namespace qpid { namespace broker { +class TransactionContext; +class PersistableQueue; +}} + +using broker::PersistableMessage; +using broker::NullMessageStore; +using broker::TransactionContext; +using broker::PersistableQueue; +using sys::TIME_SEC; +using boost::intrusive_ptr; + +/** @file Unit tests for async completion. + * Using a dummy store, verify that the broker indicates async completion of + * message enqueues at the correct time. + */ + +class AsyncCompletionMessageStore : public NullMessageStore { + public: + sys::BlockingQueue > enqueued; + + AsyncCompletionMessageStore() : NullMessageStore() {} + ~AsyncCompletionMessageStore(){} + + void enqueue(TransactionContext*, + const boost::intrusive_ptr& msg, + const PersistableQueue& ) + { + enqueued.push(msg); + } +}; + +QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite) + +QPID_AUTO_TEST_CASE(testWaitTillComplete) { + AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; + SessionFixture fix; + fix.broker->setStore(store); // Broker will delete store. + AsyncSession s = fix.session; + + static const int count = 3; + + s.queueDeclare("q", arg::durable=true); + Completion transfers[count]; + for (int i = 0; i < count; ++i) { + Message msg(boost::lexical_cast(i), "q"); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + transfers[i] = s.messageTransfer(arg::content=msg); + } + + // Get hold of the broker-side messages. + typedef vector > BrokerMessages; + BrokerMessages enqueued; + for (int j = 0; j < count; ++j) + enqueued.push_back(store->enqueued.pop(TIME_SEC)); + + // Send a sync, make sure it does not complete till all messages are complete. + // In reverse order for fun. + Completion sync = s.executionSync(arg::sync=true); + for (int k = count-1; k >= 0; --k) { + BOOST_CHECK(!transfers[k].isComplete()); // Should not be complete yet. + BOOST_CHECK(!sync.isComplete()); // Should not be complete yet. + enqueued[k]->enqueueComplete(); + } + sync.wait(); // Should complete now, all messages are completed. +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 9f5ca91313..e959934273 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -35,6 +35,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ exception_test.cpp \ RefCounted.cpp \ SessionState.cpp Blob.cpp logging.cpp \ + AsyncCompletion.cpp \ Url.cpp Uuid.cpp \ Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp \ QueueOptionsTest.cpp \ -- cgit v1.2.1