From b435b07eb8fa9db484f85b39daaf43642dd623ca Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 25 Jul 2012 12:03:34 +0000 Subject: QPID-3858: WIP: Removed PersistableQueuedMessage again. The non-durable transactional enqueues are broken. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1365545 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/asyncstore.cmake | 1 - .../asyncPerf/PersistableQueuedMessage.cpp | 52 ---------------------- .../asyncPerf/PersistableQueuedMessage.h | 31 ------------- .../storePerftools/asyncPerf/QueuedMessage.cpp | 24 ++++++++-- .../tests/storePerftools/asyncPerf/QueuedMessage.h | 4 ++ .../tests/storePerftools/asyncPerf/SimpleQueue.cpp | 37 ++++++--------- .../tests/storePerftools/asyncPerf/SimpleQueue.h | 5 +-- .../tests/storePerftools/asyncPerf/TxnPublish.cpp | 9 +--- 8 files changed, 41 insertions(+), 122 deletions(-) delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake index 9c0fd0c1b5..94631bb8ea 100644 --- a/cpp/src/tests/asyncstore.cmake +++ b/cpp/src/tests/asyncstore.cmake @@ -58,7 +58,6 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/MessageDeque.cpp storePerftools/asyncPerf/MessageProducer.cpp storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/PersistableQueuedMessage.cpp storePerftools/asyncPerf/QueuedMessage.cpp storePerftools/asyncPerf/SimpleMessage.cpp storePerftools/asyncPerf/SimpleQueue.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp deleted file mode 100644 index 2eba7d5be5..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include "PersistableQueuedMessage.h" - -#include "SimpleQueue.h" -#include "SimpleMessage.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -PersistableQueuedMessage::PersistableQueuedMessage() -{} - -PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q, - boost::intrusive_ptr msg) : - QueuedMessage(q, msg), - m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) -{} - -PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) : - QueuedMessage(pm), - m_enqHandle(pm.m_enqHandle) -{} - -PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) : - QueuedMessage(pm), - m_enqHandle(pm->m_enqHandle) -{} - -PersistableQueuedMessage::~PersistableQueuedMessage() -{} - -PersistableQueuedMessage& -PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs) -{ - QueuedMessage::operator=(rhs); - m_enqHandle = rhs.m_enqHandle; - return *this; -} - -const qpid::broker::EnqueueHandle& -PersistableQueuedMessage::enqHandle() const -{ - return m_enqHandle; -} - -qpid::broker::EnqueueHandle& -PersistableQueuedMessage::enqHandle() -{ - return m_enqHandle; -} - -}}} // tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h deleted file mode 100644 index 1e9446aa57..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ -#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ - -#include "QueuedMessage.h" - -#include "qpid/broker/EnqueueHandle.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class PersistableQueuedMessage : public QueuedMessage { -public: - PersistableQueuedMessage(); - PersistableQueuedMessage(SimpleQueue* q, - boost::intrusive_ptr msg); - PersistableQueuedMessage(const PersistableQueuedMessage& pqm); - PersistableQueuedMessage(PersistableQueuedMessage* const pqm); - virtual ~PersistableQueuedMessage(); - PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs); - - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); - -private: - qpid::broker::EnqueueHandle m_enqHandle; -}; - -}}} // tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index a733a96171..572089faaf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -41,18 +41,24 @@ QueuedMessage::QueuedMessage(SimpleQueue* q, boost::enable_shared_from_this(), m_queue(q), m_msg(msg) -{} +{ + if (m_queue->getStore()) { + m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); + } +} QueuedMessage::QueuedMessage(const QueuedMessage& qm) : boost::enable_shared_from_this(), m_queue(qm.m_queue), - m_msg(qm.m_msg) + m_msg(qm.m_msg), + m_enqHandle(qm.m_enqHandle) {} QueuedMessage::QueuedMessage(QueuedMessage* const qm) : boost::enable_shared_from_this(), m_queue(qm->m_queue), - m_msg(qm->m_msg) + m_msg(qm->m_msg), + m_enqHandle(qm->m_enqHandle) {} QueuedMessage::~QueuedMessage() @@ -70,6 +76,18 @@ QueuedMessage::payload() const return m_msg; } +const qpid::broker::EnqueueHandle& +QueuedMessage::enqHandle() const +{ + return m_enqHandle; +} + +qpid::broker::EnqueueHandle& +QueuedMessage::enqHandle() +{ + return m_enqHandle; +} + void QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index d872cfde58..dd10f8b501 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -25,6 +25,7 @@ #define tests_storePerftools_asyncPerf_QueuedMessage_h_ #include "qpid/broker/AsyncStore.h" +#include "qpid/broker/EnqueueHandle.h" #include #include @@ -54,6 +55,8 @@ public: virtual ~QueuedMessage(); SimpleQueue* getQueue() const; boost::intrusive_ptr payload() const; + const qpid::broker::EnqueueHandle& enqHandle() const; + qpid::broker::EnqueueHandle& enqHandle(); // --- Transaction handling --- void prepareEnqueue(qpid::broker::TxnHandle& th); @@ -63,6 +66,7 @@ public: private: SimpleQueue* m_queue; boost::intrusive_ptr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; }; }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 292fc35925..f297e83402 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -26,7 +26,7 @@ #include "DeliveryRecord.h" #include "MessageConsumer.h" #include "MessageDeque.h" -#include "PersistableQueuedMessage.h" +#include "QueuedMessage.h" #include "SimpleMessage.h" #include "qpid/broker/AsyncResultHandle.h" @@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con void SimpleQueue::deliver(boost::intrusive_ptr msg) { - boost::shared_ptr qm; - if (msg->isPersistent() && m_store) { - qm = boost::shared_ptr(new PersistableQueuedMessage(this, msg)); - } else { - qm = boost::shared_ptr(new QueuedMessage(this, msg)); - } + boost::shared_ptr qm(boost::shared_ptr(new QueuedMessage(this, msg))); enqueue(s_nullTxnHandle, qm); push(qm); } @@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th, } if (qm->payload()->isPersistent() && m_store) { qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, boost::dynamic_pointer_cast(qm)); + return asyncEnqueue(th, qm); } return false; } @@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, } if (qm->payload()->isPersistent() && m_store) { qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, boost::dynamic_pointer_cast(qm)); + return asyncDequeue(th, qm); } return true; } @@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, void SimpleQueue::process(boost::intrusive_ptr msg) { - boost::shared_ptr qm; - if (msg->isPersistent() && m_store) { - qm = boost::shared_ptr(new PersistableQueuedMessage(this, msg)); - } else { - qm = boost::shared_ptr(new QueuedMessage(this, msg)); - } - push(qm); + push(boost::shared_ptr(new QueuedMessage(this, msg))); } void @@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr qm, // private bool SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr pqm) + boost::shared_ptr qm) { - assert(pqm.get()); + assert(qm.get()); // qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - pqm->payload(), + qm->payload(), th, &handleAsyncEnqueueResult, &m_resultQueue)); @@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(pqm->enqHandle(), th, qac); + m_store->submitEnqueue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr pqm) + boost::shared_ptr qm) { - assert(pqm.get()); + assert(qm.get()); boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - pqm->payload(), + qm->payload(), th, &handleAsyncDequeueResult, &m_resultQueue)); @@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, if (th.isValid()) { th.incrOpCnt(); } - m_store->submitDequeue(pqm->enqHandle(), + m_store->submitDequeue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 1126d67775..bf88e32345 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -49,7 +49,6 @@ namespace asyncPerf { class MessageConsumer; class Messages; -class PersistableQueuedMessage; class QueuedMessage; class SimpleMessage; @@ -136,10 +135,10 @@ private: // -- Async ops --- bool asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr pqm); + boost::shared_ptr qm); static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); bool asyncDequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr pqm); + boost::shared_ptr qm); static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); // --- Async op counter --- diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 3d27449aa4..6e15526e8f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -23,7 +23,6 @@ #include "TxnPublish.h" -#include "PersistableQueuedMessage.h" #include "QueuedMessage.h" #include "SimpleMessage.h" #include "SimpleQueue.h" @@ -97,13 +96,7 @@ TxnPublish::contentSize() void TxnPublish::deliverTo(const boost::shared_ptr& queue) { - boost::shared_ptr qm; - if (m_msg->isPersistent() && queue->getStore()) { - qm = boost::shared_ptr(new PersistableQueuedMessage(queue.get(), m_msg)); - } else { - qm = boost::shared_ptr(new QueuedMessage(queue.get(), m_msg)); - } - m_queues.push_back(qm); + m_queues.push_back(boost::shared_ptr(new QueuedMessage(queue.get(), m_msg))); m_delivered = true; } -- cgit v1.2.1