summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-08-15 18:13:02 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-08-15 18:13:02 +0000
commit7c966bd1fcb801e14e001237096470b9c7e87f1f (patch)
tree36409d01a360e3150a587fec1b9a30bcb5e8d89b /cpp/src
parent3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (diff)
downloadqpid-python-7c966bd1fcb801e14e001237096470b9c7e87f1f.tar.gz
async IO for broker store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp35
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h10
-rw-r--r--cpp/src/qpid/broker/MessageStore.h23
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp5
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp11
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h1
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h26
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h12
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp9
10 files changed, 122 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 3d4d6b83be..d67575103f 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -56,6 +56,14 @@ Queue::Queue(const string& _name, bool _autodelete,
Queue::~Queue(){}
+void Queue::notifyDurableIOComplete()
+{
+ // signal SemanticHander to ack completed dequeues
+ // then dispatch to ack...
+ serializer.execute(dispatchCallback);
+}
+
+
void Queue::deliver(Message::shared_ptr& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
@@ -63,11 +71,20 @@ void Queue::deliver(Message::shared_ptr& msg){
alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
}
} else {
- enqueue(0, msg);
- process(msg);
+
+
+ // if no store then mark as enqueued
+ if (!enqueue(0, msg)){
+ push(msg);
+ msg->enqueueComplete();
+ }else {
+ push(msg);
+ }
+ serializer.execute(dispatchCallback);
}
}
+
void Queue::recover(Message::shared_ptr& msg){
push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
@@ -127,6 +144,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){
void Queue::dispatch(){
+
Message::shared_ptr msg;
while(true){
{
@@ -134,7 +152,7 @@ void Queue::dispatch(){
if (messages.empty()) break;
msg = messages.front();
}
- if( dispatch(msg) ){
+ if( msg->isEnqueueComplete() && dispatch(msg) ){
pop();
}else break;
@@ -215,20 +233,27 @@ bool Queue::canAutoDelete() const{
return autodelete && consumers.size() == 0;
}
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists,
+bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->enqueue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists,
+bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->dequeue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
+
namespace
{
const std::string qpidMaxSize("qpid.max_size");
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 4214b4b03f..857a7adfc2 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -87,6 +87,12 @@ namespace qpid {
*/
void dispatch();
+ protected:
+ /**
+ * Call back from store
+ */
+ virtual void notifyDurableIOComplete();
+
public:
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -143,11 +149,11 @@ namespace qpid {
bool canAutoDelete() const;
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
+ bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
+ bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeues from memory only
*/
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index cf7dae9888..36a7f8824e 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -104,7 +104,10 @@ public:
/**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
- * message is on the given queue.
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
*
* @param msg the message to enqueue
* @param queue the name of the queue onto which it is to be enqueued
@@ -113,18 +116,34 @@ public:
* place or null for 'local' transactions
*/
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
* if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
*
* @param msg the message to dequeue
- * @param queue the name of th queue from which it is to be dequeued
+ * @param queue the name of the queue from which it is to be dequeued
* @param xid (a pointer to) an identifier of the
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+
virtual ~MessageStore(){}
};
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 0457643b75..ba37852537 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -95,6 +95,11 @@ void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& m
store->dequeue(ctxt, msg, queue);
}
+u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue)
+{
+ return store->outstandingQueueAIO(queue);
+}
+
std::auto_ptr<TransactionContext> MessageStoreModule::begin()
{
return store->begin();
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 078d2c1fdf..0da12a1a75 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -59,8 +59,10 @@ public:
void destroy(PersistableMessage& msg);
void appendContent(PersistableMessage& msg, const std::string& data);
void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+
void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
~MessageStoreModule(){}
};
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 35a2a45c1f..13f7dd3b9a 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -97,16 +97,23 @@ void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint3
QPID_LOG(info, "Can't load content. Persistence not enabled.");
}
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
+void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
{
+ msg.enqueueComplete();
QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
{
+ msg.dequeueComplete();
QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled.");
}
+u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
+{
+ return 0;
+}
+
std::auto_ptr<TransactionContext> NullMessageStore::begin()
{
return std::auto_ptr<TransactionContext>();
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index e6188b43ce..0d5a5b55f9 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -62,6 +62,7 @@ public:
virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+ virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
~NullMessageStore(){}
};
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 11a12d68e1..72457c408f 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -36,6 +36,23 @@ namespace broker {
*/
class PersistableMessage : public Persistable
{
+
+
+ /**
+ * Needs to be set false on Message construction, then
+ * set once the broker has taken responsibility for the
+ * message. For transient, once enqueued, for durable, once
+ * stored.
+ */
+ bool enqueueCompleted;
+ /**
+ * Needs to be set false on Message construction, then
+ * set once the dequeueis complete, it gets set
+ * For transient, once dequeued, for durable, once
+ * dequeue record has been stored.
+ */
+ bool dequeueCompleted;
+
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -45,6 +62,15 @@ public:
virtual uint32_t encodedHeaderSize() const = 0;
virtual ~PersistableMessage() {};
+ PersistableMessage():
+ enqueueCompleted(false),
+ dequeueCompleted(false){};
+
+ inline bool isEnqueueComplete() {return enqueueCompleted;};
+ inline void enqueueComplete() {enqueueCompleted = true;};
+ inline bool isDequeueComplete() {return dequeueCompleted;};
+ inline void dequeueComplete() {dequeueCompleted = true;};
+
};
}}
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index 5dd91dde9b..4ba08c5998 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -35,8 +35,20 @@ namespace broker {
class PersistableQueue : public Persistable
{
public:
+
virtual const std::string& getName() const = 0;
virtual ~PersistableQueue() {};
+
+protected:
+ /**
+ * call back for the store to signal AIO writes have
+ * completed (enqueue/dequeue etc)
+ *
+ * Note: DO NOT do work on this callback, if you block
+ * this callback you will block the store.
+ */
+ virtual void notifyDurableIOComplete() = 0;
+
};
}}
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index db02673b1f..ac246a3bfe 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -51,7 +51,14 @@ TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg
: ctxt(_ctxt), msg(_msg){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(ctxt, msg);
+ if (!queue->enqueue(ctxt, msg)){
+ /**
+ * if not store then mark message for ack and deleivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
+ msg->enqueueComplete();
+ }
}
TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}