summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-25 12:03:34 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-25 12:03:34 +0000
commitb435b07eb8fa9db484f85b39daaf43642dd623ca (patch)
treec327f38fb21c75267abdb9cd338fe3778d883140
parent75e9139d60bc049fabf9b3b779ddd157bb5160bb (diff)
downloadqpid-python-b435b07eb8fa9db484f85b39daaf43642dd623ca.tar.gz
QPID-3858: WIP: Removed PersistableQueuedMessage again. The non-durable transactional enqueues are broken.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1365545 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/asyncstore.cmake1
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h31
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp24
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp37
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp9
8 files changed, 41 insertions, 122 deletions
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
index 9c0fd0c1b5..94631bb8ea 100644
--- a/cpp/src/tests/asyncstore.cmake
+++ b/cpp/src/tests/asyncstore.cmake
@@ -58,7 +58,6 @@ set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/MessageDeque.cpp
storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/PersistableQueuedMessage.cpp
storePerftools/asyncPerf/QueuedMessage.cpp
storePerftools/asyncPerf/SimpleMessage.cpp
storePerftools/asyncPerf/SimpleQueue.cpp
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
deleted file mode 100644
index 2eba7d5be5..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-#include "PersistableQueuedMessage.h"
-
-#include "SimpleQueue.h"
-#include "SimpleMessage.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-PersistableQueuedMessage::PersistableQueuedMessage()
-{}
-
-PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q,
- boost::intrusive_ptr<SimpleMessage> msg) :
- QueuedMessage(q, msg),
- m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
-{}
-
-PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) :
- QueuedMessage(pm),
- m_enqHandle(pm.m_enqHandle)
-{}
-
-PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) :
- QueuedMessage(pm),
- m_enqHandle(pm->m_enqHandle)
-{}
-
-PersistableQueuedMessage::~PersistableQueuedMessage()
-{}
-
-PersistableQueuedMessage&
-PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs)
-{
- QueuedMessage::operator=(rhs);
- m_enqHandle = rhs.m_enqHandle;
- return *this;
-}
-
-const qpid::broker::EnqueueHandle&
-PersistableQueuedMessage::enqHandle() const
-{
- return m_enqHandle;
-}
-
-qpid::broker::EnqueueHandle&
-PersistableQueuedMessage::enqHandle()
-{
- return m_enqHandle;
-}
-
-}}} // tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
deleted file mode 100644
index 1e9446aa57..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
+++ /dev/null
@@ -1,31 +0,0 @@
-#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
-#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
-
-#include "QueuedMessage.h"
-
-#include "qpid/broker/EnqueueHandle.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class PersistableQueuedMessage : public QueuedMessage {
-public:
- PersistableQueuedMessage();
- PersistableQueuedMessage(SimpleQueue* q,
- boost::intrusive_ptr<SimpleMessage> msg);
- PersistableQueuedMessage(const PersistableQueuedMessage& pqm);
- PersistableQueuedMessage(PersistableQueuedMessage* const pqm);
- virtual ~PersistableQueuedMessage();
- PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs);
-
- const qpid::broker::EnqueueHandle& enqHandle() const;
- qpid::broker::EnqueueHandle& enqHandle();
-
-private:
- qpid::broker::EnqueueHandle m_enqHandle;
-};
-
-}}} // tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index a733a96171..572089faaf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -41,18 +41,24 @@ QueuedMessage::QueuedMessage(SimpleQueue* q,
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(q),
m_msg(msg)
-{}
+{
+ if (m_queue->getStore()) {
+ m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+ }
+}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm.m_queue),
- m_msg(qm.m_msg)
+ m_msg(qm.m_msg),
+ m_enqHandle(qm.m_enqHandle)
{}
QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm->m_queue),
- m_msg(qm->m_msg)
+ m_msg(qm->m_msg),
+ m_enqHandle(qm->m_enqHandle)
{}
QueuedMessage::~QueuedMessage()
@@ -70,6 +76,18 @@ QueuedMessage::payload() const
return m_msg;
}
+const qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle() const
+{
+ return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle()
+{
+ return m_enqHandle;
+}
+
void
QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index d872cfde58..dd10f8b501 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -25,6 +25,7 @@
#define tests_storePerftools_asyncPerf_QueuedMessage_h_
#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/EnqueueHandle.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -54,6 +55,8 @@ public:
virtual ~QueuedMessage();
SimpleQueue* getQueue() const;
boost::intrusive_ptr<SimpleMessage> payload() const;
+ const qpid::broker::EnqueueHandle& enqHandle() const;
+ qpid::broker::EnqueueHandle& enqHandle();
// --- Transaction handling ---
void prepareEnqueue(qpid::broker::TxnHandle& th);
@@ -63,6 +66,7 @@ public:
private:
SimpleQueue* m_queue;
boost::intrusive_ptr<SimpleMessage> m_msg;
+ qpid::broker::EnqueueHandle m_enqHandle;
};
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 292fc35925..f297e83402 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -26,7 +26,7 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "PersistableQueuedMessage.h"
+#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "qpid/broker/AsyncResultHandle.h"
@@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
+ boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
enqueue(s_nullTxnHandle, qm);
push(qm);
}
@@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncEnqueue(th, qm);
}
return false;
}
@@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncDequeue(th, qm);
}
return true;
}
@@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
- push(qm);
+ push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
@@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncEnqueueResult,
&m_resultQueue));
@@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(pqm->enqHandle(), th, qac);
+ m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncDequeueResult,
&m_resultQueue));
@@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(pqm->enqHandle(),
+ m_store->submitDequeue(qm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 1126d67775..bf88e32345 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -49,7 +49,6 @@ namespace asyncPerf {
class MessageConsumer;
class Messages;
-class PersistableQueuedMessage;
class QueuedMessage;
class SimpleMessage;
@@ -136,10 +135,10 @@ private:
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm);
+ boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
bool asyncDequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm);
+ boost::shared_ptr<QueuedMessage> qm);
static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Async op counter ---
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 3d27449aa4..6e15526e8f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -23,7 +23,6 @@
#include "TxnPublish.h"
-#include "PersistableQueuedMessage.h"
#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
@@ -97,13 +96,7 @@ TxnPublish::contentSize()
void
TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (m_msg->isPersistent() && queue->getStore()) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
- }
- m_queues.push_back(qm);
+ m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
m_delivered = true;
}