diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 92 |
1 files changed, 79 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40574ded3b..f595b81724 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageDistributor.h" #include "qpid/broker/FifoDistributor.h" //#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/QueueRegistry.h" //TODO: get rid of this @@ -818,9 +819,17 @@ bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg) boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); assert(pmsg); // pmsg->enqueueAsync(shared_from_this(), store); - pmsg->enqueueAsync(shared_from_this(), asyncStore); // store->enqueue(ctxt, pmsg, *this); - // TODO - kpvdr: async enqueue here + pmsg->enqueueAsync(shared_from_this(), asyncStore); + pmsg->createMessageHandle(asyncStore); + EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore); + TxnHandle th; // TODO: kpvdr: Impement transactions + boost::shared_ptr<QueueAsyncContext> qac( + new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), + pmsg, + &enqueueComplete, + &broker->getAsyncResultQueue())); + asyncStore->submitEnqueue(eh, th, qac); } return true; } @@ -893,7 +902,15 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) // if (store && pmsg) { if (asyncStore && pmsg) { // store->dequeue(ctxt, pmsg, *this); - // TODO: kpvdr: async dequeue here + pmsg->dequeueAsync(shared_from_this(), asyncStore); + TxnHandle th; // TODO: kpvdr: Impement transactions + EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle); + boost::shared_ptr<QueueAsyncContext> qac( + new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), + pmsg, + &dequeueComplete, + &broker->getAsyncResultQueue())); + asyncStore->submitDequeue(eh, th, qac); } } @@ -908,6 +925,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(contentSize); } + if (brokerMgmtObject) { _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); bStats->msgTxnDequeues += 1; @@ -997,7 +1015,9 @@ void Queue::create() // if (store) { if (asyncStore) { // store->create(*this, settings.storeSettings); - // TODO: kpvdr: async store create here + queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map()); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &createComplete, &broker->getAsyncResultQueue())); + asyncStore->submitCreate(queueHandle, this, qac); } } @@ -1068,12 +1088,14 @@ void Queue::destroyed() if (asyncStore) { barrier.destroy(); // store->flush(*this); - // TODO: kpvdr: async flush here + boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue())); + asyncStore->submitFlush(queueHandle, flush_qac); // store->destroy(*this); - // TODO: kpvdr: async destroy here + boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue())); + asyncStore->submitDestroy(queueHandle, destroy_qac); // store = 0;//ensure we make no more calls to the store for this queue // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which - // will cause store to be destroyed when all outstanding async ops are complete. + // will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete. } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); @@ -1102,7 +1124,7 @@ void Queue::bound(const string& exchange, const string& key, void Queue::unbind(ExchangeRegistry& exchanges) { - bindings.unbind(exchanges, shared_from_this()); + bindings.unbind(exchanges, shared_from_this(), asyncStore); } uint64_t Queue::getPersistenceId() const @@ -1274,6 +1296,46 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +uint64_t Queue::getSize() { return 0; } // TODO: kpvdr: implement +void Queue::write(char* /*target*/) {} // TODO: kpvdr: implement + +// static +void Queue::createComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::dequeueComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<PersistableQueue> pq = qac->getQueue(); + boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage(); + QueueHandle& qh = pq->getQueueHandle(); + pmsg->dequeueComplete(); + pmsg->removeEnqueueHandle(qh); +// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::destroyComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + // TODO: kpvdr: set Queue::asyncStore = 0 from here. +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::enqueueComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + qac->getMessage()->enqueueComplete(); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::flushComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + void Queue::countRejected() const { if (mgmtObject) { @@ -1464,19 +1526,23 @@ void Queue::flush() ScopedUse u(barrier); // if (u.acquired && store) store->flush(*this); // TODO: kpvdr: Async store flush here - if (u.acquired && asyncStore) { /*store->flush(*this);*/ } + if (u.acquired && asyncStore) { + //store->flush(*this); + std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush; + } } bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { - if (exchange->bind(shared_from_this(), key, &arguments)) { + if (exchange->bind(shared_from_this(), key, &arguments, asyncStore)) { bound(exchange->getName(), key, arguments); - if (exchange->isDurable() && isDurable()) { +// Move this to Exchange::bind() which keeps the binding context +// if (exchange->isDurable() && isDurable()) { // store->bind(*exchange, *this, key, arguments); - // TODO: kpvdr: Store configuration here - } +// // TODO: kpvdr: Store configuration here +// } return true; } else { return false; |