summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-14 13:06:21 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-14 13:06:21 +0000
commitb6851f7bafd90d24bb518b63e7fc9f91e1cd84eb (patch)
tree95673bd4889de541e6e8b853e1a5dadc6704827f /cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
parenta3861c46a7151a250fd06f54a60b9c1fe4bd6a1e (diff)
downloadqpid-python-b6851f7bafd90d24bb518b63e7fc9f91e1cd84eb.tar.gz
QPID-3858: Fix directory names to match namespaces in test dir; Changed MockPersistableQueue to use intrusive pointers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1338185 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp222
1 files changed, 222 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
new file mode 100644
index 0000000000..10be34c6f5
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockTransactionContext.cpp
+ */
+
+#include "MockTransactionContext.h"
+
+#include "QueuedMessage.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockTransactionContext::QueueContext ---
+
+MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_tc(tc),
+ m_op(op)
+{}
+
+MockTransactionContext::TransactionContext::~TransactionContext()
+{}
+
+const char*
+MockTransactionContext::TransactionContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockTransactionContext::TransactionContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockTransactionContext ---
+
+
+MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
+ const std::string& xid) :
+ m_store(store),
+ m_txnHandle(store->createTxnHandle(xid)),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+}
+
+MockTransactionContext::~MockTransactionContext()
+{}
+
+// static
+void
+MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ TransactionContext* tc = dynamic_cast<TransactionContext*>(bc);
+ if (tc->m_tc) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(tc->m_op) {
+ case qpid::asyncStore::AsyncOperation::TXN_PREPARE:
+ tc->m_tc->prepareComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_COMMIT:
+ tc->m_tc->commitComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_ABORT:
+ tc->m_tc->abortComplete(tc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::TxnHandle&
+MockTransactionContext::getHandle()
+{
+ return m_txnHandle;
+}
+
+bool
+MockTransactionContext::is2pc() const
+{
+ return m_txnHandle.is2pc();
+}
+
+const std::string&
+MockTransactionContext::getXid() const
+{
+ return m_txnHandle.getXid();
+}
+
+void
+MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+}
+
+void
+MockTransactionContext::prepare()
+{
+ if (m_txnHandle.is2pc()) {
+ localPrepare();
+ m_prepared = true;
+ }
+ std::ostringstream oss;
+ oss << "MockTransactionContext::prepare(): xid=\"" << getXid()
+ << "\": Transaction Error: called prepare() on local transaction";
+ throw qpid::Exception(oss.str());
+}
+
+void
+MockTransactionContext::abort()
+{
+ // TODO: Check the following XA transaction semantics:
+ // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared.
+ if (!m_prepared) {
+ localPrepare();
+ }
+ m_store->submitAbort(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+void
+MockTransactionContext::commit()
+{
+ if (is2pc()) {
+ if (!m_prepared) {
+ std::ostringstream oss;
+ oss << "MockTransactionContext::abort(): xid=\"" << getXid()
+ << "\": Transaction Error: called commit() without prepare() on 2PC transaction";
+ throw qpid::Exception(oss.str());
+ }
+ } else {
+ localPrepare();
+ }
+ m_store->submitCommit(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::localPrepare()
+{
+ m_store->submitPrepare(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+// protected
+void
+MockTransactionContext::prepareComplete(const TransactionContext* tc)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ while (!m_enqueuedMsgs.empty()) {
+ m_enqueuedMsgs.front()->clearTransaction();
+ m_enqueuedMsgs.pop_front();
+ }
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
+}
+
+
+// protected
+void
+MockTransactionContext::abortComplete(const TransactionContext* tc)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
+}
+
+
+// protected
+void
+MockTransactionContext::commitComplete(const TransactionContext* tc)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush;
+ assert(tc->m_tc == this);
+}
+
+}}} // namespace tests::storePerftools::asyncPerf