diff options
author | Alan Conway <aconway@apache.org> | 2008-11-06 22:40:57 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-06 22:40:57 +0000 |
commit | 17716fee99670e49a1c3526a44c40d15757d94e3 (patch) | |
tree | a61a44820b9ee72d920a65f3d5683585b652fa76 /cpp/src/qpid | |
parent | 2de0473cf8c64e06396c5f5e6a0cf8b5e982514e (diff) | |
download | qpid-python-17716fee99670e49a1c3526a44c40d15757d94e3.tar.gz |
Add Message callbacks for async completion.
Add unit test for async completion.
Add sync parameter to generated session functions, defaults as before but allows greater control.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711998 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 7 |
10 files changed, 72 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index be13538ca6..3ba2e70bc2 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -194,16 +194,12 @@ Broker::Broker(const Broker::Options& conf) : (*i)->earlyInitialize(*this); // If no plugin store module registered itself, set up the null store. - if (store == 0) + if (store.get() == 0) setStore (new NullMessageStore (false)); - queues.setStore (store); - dtxManager.setStore (store); - links.setStore (store); - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if (store != 0) { + if (store.get() != 0) { RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); @@ -247,7 +243,7 @@ Broker::Broker(const Broker::Options& conf) : void Broker::declareStandardExchange(const std::string& name, const std::string& type) { - bool storeEnabled = store != NULL; + bool storeEnabled = store.get() != NULL; std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { store->create(*status.first, framing::FieldTable ()); @@ -269,9 +265,10 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) void Broker::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); - if (store == 0 && _store != 0) - store = new MessageStoreModule (_store); + store.reset(new MessageStoreModule (_store)); + queues.setStore (store.get()); + dtxManager.setStore (store.get()); + links.setStore (store.get()); } void Broker::run() { @@ -304,7 +301,6 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); finalize(); // Finalize any plugins. - delete store; if (config.auth) SaslAuthenticator::fini(); QPID_LOG(notice, "Shut down"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index bd8cf532d1..c64bfa8a9f 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -112,7 +112,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Options config; management::ManagementAgent::Singleton managementAgentSingleton; ProtocolFactoryMap protocolFactories; - MessageStore* store; + std::auto_ptr<MessageStore> store; AclModule* acl; DataDir dataDir; diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 33cff3075e..f4494fccc6 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -167,6 +167,5 @@ void DtxManager::DtxCleanup::fire() void DtxManager::setStore (TransactionalStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 5caf3ec801..960e9f21ba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -180,7 +180,6 @@ void LinkRegistry::destroy(const std::string& host, void LinkRegistry::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d2c5682359..89c647358a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -45,9 +45,10 @@ namespace broker { TransferAdapter Message::TRANSFER; - Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE) {} +Message::Message(const framing::SequenceNumber& id) : + frames(id), persistenceId(0), redelivered(false), loaded(false), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} Message::~Message() { @@ -268,7 +269,7 @@ bool Message::isContentLoaded() const namespace { - const std::string X_QPID_TRACE("x-qpid.trace"); +const std::string X_QPID_TRACE("x-qpid.trace"); } bool Message::isExcluded(const std::vector<std::string>& excludes) const @@ -341,4 +342,22 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que replacement[qfor] = msg; } +void Message::allEnqueuesComplete() { + MessageCallback* cb = 0; + { + sys::Mutex::ScopedLock l(lock); + swap(cb, enqueueCallback); + } + if (cb && *cb) (*cb)(*this); +} + +void Message::allDequeuesComplete() { + MessageCallback* cb = 0; + { + sys::Mutex::ScopedLock l(lock); + swap(cb, dequeueCallback); + } + if (cb && *cb) (*cb)(*this); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f7f49f1857..762ec68fe8 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -22,15 +22,15 @@ * */ -#include <string> -#include <vector> -#include <boost/shared_ptr.hpp> -#include <boost/variant.hpp> #include "PersistableMessage.h" #include "MessageAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" +#include "qpid/shared_ptr.h" +#include <boost/function.hpp> +#include <string> +#include <vector> namespace qpid { @@ -48,6 +48,8 @@ class Queue; class Message : public PersistableMessage { public: + typedef boost::function<void (Message&)> MessageCallback; + Message(const framing::SequenceNumber& id = framing::SequenceNumber()); ~Message(); @@ -142,7 +144,19 @@ public: boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); + /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ + void setEnqueueCompleteCallback(const MessageCallback* cb); + + /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ + void setDequeueCompleteCallback(const MessageCallback& cb); + private: + typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; + + MessageAdapter& getAdapter() const; + void allEnqueuesComplete(); + void allDequeuesComplete(); + mutable sys::Mutex lock; framing::FrameSet frames; mutable boost::shared_ptr<Exchange> exchange; @@ -157,11 +171,10 @@ public: static TransferAdapter TRANSFER; - MessageAdapter& getAdapter() const; - typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; - mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; + MessageCallback* enqueueCallback; + MessageCallback* dequeueCallback; }; }} diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index b67a669f1d..920dfd6386 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -87,6 +87,7 @@ void PersistableMessage::enqueueComplete() { } } if (notify) { + allEnqueuesComplete(); sys::ScopedLock<sys::Mutex> l(storeLock); if (store) { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { @@ -118,13 +119,17 @@ bool PersistableMessage::isDequeueComplete() { } void PersistableMessage::dequeueComplete() { - - sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); - if (asyncDequeueCounter > 0) { - if (--asyncDequeueCounter == 0) { - asyncDequeueLock.notify(); + bool notify = false; + { + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + if (asyncDequeueCounter > 0) { + if (--asyncDequeueCounter == 0) { + notify = true; + asyncDequeueLock.notify(); + } } } + if (notify) allDequeuesComplete(); } void PersistableMessage::waitForDequeueComplete() { diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 6b6e9a7007..59fa2e3d95 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -68,10 +68,15 @@ class PersistableMessage : public Persistable syncList synclist; protected: - MessageStore* store; - + /** Called when all enqueues are complete for this message. */ + virtual void allEnqueuesComplete() = 0; + /** Called when all dequeues are complete for this message. */ + virtual void allDequeuesComplete() = 0; + void setContentReleased(); + MessageStore* store; + public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 62d2222595..2447ce5402 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -85,7 +85,6 @@ string QueueRegistry::generateName(){ void QueueRegistry::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); store = _store; } diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index d7e6449d7a..a05a10d811 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -66,10 +66,11 @@ public: return true; } - T pop() { + T pop(Duration timeout=TIME_INFINITE) { T result; - bool ok = pop(result); - assert(ok); (void) ok; // Infinite wait. + bool ok = pop(result, timeout); + if (!ok) + throw Exception("Timed out waiting on a blocking queue"); return result; } |