diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp new file mode 100644 index 0000000000..e26f0b8b6f --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/TxnCtxt.h" + +#include "qpid/linearstore/DataTokenImpl.h" +#include "qpid/linearstore/JournalImpl.h" +#include "qpid/linearstore/StoreException.h" + +namespace qpid{ +namespace linearstore{ + +void TxnCtxt::completeTxn(bool commit) { + sync(); + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) { + commitTxn(static_cast<JournalImpl*>(*i), commit); + } + impactedQueues.clear(); + if (preparedXidStorePtr) + commitTxn(preparedXidStorePtr, commit); +} + +void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { + if (jc && loggedtx) { /* if using journal */ + boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl); + dtokp->addRef(); + dtokp->set_external_rid(true); + dtokp->set_rid(loggedtx->next()); + try { + if (commit) { + jc->txn_commit(dtokp.get(), getXid()); + sync(); + } else { + jc->txn_abort(dtokp.get(), getXid()); + } + } catch (const qpid::linearstore::journal::jexception& e) { + std::ostringstream oss; + oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what(); + THROW_STORE_EXCEPTION(oss.str()); + } + } +} + +// static +sys::uuid_t TxnCtxt::uuid; + +// static +IdSequence TxnCtxt::uuidSeq; + +// static +bool TxnCtxt::staticInit = TxnCtxt::setUuid(); + +// static +bool TxnCtxt::setUuid() { + qpid::sys::uuid_generate(uuid); + return true; +} + +TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) { + if (loggedtx) { +// // Human-readable tid: 53 bytes +// // uuit_t is a char[16] +// tid.reserve(53); +// uint64_t* u1 = (uint64_t*)uuid; +// uint64_t* u2 = (uint64_t*)(uuid + sizeof(uint64_t)); +// std::stringstream s; +// s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2; +// tid.assign(s.str()); + + // Binary tid: 24 bytes + tid.reserve(24); + uint64_t c = uuidSeq.next(); + tid.append((char*)&c, sizeof(c)); + tid.append((char*)&uuid, sizeof(uuid)); + } +} + +TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {} + +TxnCtxt::~TxnCtxt() { abort(); } + +void TxnCtxt::sync() { + if (loggedtx) { + try { + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) + jrnl_flush(static_cast<JournalImpl*>(*i)); + if (preparedXidStorePtr) + jrnl_flush(preparedXidStorePtr); + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) + jrnl_sync(static_cast<JournalImpl*>(*i), &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout); + if (preparedXidStorePtr) + jrnl_sync(preparedXidStorePtr, &qpid::linearstore::journal::jcntl::_aio_cmpl_timeout); + } catch (const qpid::linearstore::journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what()); + } + } +} + +void TxnCtxt::jrnl_flush(JournalImpl* jc) { + if (jc && !(jc->is_txn_synced(getXid()))) + jc->flush(false); +} + +void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) { + if (!jc || jc->is_txn_synced(getXid())) + return; + while (jc->get_wr_aio_evt_rem()) { + if (jc->get_wr_events(timeout) == qpid::linearstore::journal::jerrno::AIO_TIMEOUT && timeout) + THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()")); + } +} + +void TxnCtxt::begin(DbEnv* env, bool sync) { + int err; + try { err = env->txn_begin(0, &txn, 0); } + catch (const DbException&) { txn = 0; throw; } + if (err != 0) { + std::ostringstream oss; + oss << "Error: Env::txn_begin() returned error code: " << err; + THROW_STORE_EXCEPTION(oss.str()); + } + if (sync) + globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); +} + +void TxnCtxt::commit() { + if (txn) { + txn->commit(0); + txn = 0; + globalHolder.reset(); + } +} + +void TxnCtxt::abort(){ + if (txn) { + txn->abort(); + txn = 0; + globalHolder.reset(); + } +} + +DbTxn* TxnCtxt::get() { return txn; } + +bool TxnCtxt::isTPC() { return false; } + +const std::string& TxnCtxt::getXid() { return tid; } + +void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); } + +void TxnCtxt::complete(bool commit) { completeTxn(commit); } + +bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); } + +DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); } + +void TxnCtxt::incrDtokRef() { dtokp->addRef(); } + +void TxnCtxt::recoverDtok(const uint64_t rid, const std::string xid) { + dtokp->set_rid(rid); + dtokp->set_wstate(DataTokenImpl::ENQ); + dtokp->set_xid(xid); + dtokp->set_external_rid(true); +} + +TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {} + +}} |