summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp92
1 files changed, 79 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40574ded3b..f595b81724 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
//#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -818,9 +819,17 @@ bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
// pmsg->enqueueAsync(shared_from_this(), store);
- pmsg->enqueueAsync(shared_from_this(), asyncStore);
// store->enqueue(ctxt, pmsg, *this);
- // TODO - kpvdr: async enqueue here
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+ pmsg->createMessageHandle(asyncStore);
+ EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &enqueueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitEnqueue(eh, th, qac);
}
return true;
}
@@ -893,7 +902,15 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
// if (store && pmsg) {
if (asyncStore && pmsg) {
// store->dequeue(ctxt, pmsg, *this);
- // TODO: kpvdr: async dequeue here
+ pmsg->dequeueAsync(shared_from_this(), asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle);
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &dequeueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitDequeue(eh, th, qac);
}
}
@@ -908,6 +925,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
}
+
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
@@ -997,7 +1015,9 @@ void Queue::create()
// if (store) {
if (asyncStore) {
// store->create(*this, settings.storeSettings);
- // TODO: kpvdr: async store create here
+ queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map());
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &createComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitCreate(queueHandle, this, qac);
}
}
@@ -1068,12 +1088,14 @@ void Queue::destroyed()
if (asyncStore) {
barrier.destroy();
// store->flush(*this);
- // TODO: kpvdr: async flush here
+ boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitFlush(queueHandle, flush_qac);
// store->destroy(*this);
- // TODO: kpvdr: async destroy here
+ boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitDestroy(queueHandle, destroy_qac);
// store = 0;//ensure we make no more calls to the store for this queue
// TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
- // will cause store to be destroyed when all outstanding async ops are complete.
+ // will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1102,7 +1124,7 @@ void Queue::bound(const string& exchange, const string& key,
void Queue::unbind(ExchangeRegistry& exchanges)
{
- bindings.unbind(exchanges, shared_from_this());
+ bindings.unbind(exchanges, shared_from_this(), asyncStore);
}
uint64_t Queue::getPersistenceId() const
@@ -1274,6 +1296,46 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+uint64_t Queue::getSize() { return 0; } // TODO: kpvdr: implement
+void Queue::write(char* /*target*/) {} // TODO: kpvdr: implement
+
+// static
+void Queue::createComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::dequeueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<PersistableQueue> pq = qac->getQueue();
+ boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage();
+ QueueHandle& qh = pq->getQueueHandle();
+ pmsg->dequeueComplete();
+ pmsg->removeEnqueueHandle(qh);
+// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::destroyComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ // TODO: kpvdr: set Queue::asyncStore = 0 from here.
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::enqueueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ qac->getMessage()->enqueueComplete();
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::flushComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
void Queue::countRejected() const
{
if (mgmtObject) {
@@ -1464,19 +1526,23 @@ void Queue::flush()
ScopedUse u(barrier);
// if (u.acquired && store) store->flush(*this);
// TODO: kpvdr: Async store flush here
- if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
+ if (u.acquired && asyncStore) {
+ //store->flush(*this);
+ std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush;
+ }
}
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments)
{
- if (exchange->bind(shared_from_this(), key, &arguments)) {
+ if (exchange->bind(shared_from_this(), key, &arguments, asyncStore)) {
bound(exchange->getName(), key, arguments);
- if (exchange->isDurable() && isDurable()) {
+// Move this to Exchange::bind() which keeps the binding context
+// if (exchange->isDurable() && isDurable()) {
// store->bind(*exchange, *this, key, arguments);
- // TODO: kpvdr: Store configuration here
- }
+// // TODO: kpvdr: Store configuration here
+// }
return true;
} else {
return false;