diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-08-15 18:13:02 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-08-15 18:13:02 +0000 |
commit | 7c966bd1fcb801e14e001237096470b9c7e87f1f (patch) | |
tree | 36409d01a360e3150a587fec1b9a30bcb5e8d89b /cpp/src | |
parent | 3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (diff) | |
download | qpid-python-7c966bd1fcb801e14e001237096470b9c7e87f1f.tar.gz |
async IO for broker store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableQueue.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 9 |
10 files changed, 122 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 3d4d6b83be..d67575103f 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -56,6 +56,14 @@ Queue::Queue(const string& _name, bool _autodelete, Queue::~Queue(){} +void Queue::notifyDurableIOComplete() +{ + // signal SemanticHander to ack completed dequeues + // then dispatch to ack... + serializer.execute(dispatchCallback); +} + + void Queue::deliver(Message::shared_ptr& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { @@ -63,11 +71,20 @@ void Queue::deliver(Message::shared_ptr& msg){ alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } else { - enqueue(0, msg); - process(msg); + + + // if no store then mark as enqueued + if (!enqueue(0, msg)){ + push(msg); + msg->enqueueComplete(); + }else { + push(msg); + } + serializer.execute(dispatchCallback); } } + void Queue::recover(Message::shared_ptr& msg){ push(msg); if (store && msg->expectedContentSize() != msg->encodedContentSize()) { @@ -127,6 +144,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){ void Queue::dispatch(){ + Message::shared_ptr msg; while(true){ { @@ -134,7 +152,7 @@ void Queue::dispatch(){ if (messages.empty()) break; msg = messages.front(); } - if( dispatch(msg) ){ + if( msg->isEnqueueComplete() && dispatch(msg) ){ pop(); }else break; @@ -215,20 +233,27 @@ bool Queue::canAutoDelete() const{ return autodelete && consumers.size() == 0; } -void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) +// return true if store exists, +bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { store->enqueue(ctxt, *msg.get(), *this); + return true; } + return false; } -void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) +// return true if store exists, +bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { store->dequeue(ctxt, *msg.get(), *this); + return true; } + return false; } + namespace { const std::string qpidMaxSize("qpid.max_size"); diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 4214b4b03f..857a7adfc2 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -87,6 +87,12 @@ namespace qpid { */ void dispatch(); + protected: + /** + * Call back from store + */ + virtual void notifyDurableIOComplete(); + public: typedef boost::shared_ptr<Queue> shared_ptr; @@ -143,11 +149,11 @@ namespace qpid { bool canAutoDelete() const; - void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); + bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); /** * dequeue from store (only done once messages is acknowledged) */ - void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg); + bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg); /** * dequeues from memory only */ diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index cf7dae9888..36a7f8824e 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -104,7 +104,10 @@ public: /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given - * message is on the given queue. + * message is on the given queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. * * @param msg the message to enqueue * @param queue the name of the queue onto which it is to be enqueued @@ -113,18 +116,34 @@ public: * place or null for 'local' transactions */ virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message * if it is no longer on any other queue. + * + * Note: that this is async so the return of the function does + * not mean the opperation is complete. * * @param msg the message to dequeue - * @param queue the name of th queue from which it is to be dequeued + * @param queue the name of the queue from which it is to be dequeued * @param xid (a pointer to) an identifier of the * distributed transaction in which the operation takes * place or null for 'local' transactions */ virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + + + /** + * Returns the number of outstanding AIO's for a given queue + * + * If 0, than all the enqueue / dequeues have been stored + * to disk + * + * @param queue the name of the queue to check for outstanding AIO + */ + virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue) = 0; + virtual ~MessageStore(){} }; diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 0457643b75..ba37852537 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -95,6 +95,11 @@ void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& m store->dequeue(ctxt, msg, queue); } +u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue) +{ + return store->outstandingQueueAIO(queue); +} + std::auto_ptr<TransactionContext> MessageStoreModule::begin() { return store->begin(); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 078d2c1fdf..0da12a1a75 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -59,8 +59,10 @@ public: void destroy(PersistableMessage& msg); void appendContent(PersistableMessage& msg, const std::string& data); void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + u_int32_t outstandingQueueAIO(const PersistableQueue& queue); ~MessageStoreModule(){} }; diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 35a2a45c1f..13f7dd3b9a 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -97,16 +97,23 @@ void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint3 QPID_LOG(info, "Can't load content. Persistence not enabled."); } -void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue) +void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) { + msg.enqueueComplete(); QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue) +void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) { + msg.dequeueComplete(); QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled."); } +u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& ) +{ + return 0; +} + std::auto_ptr<TransactionContext> NullMessageStore::begin() { return std::auto_ptr<TransactionContext>(); diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index e6188b43ce..0d5a5b55f9 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -62,6 +62,7 @@ public: virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); ~NullMessageStore(){} }; diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 11a12d68e1..72457c408f 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -36,6 +36,23 @@ namespace broker { */ class PersistableMessage : public Persistable { + + + /** + * Needs to be set false on Message construction, then + * set once the broker has taken responsibility for the + * message. For transient, once enqueued, for durable, once + * stored. + */ + bool enqueueCompleted; + /** + * Needs to be set false on Message construction, then + * set once the dequeueis complete, it gets set + * For transient, once dequeued, for durable, once + * dequeue record has been stored. + */ + bool dequeueCompleted; + public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; @@ -45,6 +62,15 @@ public: virtual uint32_t encodedHeaderSize() const = 0; virtual ~PersistableMessage() {}; + PersistableMessage(): + enqueueCompleted(false), + dequeueCompleted(false){}; + + inline bool isEnqueueComplete() {return enqueueCompleted;}; + inline void enqueueComplete() {enqueueCompleted = true;}; + inline bool isDequeueComplete() {return dequeueCompleted;}; + inline void dequeueComplete() {dequeueCompleted = true;}; + }; }} diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h index 5dd91dde9b..4ba08c5998 100644 --- a/cpp/src/qpid/broker/PersistableQueue.h +++ b/cpp/src/qpid/broker/PersistableQueue.h @@ -35,8 +35,20 @@ namespace broker { class PersistableQueue : public Persistable { public: + virtual const std::string& getName() const = 0; virtual ~PersistableQueue() {}; + +protected: + /** + * call back for the store to signal AIO writes have + * completed (enqueue/dequeue etc) + * + * Note: DO NOT do work on this callback, if you block + * this callback you will block the store. + */ + virtual void notifyDurableIOComplete() = 0; + }; }} diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index db02673b1f..ac246a3bfe 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -51,7 +51,14 @@ TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg : ctxt(_ctxt), msg(_msg){} void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ - queue->enqueue(ctxt, msg); + if (!queue->enqueue(ctxt, msg)){ + /** + * if not store then mark message for ack and deleivery once + * commit happens, as async IO will never set it when no store + * exists + */ + msg->enqueueComplete(); + } } TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} |