summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp172
1 files changed, 72 insertions, 100 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index f297e83402..06b4e9333f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -31,7 +31,7 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnHandle.h"
+#include "qpid/broker/TxnBuffer.h"
#include <string.h> // memcpy()
@@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name,
}
}
-SimpleQueue::~SimpleQueue()
-{}
+SimpleQueue::~SimpleQueue() {}
const qpid::broker::QueueHandle&
-SimpleQueue::getHandle() const
-{
+SimpleQueue::getHandle() const {
return m_queueHandle;
}
qpid::broker::QueueHandle&
-SimpleQueue::getHandle()
-{
+SimpleQueue::getHandle() {
return m_queueHandle;
}
qpid::broker::AsyncStore*
-SimpleQueue::getStore()
-{
+SimpleQueue::getStore() {
return m_store;
}
void
-SimpleQueue::asyncCreate()
-{
+SimpleQueue::asyncCreate() {
if (m_store) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncCreateResult,
&m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
@@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
&handleAsyncDestroyResult,
&m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
@@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
}
void
-SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
- enqueue(s_nullTxnHandle, qm);
+ enqueue(qm);
push(qm);
}
bool
-SimpleQueue::dispatch(MessageConsumer& mc)
-{
+SimpleQueue::dispatch(MessageConsumer& mc) {
boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
@@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
-{
- return enqueue(s_nullTxnHandle, qm);
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
+ return enqueue(0, qm);
}
bool
-SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ return asyncEnqueue(tb, qm);
}
return false;
}
bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
-{
- return dequeue(s_nullTxnHandle, qm);
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
+ return dequeue(0, qm);
}
bool
-SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ return asyncDequeue(tb, qm);
}
return true;
}
void
-SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
-SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/)
-{}
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
void
-SimpleQueue::encode(qpid::framing::Buffer& buffer) const
-{
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
buffer.putShortString(m_name);
}
uint32_t
-SimpleQueue::encodedSize() const
-{
+SimpleQueue::encodedSize() const {
return m_name.size() + 1;
}
uint64_t
-SimpleQueue::getPersistenceId() const
-{
+SimpleQueue::getPersistenceId() const {
return m_persistenceId;
}
void
-SimpleQueue::setPersistenceId(uint64_t persistenceId) const
-{
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
m_persistenceId = persistenceId;
}
void
-SimpleQueue::flush()
-{
+SimpleQueue::flush() {
//if(m_store) m_store->flush(*this);
}
const std::string&
-SimpleQueue::getName() const
-{
+SimpleQueue::getName() const {
return m_name;
}
void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
-{
+SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
if (externalQueueStore != inst && externalQueueStore)
delete externalQueueStore;
externalQueueStore = inst;
}
uint64_t
-SimpleQueue::getSize()
-{
+SimpleQueue::getSize() {
return m_persistableData.size();
}
void
-SimpleQueue::write(char* target)
-{
+SimpleQueue::write(char* target) {
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
@@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
+ boost::shared_ptr<QueuedMessage> qm) {
assert(qm.get());
-// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncEnqueueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
-SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
+SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
boost::shared_ptr<QueuedMessage> qm)
{
assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
qm->payload(),
- th,
+ tb,
&handleAsyncDequeueResult,
&m_resultQueue));
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- if (th.isValid()) {
- th.incrOpCnt();
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
}
- m_store->submitDequeue(qm->enqHandle(),
- th,
- qac);
++m_asyncOpCounter;
return true;
}
@@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con
// private
void
-SimpleQueue::destroyCheck(const std::string& opDescr) const
-{
+SimpleQueue::destroyCheck(const std::string& opDescr) const {
if (m_destroyPending || m_destroyed) {
std::ostringstream oss;
oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
@@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
}
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
+SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
--m_asyncOpCounter;
m_destroyed = true;
}
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
- assert(qc->getQueue().get() == this);
- --m_asyncOpCounter;
-
- // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
- qpid::broker::TxnHandle th = qc->getTxnHandle();
- if (th.isValid()) { // transactional enqueue
- th.decrOpCnt();
+SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
}
+ --m_asyncOpCounter;
}
}}} // namespace tests::storePerftools::asyncPerf