diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-08-15 18:13:02 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-08-15 18:13:02 +0000 |
commit | 7c966bd1fcb801e14e001237096470b9c7e87f1f (patch) | |
tree | 36409d01a360e3150a587fec1b9a30bcb5e8d89b /cpp/src/qpid/broker/BrokerQueue.cpp | |
parent | 3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (diff) | |
download | qpid-python-7c966bd1fcb801e14e001237096470b9c7e87f1f.tar.gz |
async IO for broker store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 35 |
1 files changed, 30 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 3d4d6b83be..d67575103f 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -56,6 +56,14 @@ Queue::Queue(const string& _name, bool _autodelete, Queue::~Queue(){} +void Queue::notifyDurableIOComplete() +{ + // signal SemanticHander to ack completed dequeues + // then dispatch to ack... + serializer.execute(dispatchCallback); +} + + void Queue::deliver(Message::shared_ptr& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { @@ -63,11 +71,20 @@ void Queue::deliver(Message::shared_ptr& msg){ alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } else { - enqueue(0, msg); - process(msg); + + + // if no store then mark as enqueued + if (!enqueue(0, msg)){ + push(msg); + msg->enqueueComplete(); + }else { + push(msg); + } + serializer.execute(dispatchCallback); } } + void Queue::recover(Message::shared_ptr& msg){ push(msg); if (store && msg->expectedContentSize() != msg->encodedContentSize()) { @@ -127,6 +144,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){ void Queue::dispatch(){ + Message::shared_ptr msg; while(true){ { @@ -134,7 +152,7 @@ void Queue::dispatch(){ if (messages.empty()) break; msg = messages.front(); } - if( dispatch(msg) ){ + if( msg->isEnqueueComplete() && dispatch(msg) ){ pop(); }else break; @@ -215,20 +233,27 @@ bool Queue::canAutoDelete() const{ return autodelete && consumers.size() == 0; } -void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) +// return true if store exists, +bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { store->enqueue(ctxt, *msg.get(), *this); + return true; } + return false; } -void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) +// return true if store exists, +bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { store->dequeue(ctxt, *msg.get(), *this); + return true; } + return false; } + namespace { const std::string qpidMaxSize("qpid.max_size"); |