diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 0dd4cb7b10..40574ded3b 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -26,11 +26,11 @@ #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/MessageDeque.h" #include "qpid/broker/MessageDistributor.h" #include "qpid/broker/FifoDistributor.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" //TODO: get rid of this @@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw() } Queue::Queue(const string& _name, const QueueSettings& _settings, - MessageStore* const _store, +// MessageStore* const _store, + AsyncStore* const _asyncStore, Manageable* parent, Broker* b) : name(_name), - store(_store), +// store(_store), + asyncStore(_asyncStore), owner(0), consumerCount(0), browserCount(0), @@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); +// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); + mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete); mgmtObject->set_arguments(settings.asMap()); - agent->addObject(mgmtObject, 0, store != 0); +// agent->addObject(mgmtObject, 0, store != 0); + agent->addObject(mgmtObject, 0, asyncStore != 0); brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); @@ -787,7 +791,7 @@ void Queue::setLastNodeFailure() * return true if enqueue succeeded and message should be made * available; returning false will result in the message being dropped */ -bool Queue::enqueue(TransactionContext* ctxt, Message& msg) +bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg) { ScopedUse u(barrier); if (!u.acquired) return false; @@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) msg.addTraceId(settings.traceId); } - if (msg.isPersistent() && store) { +// if (msg.isPersistent() && store) { + if (msg.isPersistent() && asyncStore) { // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() // when it considers the message stored. boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); assert(pmsg); - pmsg->enqueueAsync(shared_from_this(), store); - store->enqueue(ctxt, pmsg, *this); +// pmsg->enqueueAsync(shared_from_this(), store); + pmsg->enqueueAsync(shared_from_this(), asyncStore); +// store->enqueue(ctxt, pmsg, *this); + // TODO - kpvdr: async enqueue here } return true; } @@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Message& msg) void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg) { ScopedUse u(barrier); - if (u.acquired && msg && store) { - store->dequeue(0, msg, *this); +// if (u.acquired && msg && store) { + if (u.acquired && msg && asyncStore) { +// store->dequeue(0, msg, *this); + // TODO: kpvdr: async dequeue here } } @@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) return; } } - if (store && pmsg) { - store->dequeue(ctxt, pmsg, *this); +// if (store && pmsg) { + if (asyncStore && pmsg) { +// store->dequeue(ctxt, pmsg, *this); + // TODO: kpvdr: async dequeue here } } @@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc void Queue::create() { - if (store) { - store->create(*this, settings.storeSettings); +// if (store) { + if (asyncStore) { +// store->create(*this, settings.storeSettings); + // TODO: kpvdr: async store create here } } @@ -1051,11 +1064,16 @@ void Queue::destroyed() alternateExchange->decAlternateUsers(); } - if (store) { +// if (store) { + if (asyncStore) { barrier.destroy(); - store->flush(*this); - store->destroy(*this); - store = 0;//ensure we make no more calls to the store for this queue +// store->flush(*this); + // TODO: kpvdr: async flush here +// store->destroy(*this); + // TODO: kpvdr: async destroy here +// store = 0;//ensure we make no more calls to the store for this queue + // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which + // will cause store to be destroyed when all outstanding async ops are complete. } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); @@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) void Queue::flush() { ScopedUse u(barrier); - if (u.acquired && store) store->flush(*this); +// if (u.acquired && store) store->flush(*this); + // TODO: kpvdr: Async store flush here + if (u.acquired && asyncStore) { /*store->flush(*this);*/ } } @@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, if (exchange->bind(shared_from_this(), key, &arguments)) { bound(exchange->getName(), key, arguments); if (exchange->isDurable() && isDurable()) { - store->bind(*exchange, *this, key, arguments); +// store->bind(*exchange, *this, key, arguments); + // TODO: kpvdr: Store configuration here } return true; } else { |