summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp240
1 files changed, 240 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp
new file mode 100644
index 0000000000..2aea14bc21
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleTransactionContext.cpp
+ */
+
+#include "SimpleTransactionContext.h"
+
+#include "TransactionAsyncContext.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+#include <uuid/uuid.h>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+SimpleTransactionContext::SimpleTransactionContext(const std::string& xid) :
+ qpid::broker::TransactionContext(),
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_store(0),
+ m_txnHandle(0),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+ if (!m_tpcFlag) {
+ setLocalXid();
+ }
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+}
+
+SimpleTransactionContext::SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
+ const std::string& xid) :
+ m_store(store),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+ if (m_store != 0) {
+ m_txnHandle = store->createTxnHandle(xid);
+ }
+}
+
+SimpleTransactionContext::~SimpleTransactionContext()
+{}
+
+// static
+/*
+void
+SimpleTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerAsyncContext* bc)
+{
+ if (bc && res) {
+ TransactionAsyncContext* tac = dynamic_cast<TransactionAsyncContext*>(bc);
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(tac->getOpCode()) {
+ case qpid::asyncStore::AsyncOperation::TXN_PREPARE:
+ tac->getTransactionContext()->prepareComplete(tac);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_COMMIT:
+ tac->getTransactionContext()->commitComplete(tac);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_ABORT:
+ tac->getTransactionContext()->abortComplete(tac);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::SimpleTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode();
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+*/
+
+const qpid::broker::TxnHandle&
+SimpleTransactionContext::getHandle() const
+{
+ return m_txnHandle;
+}
+
+qpid::broker::TxnHandle&
+SimpleTransactionContext::getHandle()
+{
+ return m_txnHandle;
+}
+
+bool
+SimpleTransactionContext::is2pc() const
+{
+ return m_tpcFlag;
+}
+
+const std::string&
+SimpleTransactionContext::getXid() const
+{
+ return m_xid;
+}
+
+void
+SimpleTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+}
+
+void
+SimpleTransactionContext::prepare()
+{
+ if (m_tpcFlag) {
+ localPrepare();
+ m_prepared = true;
+ }
+ std::ostringstream oss;
+ oss << "SimpleTransactionContext::prepare(): xid=\"" << getXid()
+ << "\": Transaction Error: called prepare() on local transaction";
+ throw qpid::Exception(oss.str());
+}
+
+void
+SimpleTransactionContext::abort()
+{
+ // TODO: Check the following XA transaction semantics:
+ // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared.
+ if (!m_prepared) {
+ localPrepare();
+ }
+ if (m_store != 0) {
+// m_store->submitAbort(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+ }
+//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+void
+SimpleTransactionContext::commit()
+{
+ if (is2pc()) {
+ if (!m_prepared) {
+ std::ostringstream oss;
+ oss << "SimpleTransactionContext::abort(): xid=\"" << getXid()
+ << "\": Transaction Error: called commit() without prepare() on 2PC transaction";
+ throw qpid::Exception(oss.str());
+ }
+ } else {
+ localPrepare();
+ }
+ if (m_store != 0) {
+// m_store->submitCommit(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+ }
+//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+
+// private
+void
+SimpleTransactionContext::localPrepare()
+{
+ if (m_store != 0) {
+// m_store->submitPrepare(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+ }
+//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+// private
+void
+SimpleTransactionContext::setLocalXid()
+{
+ uuid_t uuid;
+ // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter?
+ // If this race condition affects the randomness of the UUID, then there could be a problem here.
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+}
+
+// private
+void
+SimpleTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+// while (!m_enqueuedMsgs.empty()) {
+// m_enqueuedMsgs.front()->clearTransaction();
+// m_enqueuedMsgs.pop_front();
+// }
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush;
+// assert(tc->getTransactionContext().get() == this);
+}
+
+
+// private
+void
+SimpleTransactionContext::abortComplete(const TransactionAsyncContext* tc)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush;
+ assert(tc->getTransactionContext().get() == this);
+}
+
+
+// private
+void
+SimpleTransactionContext::commitComplete(const TransactionAsyncContext* tc)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush;
+ assert(tc->getTransactionContext().get() == this);
+}
+
+}}} // namespace tests::storePerftools::asyncPerf