diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-14 13:06:21 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-14 13:06:21 +0000 |
commit | b6851f7bafd90d24bb518b63e7fc9f91e1cd84eb (patch) | |
tree | 95673bd4889de541e6e8b853e1a5dadc6704827f /cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp | |
parent | a3861c46a7151a250fd06f54a60b9c1fe4bd6a1e (diff) | |
download | qpid-python-b6851f7bafd90d24bb518b63e7fc9f91e1cd84eb.tar.gz |
QPID-3858: Fix directory names to match namespaces in test dir; Changed MockPersistableQueue to use intrusive pointers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1338185 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp new file mode 100644 index 0000000000..10be34c6f5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.cpp + */ + +#include "MockTransactionContext.h" + +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockTransactionContext::QueueContext --- + +MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_tc(tc), + m_op(op) +{} + +MockTransactionContext::TransactionContext::~TransactionContext() +{} + +const char* +MockTransactionContext::TransactionContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockTransactionContext::TransactionContext::destroy() +{ + delete this; +} + +// --- Class MockTransactionContext --- + + +MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, + const std::string& xid) : + m_store(store), + m_txnHandle(store->createTxnHandle(xid)), + m_prepared(false), + m_enqueuedMsgs() +{ +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + +MockTransactionContext::~MockTransactionContext() +{} + +// static +void +MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + TransactionContext* tc = dynamic_cast<TransactionContext*>(bc); + if (tc->m_tc) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(tc->m_op) { + case qpid::asyncStore::AsyncOperation::TXN_PREPARE: + tc->m_tc->prepareComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_COMMIT: + tc->m_tc->commitComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_ABORT: + tc->m_tc->abortComplete(tc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::TxnHandle& +MockTransactionContext::getHandle() +{ + return m_txnHandle; +} + +bool +MockTransactionContext::is2pc() const +{ + return m_txnHandle.is2pc(); +} + +const std::string& +MockTransactionContext::getXid() const +{ + return m_txnHandle.getXid(); +} + +void +MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); +} + +void +MockTransactionContext::prepare() +{ + if (m_txnHandle.is2pc()) { + localPrepare(); + m_prepared = true; + } + std::ostringstream oss; + oss << "MockTransactionContext::prepare(): xid=\"" << getXid() + << "\": Transaction Error: called prepare() on local transaction"; + throw qpid::Exception(oss.str()); +} + +void +MockTransactionContext::abort() +{ + // TODO: Check the following XA transaction semantics: + // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. + if (!m_prepared) { + localPrepare(); + } + m_store->submitAbort(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); +//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +void +MockTransactionContext::commit() +{ + if (is2pc()) { + if (!m_prepared) { + std::ostringstream oss; + oss << "MockTransactionContext::abort(): xid=\"" << getXid() + << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; + throw qpid::Exception(oss.str()); + } + } else { + localPrepare(); + } + m_store->submitCommit(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); +//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + + +// protected +void +MockTransactionContext::localPrepare() +{ + m_store->submitPrepare(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); +//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionContext* tc) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + while (!m_enqueuedMsgs.empty()) { + m_enqueuedMsgs.front()->clearTransaction(); + m_enqueuedMsgs.pop_front(); + } +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + + +// protected +void +MockTransactionContext::abortComplete(const TransactionContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + + +// protected +void +MockTransactionContext::commitComplete(const TransactionContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); +} + +}}} // namespace tests::storePerftools::asyncPerf |