diff options
Diffstat (limited to 'cpp/src/qpid/legacystore/TxnCtxt.cpp')
-rw-r--r-- | cpp/src/qpid/legacystore/TxnCtxt.cpp | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/TxnCtxt.cpp b/cpp/src/qpid/legacystore/TxnCtxt.cpp new file mode 100644 index 0000000000..1db41f4c70 --- /dev/null +++ b/cpp/src/qpid/legacystore/TxnCtxt.cpp @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/legacystore/TxnCtxt.h" + +#include <sstream> + +#include "qpid/legacystore/jrnl/jexception.h" +#include "qpid/legacystore/StoreException.h" + +namespace mrg { +namespace msgstore { + +void TxnCtxt::completeTxn(bool commit) { + sync(); + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) { + commitTxn(static_cast<JournalImpl*>(*i), commit); + } + impactedQueues.clear(); + if (preparedXidStorePtr) + commitTxn(preparedXidStorePtr, commit); +} + +void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { + if (jc && loggedtx) { /* if using journal */ + boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl); + dtokp->addRef(); + dtokp->set_external_rid(true); + dtokp->set_rid(loggedtx->next()); + try { + if (commit) { + jc->txn_commit(dtokp.get(), getXid()); + sync(); + } else { + jc->txn_abort(dtokp.get(), getXid()); + } + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Error commit") + e.what()); + } + } +} + +// static +uuid_t TxnCtxt::uuid; + +// static +IdSequence TxnCtxt::uuidSeq; + +// static +bool TxnCtxt::staticInit = TxnCtxt::setUuid(); + +// static +bool TxnCtxt::setUuid() { + ::uuid_generate(uuid); + return true; +} + +TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) { + if (loggedtx) { +// // Human-readable tid: 53 bytes +// // uuit_t is a char[16] +// tid.reserve(53); +// u_int64_t* u1 = (u_int64_t*)uuid; +// u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t)); +// std::stringstream s; +// s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2; +// tid.assign(s.str()); + + // Binary tid: 24 bytes + tid.reserve(24); + u_int64_t c = uuidSeq.next(); + tid.append((char*)&c, sizeof(c)); + tid.append((char*)&uuid, sizeof(uuid)); + } +} + +TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {} + +TxnCtxt::~TxnCtxt() { abort(); } + +void TxnCtxt::sync() { + if (loggedtx) { + try { + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) + jrnl_flush(static_cast<JournalImpl*>(*i)); + if (preparedXidStorePtr) + jrnl_flush(preparedXidStorePtr); + for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) + jrnl_sync(static_cast<JournalImpl*>(*i), &journal::jcntl::_aio_cmpl_timeout); + if (preparedXidStorePtr) + jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout); + } catch (const journal::jexception& e) { + THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what()); + } + } +} + +void TxnCtxt::jrnl_flush(JournalImpl* jc) { + if (jc && !(jc->is_txn_synced(getXid()))) + jc->flush(); +} + +void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) { + if (!jc || jc->is_txn_synced(getXid())) + return; + while (jc->get_wr_aio_evt_rem()) { + if (jc->get_wr_events(timeout) == journal::jerrno::AIO_TIMEOUT && timeout) + THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()")); + } +} + +void TxnCtxt::begin(DbEnv* env, bool sync) { + int err; + try { err = env->txn_begin(0, &txn, 0); } + catch (const DbException&) { txn = 0; throw; } + if (err != 0) { + std::ostringstream oss; + oss << "Error: Env::txn_begin() returned error code: " << err; + THROW_STORE_EXCEPTION(oss.str()); + } + if (sync) + globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); +} + +void TxnCtxt::commit() { + if (txn) { + txn->commit(0); + txn = 0; + globalHolder.reset(); + } +} + +void TxnCtxt::abort(){ + if (txn) { + txn->abort(); + txn = 0; + globalHolder.reset(); + } +} + +DbTxn* TxnCtxt::get() { return txn; } + +bool TxnCtxt::isTPC() { return false; } + +const std::string& TxnCtxt::getXid() { return tid; } + +void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); } + +void TxnCtxt::complete(bool commit) { completeTxn(commit); } + +bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); } + +DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); } + +void TxnCtxt::incrDtokRef() { dtokp->addRef(); } + +void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) { + dtokp->set_rid(rid); + dtokp->set_wstate(DataTokenImpl::ENQ); + dtokp->set_xid(xid); + dtokp->set_external_rid(true); +} + +TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {} + +}} |