summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp65
1 files changed, 43 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 0dd4cb7b10..40574ded3b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -26,11 +26,11 @@
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -165,12 +165,14 @@ void Queue::TxPublish::rollback() throw()
}
Queue::Queue(const string& _name, const QueueSettings& _settings,
- MessageStore* const _store,
+// MessageStore* const _store,
+ AsyncStore* const _asyncStore,
Manageable* parent,
Broker* b) :
name(_name),
- store(_store),
+// store(_store),
+ asyncStore(_asyncStore),
owner(0),
consumerCount(0),
browserCount(0),
@@ -198,9 +200,11 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete);
mgmtObject->set_arguments(settings.asMap());
- agent->addObject(mgmtObject, 0, store != 0);
+// agent->addObject(mgmtObject, 0, store != 0);
+ agent->addObject(mgmtObject, 0, asyncStore != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
@@ -787,7 +791,7 @@ void Queue::setLastNodeFailure()
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
*/
-bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
+bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -807,13 +811,16 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
msg.addTraceId(settings.traceId);
}
- if (msg.isPersistent() && store) {
+// if (msg.isPersistent() && store) {
+ if (msg.isPersistent() && asyncStore) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
- pmsg->enqueueAsync(shared_from_this(), store);
- store->enqueue(ctxt, pmsg, *this);
+// pmsg->enqueueAsync(shared_from_this(), store);
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+// store->enqueue(ctxt, pmsg, *this);
+ // TODO - kpvdr: async enqueue here
}
return true;
}
@@ -858,8 +865,10 @@ void Queue::dequeueCommited(const Message& msg)
void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
{
ScopedUse u(barrier);
- if (u.acquired && msg && store) {
- store->dequeue(0, msg, *this);
+// if (u.acquired && msg && store) {
+ if (u.acquired && msg && asyncStore) {
+// store->dequeue(0, msg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -881,8 +890,10 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
return;
}
}
- if (store && pmsg) {
- store->dequeue(ctxt, pmsg, *this);
+// if (store && pmsg) {
+ if (asyncStore && pmsg) {
+// store->dequeue(ctxt, pmsg, *this);
+ // TODO: kpvdr: async dequeue here
}
}
@@ -983,8 +994,10 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc
void Queue::create()
{
- if (store) {
- store->create(*this, settings.storeSettings);
+// if (store) {
+ if (asyncStore) {
+// store->create(*this, settings.storeSettings);
+ // TODO: kpvdr: async store create here
}
}
@@ -1051,11 +1064,16 @@ void Queue::destroyed()
alternateExchange->decAlternateUsers();
}
- if (store) {
+// if (store) {
+ if (asyncStore) {
barrier.destroy();
- store->flush(*this);
- store->destroy(*this);
- store = 0;//ensure we make no more calls to the store for this queue
+// store->flush(*this);
+ // TODO: kpvdr: async flush here
+// store->destroy(*this);
+ // TODO: kpvdr: async destroy here
+// store = 0;//ensure we make no more calls to the store for this queue
+ // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
+ // will cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1444,7 +1462,9 @@ void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
void Queue::flush()
{
ScopedUse u(barrier);
- if (u.acquired && store) store->flush(*this);
+// if (u.acquired && store) store->flush(*this);
+ // TODO: kpvdr: Async store flush here
+ if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
}
@@ -1454,7 +1474,8 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
if (exchange->bind(shared_from_this(), key, &arguments)) {
bound(exchange->getName(), key, arguments);
if (exchange->isDurable() && isDurable()) {
- store->bind(*exchange, *this, key, arguments);
+// store->bind(*exchange, *this, key, arguments);
+ // TODO: kpvdr: Store configuration here
}
return true;
} else {