summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp36
1 files changed, 20 insertions, 16 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 10e48bef82..0c34520d06 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -21,11 +21,16 @@
* \file TxnPublish.cpp
*/
-#include "SimpleMessage.h"
-#include "SimpleQueue.h" // debug msg
#include "TxnPublish.h"
+#include "PersistableQueuedMessage.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h" // debug msg
+
+#include "qpid/log/Statement.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -33,9 +38,7 @@ namespace asyncPerf {
TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
-{
-//std::cout << "TTT new TxnPublish" << std::endl << std::flush;
-}
+{}
TxnPublish::~TxnPublish()
{}
@@ -43,7 +46,6 @@ TxnPublish::~TxnPublish()
bool
TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
{
-//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush;
try{
while (!m_queues.empty()) {
m_queues.front()->prepareEnqueue(th);
@@ -52,9 +54,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
return true;
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)");
}
return false;
}
@@ -62,30 +64,28 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
void
TxnPublish::commit() throw()
{
-//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)");
}
}
void
TxnPublish::rollback() throw()
{
-//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)");
}
}
@@ -98,8 +98,12 @@ TxnPublish::contentSize()
void
TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
-//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush;
- boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg));
+ boost::shared_ptr<QueuedMessage> qm;
+ if (m_msg->isPersistent() && queue->getStore()) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
+ }
m_queues.push_back(qm);
m_delivered = true;
}