summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
commita804510d81ade0594a75b5c9b8765cafcc233245 (patch)
tree8c6be643564b6d8c88619d17de7150c98a314781 /cpp/src/tests
parent1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff)
downloadqpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/CMakeLists.txt77
-rw-r--r--cpp/src/tests/asyncstore.cmake94
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp10
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Messages.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h31
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp37
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h13
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp86
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h26
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp36
20 files changed, 356 insertions, 206 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index ad9a518867..637442e128 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -344,78 +344,5 @@ add_library (dlclose_noop MODULE dlclose_noop.c)
#check-long:
# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
-
-# Async Store perf tests
-# ----------------------
-
-# New journal perf test (jrnl2Perf)
-set (jrnl2Perf_SOURCES
- storePerftools/jrnlPerf/Journal.cpp
- storePerftools/jrnlPerf/JournalParameters.cpp
- storePerftools/jrnlPerf/PerfTest.cpp
- storePerftools/jrnlPerf/TestResult.cpp
-
- storePerftools/common/Parameters.cpp
- storePerftools/common/PerftoolError.cpp
- storePerftools/common/ScopedTimable.cpp
- storePerftools/common/ScopedTimer.cpp
- storePerftools/common/Streamable.cpp
- storePerftools/common/TestParameters.cpp
- storePerftools/common/TestResult.cpp
- storePerftools/common/Thread.cpp
-)
-
-if (UNIX)
- add_executable (jrnl2Perf ${jrnl2Perf_SOURCES})
- set_target_properties (jrnl2Perf PROPERTIES
- COMPILE_FLAGS "-DJOURNAL2"
- )
- target_link_libraries (jrnl2Perf
- asyncStore
- qpidbroker
- rt
- )
-endif (UNIX)
-
-# Async store perf test (asyncPerf)
-set (asyncStorePerf_SOURCES
- storePerftools/asyncPerf/Deliverable.cpp
- storePerftools/asyncPerf/DeliveryRecord.cpp
- storePerftools/asyncPerf/MessageAsyncContext.cpp
- storePerftools/asyncPerf/MessageConsumer.cpp
- storePerftools/asyncPerf/MessageDeque.cpp
- storePerftools/asyncPerf/MessageProducer.cpp
- storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/QueueAsyncContext.cpp
- storePerftools/asyncPerf/QueuedMessage.cpp
- storePerftools/asyncPerf/SimpleMessage.cpp
- storePerftools/asyncPerf/SimpleQueue.cpp
- storePerftools/asyncPerf/TestOptions.cpp
- storePerftools/asyncPerf/TestResult.cpp
- storePerftools/asyncPerf/TxnAccept.cpp
- storePerftools/asyncPerf/TxnPublish.cpp
-
- storePerftools/common/Parameters.cpp
- storePerftools/common/PerftoolError.cpp
- storePerftools/common/ScopedTimable.cpp
- storePerftools/common/ScopedTimer.cpp
- storePerftools/common/Streamable.cpp
- storePerftools/common/TestOptions.cpp
- storePerftools/common/TestResult.cpp
- storePerftools/common/Thread.cpp
-)
-
-if (UNIX)
- add_executable (asyncStorePerf ${asyncStorePerf_SOURCES})
- set_target_properties (asyncStorePerf PROPERTIES
- COMPILE_FLAGS "-DJOURNAL2"
- )
- target_link_libraries (asyncStorePerf
- boost_program_options
- asyncStore
- qpidbroker
- qpidcommon
- qpidtypes
- rt
- )
-endif (UNIX)
+# Include async store tests
+include(asyncstore.cmake)
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
new file mode 100644
index 0000000000..cd20394908
--- /dev/null
+++ b/cpp/src/tests/asyncstore.cmake
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# Async store test CMake fragment, to be included in tests/CMakeLists.txt
+#
+
+# New journal perf test (jrnl2Perf)
+set (jrnl2Perf_SOURCES
+ storePerftools/jrnlPerf/Journal.cpp
+ storePerftools/jrnlPerf/JournalParameters.cpp
+ storePerftools/jrnlPerf/PerfTest.cpp
+ storePerftools/jrnlPerf/TestResult.cpp
+
+ storePerftools/common/Parameters.cpp
+ storePerftools/common/PerftoolError.cpp
+ storePerftools/common/ScopedTimable.cpp
+ storePerftools/common/ScopedTimer.cpp
+ storePerftools/common/Streamable.cpp
+ storePerftools/common/TestParameters.cpp
+ storePerftools/common/TestResult.cpp
+ storePerftools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (jrnl2Perf ${jrnl2Perf_SOURCES})
+ set_target_properties (jrnl2Perf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ )
+ target_link_libraries (jrnl2Perf
+ asyncStore
+ qpidbroker
+ rt
+ )
+endif (UNIX)
+
+# Async store perf test (asyncPerf)
+set (asyncStorePerf_SOURCES
+ storePerftools/asyncPerf/Deliverable.cpp
+ storePerftools/asyncPerf/DeliveryRecord.cpp
+ storePerftools/asyncPerf/MessageAsyncContext.cpp
+ storePerftools/asyncPerf/MessageConsumer.cpp
+ storePerftools/asyncPerf/MessageDeque.cpp
+ storePerftools/asyncPerf/MessageProducer.cpp
+ storePerftools/asyncPerf/PerfTest.cpp
+ storePerftools/asyncPerf/PersistableQueuedMessage.cpp
+ storePerftools/asyncPerf/QueueAsyncContext.cpp
+ storePerftools/asyncPerf/QueuedMessage.cpp
+ storePerftools/asyncPerf/SimpleMessage.cpp
+ storePerftools/asyncPerf/SimpleQueue.cpp
+ storePerftools/asyncPerf/TestOptions.cpp
+ storePerftools/asyncPerf/TestResult.cpp
+ storePerftools/asyncPerf/TxnAccept.cpp
+ storePerftools/asyncPerf/TxnPublish.cpp
+
+ storePerftools/common/Parameters.cpp
+ storePerftools/common/PerftoolError.cpp
+ storePerftools/common/ScopedTimable.cpp
+ storePerftools/common/ScopedTimer.cpp
+ storePerftools/common/Streamable.cpp
+ storePerftools/common/TestOptions.cpp
+ storePerftools/common/TestResult.cpp
+ storePerftools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (asyncStorePerf ${asyncStorePerf_SOURCES})
+ set_target_properties (asyncStorePerf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ )
+ target_link_libraries (asyncStorePerf
+ boost_program_options
+ asyncStore
+ qpidbroker
+ qpidcommon
+ qpidtypes
+ rt
+ )
+endif (UNIX)
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
index b7250ecf40..1728a2dc1e 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -31,7 +31,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-DeliveryRecord::DeliveryRecord(const QueuedMessage& qm,
+DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
MessageConsumer& mc,
bool accepted) :
m_queuedMessage(qm),
@@ -47,7 +47,7 @@ bool
DeliveryRecord::accept()
{
if (!m_ended) {
- m_queuedMessage.getQueue()->dequeue(m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(m_queuedMessage);
m_accepted = true;
setEnded();
}
@@ -64,7 +64,7 @@ bool
DeliveryRecord::setEnded()
{
m_ended = true;
- m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0);
+ m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0);
return isRedundant();
}
@@ -83,7 +83,7 @@ DeliveryRecord::isRedundant() const
void
DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
{
- m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage);
}
void
@@ -92,7 +92,7 @@ DeliveryRecord::committed() const
m_msgConsumer.commitComplete();
}
-QueuedMessage
+boost::shared_ptr<QueuedMessage>
DeliveryRecord::getQueuedMessage() const
{
return m_queuedMessage;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
index 427cf846f0..bb89787737 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -26,6 +26,8 @@
#include "QueuedMessage.h"
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace broker {
class TxnHandle;
@@ -39,7 +41,7 @@ class MessageConsumer;
class DeliveryRecord {
public:
- DeliveryRecord(const QueuedMessage& qm,
+ DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
MessageConsumer& mc,
bool accepted);
virtual ~DeliveryRecord();
@@ -50,9 +52,9 @@ public:
bool isRedundant() const;
void dequeue(qpid::broker::TxnHandle& txn);
void committed() const;
- QueuedMessage getQueuedMessage() const;
+ boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
private:
- QueuedMessage m_queuedMessage;
+ boost::shared_ptr<QueuedMessage> m_queuedMessage;
MessageConsumer& m_msgConsumer;
bool m_accepted : 1;
bool m_ended : 1;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
index c61ce352a1..8b79a91ac1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
@@ -42,7 +42,7 @@ MessageDeque::size()
}
bool
-MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/)
+MessageDeque::push(boost::shared_ptr<QueuedMessage>& added)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
m_messages.push_back(added);
@@ -50,7 +50,7 @@ MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/)
}
bool
-MessageDeque::consume(QueuedMessage& msg)
+MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
if (!m_messages.empty()) {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
index 75f422779e..021015f3e0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
@@ -46,10 +46,10 @@ public:
MessageDeque();
virtual ~MessageDeque();
uint32_t size();
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- bool consume(QueuedMessage& msg);
+ bool push(boost::shared_ptr<QueuedMessage>& added);
+ bool consume(boost::shared_ptr<QueuedMessage>& msg);
private:
- std::deque<QueuedMessage> m_messages;
+ std::deque<boost::shared_ptr<QueuedMessage> > m_messages;
qpid::sys::Mutex m_msgMutex;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
index 9b5bd0be99..c1bfa328ea 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
@@ -30,6 +30,7 @@
#ifndef tests_storePerftools_asyncPerf_Messages_h_
#define tests_storePerftools_asyncPerf_Messages_h_
+#include <boost/shared_ptr.hpp>
#include <stdint.h>
namespace tests {
@@ -43,8 +44,8 @@ class Messages
public:
virtual ~Messages() {}
virtual uint32_t size() = 0;
- virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
- virtual bool consume(QueuedMessage& msg) = 0;
+ virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0;
+ virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 4d145d321d..1497b678a0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -31,7 +31,10 @@
#include "tests/storePerftools/common/ScopedTimer.h"
#include "tests/storePerftools/common/Thread.h"
+#include "qpid/Modules.h" // Use with loading store as module
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/asyncStore/AsyncStoreOptions.h"
+#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -161,16 +164,15 @@ PerfTest::destroyQueues()
}
}
-}}} // namespace tests::storePerftools::asyncPerf
-
-// -----------------------------------------------------------------
-
int
-main(int argc, char** argv)
+runPerfTest(int argc, char** argv)
{
+ // Load async store module
+ qpid::tryShlib ("asyncStore.so", false);
+
qpid::CommonOptions co;
qpid::asyncStore::AsyncStoreOptions aso;
- tests::storePerftools::asyncPerf::TestOptions to;
+ TestOptions to;
qpid::Options opts;
opts.add(co).add(aso).add(to);
try {
@@ -203,5 +205,16 @@ main(int argc, char** argv)
// Print test result
std::cout << apt << std::endl;
//::sleep(1);
+
return 0;
}
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+// -----------------------------------------------------------------
+
+int
+main(int argc, char** argv)
+{
+ return tests::storePerftools::asyncPerf::runPerfTest(argc, argv);
+}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 6cdf015f76..7cbb71322f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -36,6 +36,9 @@
#include <deque>
namespace qpid {
+namespace broker {
+class AsyncStore;
+}
namespace asyncStore {
class AsyncStoreImpl;
class AsyncStoreOptions;
@@ -83,6 +86,8 @@ private:
};
+int runPerfTest(int argc, char** argv);
+
}}} // namespace tests::storePerftools::asyncPerf
#endif // tests_storePerftools_asyncPerf_PerfTest_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
new file mode 100644
index 0000000000..2eba7d5be5
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
@@ -0,0 +1,52 @@
+#include "PersistableQueuedMessage.h"
+
+#include "SimpleQueue.h"
+#include "SimpleMessage.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+PersistableQueuedMessage::PersistableQueuedMessage()
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg) :
+ QueuedMessage(q, msg),
+ m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) :
+ QueuedMessage(pm),
+ m_enqHandle(pm.m_enqHandle)
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) :
+ QueuedMessage(pm),
+ m_enqHandle(pm->m_enqHandle)
+{}
+
+PersistableQueuedMessage::~PersistableQueuedMessage()
+{}
+
+PersistableQueuedMessage&
+PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs)
+{
+ QueuedMessage::operator=(rhs);
+ m_enqHandle = rhs.m_enqHandle;
+ return *this;
+}
+
+const qpid::broker::EnqueueHandle&
+PersistableQueuedMessage::enqHandle() const
+{
+ return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+PersistableQueuedMessage::enqHandle()
+{
+ return m_enqHandle;
+}
+
+}}} // tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
new file mode 100644
index 0000000000..1e9446aa57
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
@@ -0,0 +1,31 @@
+#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
+#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
+
+#include "QueuedMessage.h"
+
+#include "qpid/broker/EnqueueHandle.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class PersistableQueuedMessage : public QueuedMessage {
+public:
+ PersistableQueuedMessage();
+ PersistableQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg);
+ PersistableQueuedMessage(const PersistableQueuedMessage& pqm);
+ PersistableQueuedMessage(PersistableQueuedMessage* const pqm);
+ virtual ~PersistableQueuedMessage();
+ PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs);
+
+ const qpid::broker::EnqueueHandle& enqHandle() const;
+ qpid::broker::EnqueueHandle& enqHandle();
+
+private:
+ qpid::broker::EnqueueHandle m_enqHandle;
+};
+
+}}} // tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index 112a5ab1dd..3a8850c699 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -31,6 +31,10 @@
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
+namespace qpid {
+namespace broker {
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+}}
namespace tests {
namespace storePerftools {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 11af7c9466..a733a96171 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -38,28 +38,25 @@ QueuedMessage::QueuedMessage() :
QueuedMessage::QueuedMessage(SimpleQueue* q,
boost::intrusive_ptr<SimpleMessage> msg) :
+ boost::enable_shared_from_this<QueuedMessage>(),
m_queue(q),
- m_msg(msg),
- m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0))
+ m_msg(msg)
{}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
+ boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm.m_queue),
- m_msg(qm.m_msg),
- m_enqHandle(qm.m_enqHandle)
+ m_msg(qm.m_msg)
{}
-QueuedMessage::~QueuedMessage()
+QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
+ boost::enable_shared_from_this<QueuedMessage>(),
+ m_queue(qm->m_queue),
+ m_msg(qm->m_msg)
{}
-QueuedMessage&
-QueuedMessage::operator=(const QueuedMessage& rhs)
-{
- m_queue = rhs.m_queue;
- m_msg = rhs.m_msg;
- m_enqHandle = rhs.m_enqHandle;
- return *this;
-}
+QueuedMessage::~QueuedMessage()
+{}
SimpleQueue*
QueuedMessage::getQueue() const
@@ -73,22 +70,10 @@ QueuedMessage::payload() const
return m_msg;
}
-const qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle() const
-{
- return m_enqHandle;
-}
-
-qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle()
-{
- return m_enqHandle;
-}
-
void
QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
{
- m_queue->enqueue(th, *this);
+ m_queue->enqueue(th, shared_from_this());
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index 12c8e4da08..7d4e5bbbe4 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -24,8 +24,9 @@
#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_
#define tests_storePerftools_asyncPerf_QueuedMessage_h_
-#include "qpid/broker/EnqueueHandle.h"
+#include "qpid/broker/AsyncStore.h"
+#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -42,19 +43,17 @@ namespace asyncPerf {
class SimpleMessage;
class SimpleQueue;
-class QueuedMessage
+class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage>
{
public:
QueuedMessage();
QueuedMessage(SimpleQueue* q,
boost::intrusive_ptr<SimpleMessage> msg);
QueuedMessage(const QueuedMessage& qm);
- ~QueuedMessage();
- QueuedMessage& operator=(const QueuedMessage& rhs);
+ QueuedMessage(QueuedMessage* const qm);
+ virtual ~QueuedMessage();
SimpleQueue* getQueue() const;
boost::intrusive_ptr<SimpleMessage> payload() const;
- const qpid::broker::EnqueueHandle& enqHandle() const;
- qpid::broker::EnqueueHandle& enqHandle();
// -- Transaction handling ---
void prepareEnqueue(qpid::broker::TxnHandle& th);
@@ -64,7 +63,6 @@ public:
private:
SimpleQueue* m_queue;
boost::intrusive_ptr<SimpleMessage> m_msg;
- qpid::broker::EnqueueHandle m_enqHandle;
};
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
index 29db6ceaf2..889f7a4cdd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
@@ -30,11 +30,20 @@ namespace storePerftools {
namespace asyncPerf {
SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_store(0),
+ m_msgHandle(qpid::broker::MessageHandle())
+{}
+
+SimpleMessage::SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store) :
+ qpid::broker::AsyncStore* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
- m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0))
+ m_store(store),
+ m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle())
{}
SimpleMessage::~SimpleMessage()
@@ -95,7 +104,7 @@ SimpleMessage::encodedHeaderSize() const
bool
SimpleMessage::isPersistent() const
{
- return m_msgHandle.isValid();
+ return m_store != 0;
}
uint64_t
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
index 1b3e034814..01f54c1c19 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
@@ -28,13 +28,6 @@
#include "qpid/broker/MessageHandle.h"
#include "qpid/broker/PersistableMessage.h"
-#include <set>
-
-namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}}
-
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -46,8 +39,10 @@ class SimpleMessage: public qpid::broker::PersistableMessage,
{
public:
SimpleMessage(const char* msgData,
+ const uint32_t msgSize);
+ SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store);
+ qpid::broker::AsyncStore* store);
virtual ~SimpleMessage();
const qpid::broker::MessageHandle& getHandle() const;
qpid::broker::MessageHandle& getHandle();
@@ -71,6 +66,8 @@ public:
private:
mutable uint64_t m_persistenceId;
const std::string m_msg;
+ qpid::broker::AsyncStore* m_store;
+
qpid::broker::MessageHandle m_msgHandle;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 8bb79367ed..3bce2fb52a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -26,13 +26,15 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "SimpleMessage.h"
+#include "PersistableQueuedMessage.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/AsyncResultHandle.h"
-#include "qpid/broker/TxnHandle.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat
SimpleQueue::SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq) :
qpid::broker::PersistableQueue(),
m_name(name),
@@ -117,7 +119,7 @@ SimpleQueue::getHandle()
return m_queueHandle;
}
-qpid::asyncStore::AsyncStoreImpl*
+qpid::broker::AsyncStore*
SimpleQueue::getStore()
{
return m_store;
@@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
+//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm1.get());
enqueue(s_nullTxnHandle, qm);
+//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm2.get());
push(qm);
}
bool
SimpleQueue::dispatch(MessageConsumer& mc)
{
- QueuedMessage qm;
+ boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
mc.record(dr);
@@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(QueuedMessage& qm)
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
{
return enqueue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return false;
}
bool
-SimpleQueue::dequeue(QueuedMessage& qm)
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
{
return dequeue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->dequeueAsync(shared_from_this(), m_store);
+//assert(qm.get());
+//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm.get());
+//return asyncDequeue(th, pqm);
+ return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return true;
}
@@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
push(qm);
}
@@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse()
// private
void
-SimpleQueue::push(QueuedMessage& qm,
+SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
- QueuedMessage removed;
- m_messages->push(qm, removed);
+boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+assert(pqm.get());
+
+ m_messages->push(qm);
}
// --- End Members & methods in msg handling path from qpid::Queue ---
@@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm,
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
- qm.payload()->setPersistenceId(m_store->getNextRid());
-//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(pqm.get());
+// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(qm.enqHandle(),
+ m_store->submitEnqueue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
+ assert(pqm.get());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(qm.enqHandle(),
+ m_store->submitDequeue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 59e12b5c93..81ea8b022b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -40,7 +40,6 @@ class AsyncStoreImpl;
}
namespace broker {
class AsyncResultQueue;
-class TxnHandle;
}
namespace framing {
class FieldTable;
@@ -52,9 +51,10 @@ namespace asyncPerf {
class MessageConsumer;
class Messages;
-class SimpleMessage;
+class PersistableQueuedMessage;
class QueueAsyncContext;
class QueuedMessage;
+class SimpleMessage;
class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
public qpid::broker::PersistableQueue,
@@ -63,14 +63,14 @@ class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
public:
SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq);
virtual ~SimpleQueue();
static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
- qpid::asyncStore::AsyncStoreImpl* getStore();
+ qpid::broker::AsyncStore* getStore();
void asyncCreate();
void asyncDestroy(const bool deleteQueue);
@@ -78,12 +78,12 @@ public:
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
bool dispatch(MessageConsumer& mc);
- bool enqueue(QueuedMessage& qm);
+ bool enqueue(boost::shared_ptr<QueuedMessage> qm);
bool enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
- bool dequeue(QueuedMessage& qm);
+ boost::shared_ptr<QueuedMessage> qm);
+ bool dequeue(boost::shared_ptr<QueuedMessage> qm);
bool dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<QueuedMessage> qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -106,9 +106,9 @@ private:
static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations
const std::string m_name;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- qpid::asyncStore::AsyncOpCounter m_asyncOpCounter;
+ qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter!
mutable uint64_t m_persistenceId;
std::string m_persistableData;
qpid::broker::QueueHandle m_queueHandle;
@@ -135,14 +135,14 @@ private:
};
UsageBarrier m_barrier;
std::auto_ptr<Messages> m_messages;
- void push(QueuedMessage& qm,
+ void push(boost::shared_ptr<QueuedMessage> qm,
bool isRecovery = false);
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<PersistableQueuedMessage> pqm);
bool asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<PersistableQueuedMessage> pqm);
// --- Async op counter ---
void destroyCheck(const std::string& opDescr) const;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
index 3c5b99b5d5..7bede50272 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -25,6 +25,8 @@
#include "DeliveryRecord.h"
+#include "qpid/log/Statement.h"
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -41,15 +43,14 @@ TxnAccept::~TxnAccept()
bool
TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
{
-//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush;
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->dequeue(th);
}
} catch (const std::exception& e) {
- std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)");
}
return false;
}
@@ -57,23 +58,20 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
void
TxnAccept::commit() throw()
{
-//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush;
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
(*i)->committed();
(*i)->setEnded();
}
} catch (const std::exception& e) {
- std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what());
} catch(...) {
- std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)");
}
}
void
TxnAccept::rollback() throw()
-{
-//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush;
-}
+{}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 10e48bef82..0c34520d06 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -21,11 +21,16 @@
* \file TxnPublish.cpp
*/
-#include "SimpleMessage.h"
-#include "SimpleQueue.h" // debug msg
#include "TxnPublish.h"
+#include "PersistableQueuedMessage.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h" // debug msg
+
+#include "qpid/log/Statement.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -33,9 +38,7 @@ namespace asyncPerf {
TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
-{
-//std::cout << "TTT new TxnPublish" << std::endl << std::flush;
-}
+{}
TxnPublish::~TxnPublish()
{}
@@ -43,7 +46,6 @@ TxnPublish::~TxnPublish()
bool
TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
{
-//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush;
try{
while (!m_queues.empty()) {
m_queues.front()->prepareEnqueue(th);
@@ -52,9 +54,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
return true;
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)");
}
return false;
}
@@ -62,30 +64,28 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
void
TxnPublish::commit() throw()
{
-//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)");
}
}
void
TxnPublish::rollback() throw()
{
-//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)");
}
}
@@ -98,8 +98,12 @@ TxnPublish::contentSize()
void
TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
-//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush;
- boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg));
+ boost::shared_ptr<QueuedMessage> qm;
+ if (m_msg->isPersistent() && queue->getStore()) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
+ }
m_queues.push_back(qm);
m_delivered = true;
}