summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-10-23 00:29:32 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-10-23 00:29:32 +0000
commit4274056d0323499ba148ec0ed10770d2758b9ae7 (patch)
treed7e69f0b6c019aedf891a0ebe5017365bae4a323 /cpp/src
parentdd56b89311e810bbfd3b002e026ad4ab5f79b71e (diff)
downloadqpid-python-4274056d0323499ba148ec0ed10770d2758b9ae7.tar.gz
- flush async IO if present on sync for 0-10
- notify, for ack from sync for 0-10 - use of raw pointer, to avoid recursive fre git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@587332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/IncomingExecutionContext.cpp4
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h36
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h6
-rw-r--r--cpp/src/qpid/broker/Queue.cpp5
5 files changed, 46 insertions, 7 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 784f2db227..02921fbd08 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -144,6 +144,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
+ qpid/broker/PersistableMessage.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -199,6 +200,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Connection.cpp \
qpid/client/Channel.cpp \
qpid/client/Exchange.cpp \
+ qpid/broker/PersistableMessage.cpp \
qpid/client/Queue.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.cpp b/cpp/src/qpid/broker/IncomingExecutionContext.cpp
index 7cf1179fcb..4747c71033 100644
--- a/cpp/src/qpid/broker/IncomingExecutionContext.cpp
+++ b/cpp/src/qpid/broker/IncomingExecutionContext.cpp
@@ -112,6 +112,10 @@ SequenceNumberSet IncomingExecutionContext::getRange()
void IncomingExecutionContext::wait()
{
check();
+ // for IO flush on the store
+ for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); i++) {
+ (*i)->flush();
+ }
incomplete.front()->waitForEnqueueComplete();
flush();
}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index e932fafeae..d8bcc70a30 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -23,14 +23,18 @@
*/
#include <string>
+#include <list>
#include <boost/shared_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Monitor.h"
+#include "PersistableQueue.h"
namespace qpid {
namespace broker {
+class MessageStore;
+
/**
* The interface messages must expose to the MessageStore in order to
* be persistable.
@@ -39,7 +43,8 @@ class PersistableMessage : public Persistable
{
sys::Monitor asyncEnqueueLock;
sys::Monitor asyncDequeueLock;
-
+ sys::Mutex storeLock;
+
/**
* Tracks the number of outstanding asynchronous enqueue
* operations. When the message is enqueued asynchronously the
@@ -57,7 +62,10 @@ class PersistableMessage : public Persistable
* dequeues.
*/
int asyncDequeueCounter;
-
+protected:
+ typedef std::list<PersistableQueue*> syncList;
+ syncList synclist;
+ MessageStore* store;
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -70,8 +78,11 @@ public:
PersistableMessage():
asyncEnqueueCounter(0),
- asyncDequeueCounter(0)
+ asyncDequeueCounter(0),
+ store(0)
{}
+
+ void flush();
inline void waitForEnqueueComplete() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -94,6 +105,15 @@ public:
}
}
+ inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ store = _store;
+ synclist.push_back(queue);
+ }
+ enqueueAsync();
+ }
+
inline void enqueueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
asyncEnqueueCounter++;
@@ -105,6 +125,7 @@ public:
}
inline void dequeueComplete() {
+
sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
if (asyncDequeueCounter > 0) {
if (--asyncDequeueCounter == 0) {
@@ -120,6 +141,15 @@ public:
}
}
+ inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ store = _store;
+ synclist.push_back(queue);
+ }
+ dequeueAsync();
+ }
+
inline void dequeueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
asyncDequeueCounter++;
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index 951c93fb86..2a352b3e9b 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -24,6 +24,7 @@
#include <string>
#include "Persistable.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -49,16 +50,17 @@ public:
class PersistableQueue : public Persistable
{
public:
+ typedef boost::shared_ptr<PersistableQueue> shared_ptr;
virtual const std::string& getName() const = 0;
virtual ~PersistableQueue() {
if (externalQueueStore)
- delete externalQueueStore;
+ delete externalQueueStore;
};
inline void setExternalQueueStore(ExternalQueueStore* inst){
if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ delete externalQueueStore;
externalQueueStore = inst;
};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 456e055c74..b72cbc5721 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -347,7 +347,8 @@ bool Queue::canAutoDelete() const{
bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
{
if (msg->isPersistent() && store) {
- msg->enqueueAsync(); //increment to async counter -- for message sent to more than one queue
+std::cout << "-------------- enqueue ------------" << std::endl << std::flush;
+ msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->enqueue(ctxt, *msg.get(), *this);
return true;
}
@@ -358,7 +359,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
{
if (msg->isPersistent() && store) {
- msg->dequeueAsync(); //increment to async counter -- for message sent to more than one queue
+ msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->dequeue(ctxt, *msg.get(), *this);
return true;
}