diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-23 00:29:32 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-23 00:29:32 +0000 |
commit | 4274056d0323499ba148ec0ed10770d2758b9ae7 (patch) | |
tree | d7e69f0b6c019aedf891a0ebe5017365bae4a323 /cpp/src | |
parent | dd56b89311e810bbfd3b002e026ad4ab5f79b71e (diff) | |
download | qpid-python-4274056d0323499ba148ec0ed10770d2758b9ae7.tar.gz |
- flush async IO if present on sync for 0-10
- notify, for ack from sync for 0-10
- use of raw pointer, to avoid recursive fre
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@587332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IncomingExecutionContext.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 36 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableQueue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 5 |
5 files changed, 46 insertions, 7 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 784f2db227..02921fbd08 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -144,6 +144,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ + qpid/broker/PersistableMessage.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -199,6 +200,7 @@ libqpidclient_la_SOURCES = \ qpid/client/Connection.cpp \ qpid/client/Channel.cpp \ qpid/client/Exchange.cpp \ + qpid/broker/PersistableMessage.cpp \ qpid/client/Queue.cpp \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.cpp b/cpp/src/qpid/broker/IncomingExecutionContext.cpp index 7cf1179fcb..4747c71033 100644 --- a/cpp/src/qpid/broker/IncomingExecutionContext.cpp +++ b/cpp/src/qpid/broker/IncomingExecutionContext.cpp @@ -112,6 +112,10 @@ SequenceNumberSet IncomingExecutionContext::getRange() void IncomingExecutionContext::wait() { check(); + // for IO flush on the store + for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); i++) { + (*i)->flush(); + } incomplete.front()->waitForEnqueueComplete(); flush(); } diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index e932fafeae..d8bcc70a30 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -23,14 +23,18 @@ */ #include <string> +#include <list> #include <boost/shared_ptr.hpp> #include "Persistable.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Monitor.h" +#include "PersistableQueue.h" namespace qpid { namespace broker { +class MessageStore; + /** * The interface messages must expose to the MessageStore in order to * be persistable. @@ -39,7 +43,8 @@ class PersistableMessage : public Persistable { sys::Monitor asyncEnqueueLock; sys::Monitor asyncDequeueLock; - + sys::Mutex storeLock; + /** * Tracks the number of outstanding asynchronous enqueue * operations. When the message is enqueued asynchronously the @@ -57,7 +62,10 @@ class PersistableMessage : public Persistable * dequeues. */ int asyncDequeueCounter; - +protected: + typedef std::list<PersistableQueue*> syncList; + syncList synclist; + MessageStore* store; public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; @@ -70,8 +78,11 @@ public: PersistableMessage(): asyncEnqueueCounter(0), - asyncDequeueCounter(0) + asyncDequeueCounter(0), + store(0) {} + + void flush(); inline void waitForEnqueueComplete() { sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); @@ -94,6 +105,15 @@ public: } } + inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + synclist.push_back(queue); + } + enqueueAsync(); + } + inline void enqueueAsync() { sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); asyncEnqueueCounter++; @@ -105,6 +125,7 @@ public: } inline void dequeueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); if (asyncDequeueCounter > 0) { if (--asyncDequeueCounter == 0) { @@ -120,6 +141,15 @@ public: } } + inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + synclist.push_back(queue); + } + dequeueAsync(); + } + inline void dequeueAsync() { sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); asyncDequeueCounter++; diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h index 951c93fb86..2a352b3e9b 100644 --- a/cpp/src/qpid/broker/PersistableQueue.h +++ b/cpp/src/qpid/broker/PersistableQueue.h @@ -24,6 +24,7 @@ #include <string> #include "Persistable.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -49,16 +50,17 @@ public: class PersistableQueue : public Persistable { public: + typedef boost::shared_ptr<PersistableQueue> shared_ptr; virtual const std::string& getName() const = 0; virtual ~PersistableQueue() { if (externalQueueStore) - delete externalQueueStore; + delete externalQueueStore; }; inline void setExternalQueueStore(ExternalQueueStore* inst){ if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + delete externalQueueStore; externalQueueStore = inst; }; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 456e055c74..b72cbc5721 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -347,7 +347,8 @@ bool Queue::canAutoDelete() const{ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg) { if (msg->isPersistent() && store) { - msg->enqueueAsync(); //increment to async counter -- for message sent to more than one queue +std::cout << "-------------- enqueue ------------" << std::endl << std::flush; + msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->enqueue(ctxt, *msg.get(), *this); return true; } @@ -358,7 +359,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg) bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg) { if (msg->isPersistent() && store) { - msg->dequeueAsync(); //increment to async counter -- for message sent to more than one queue + msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->dequeue(ctxt, *msg.get(), *this); return true; } |