diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp new file mode 100644 index 0000000000..8336d36b80 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/journal/txn_map.h" + +#include "qpid/linearstore/journal/slock.h" + +namespace qpid { +namespace linearstore { +namespace journal { + +// return/error codes +int16_t txn_map::TMAP_RID_NOT_FOUND = -2; +int16_t txn_map::TMAP_XID_NOT_FOUND = -1; +int16_t txn_map::TMAP_OK = 0; +int16_t txn_map::TMAP_NOT_SYNCED = 0; +int16_t txn_map::TMAP_SYNCED = 1; + +txn_data_t::txn_data_t(const uint64_t rid, + const uint64_t drid, + const uint64_t fid, + const uint64_t foffs, + const bool enq_flag, + const bool tpc_flag, + const bool commit_flag): + rid_(rid), + drid_(drid), + fid_(fid), + foffs_(foffs), + enq_flag_(enq_flag), + tpc_flag_(tpc_flag), + commit_flag_(commit_flag), + aio_compl_(false) +{} + +txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) : + enqCnt(0U), + deqCnt(0U), + tpcCnt(0U), + abortCnt(0U), + commitCnt(0U), + rid(0ULL) +{ + for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) { + if (i->enq_flag_) { + ++enqCnt; + rid = i->rid_; + } else { + ++deqCnt; + if (i->commit_flag_) { + ++commitCnt; + } else { + ++abortCnt; + } + } + if (i->tpc_flag_) { + ++tpcCnt; + } + } + if (tpcCnt > 0 && tpcCnt != tdl.size()) { + throw jexception("Inconsistent 2PC count"); // TODO: complete exception details + } + if (abortCnt > 0 && commitCnt > 0) { + throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details + } +} + +txn_map::txn_map(): + _map()/*, + _pfid_txn_cnt()*/ +{} + +txn_map::~txn_map() {} + +bool +txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td) +{ + bool ok = true; + slock s(_mutex); + xmap_itr itr = _map.find(xid); + if (itr == _map.end()) // not found in map + { + txn_data_list_t list; + list.push_back(td); + std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list)); + if (!ret.second) // duplicate + ok = false; + } + else + itr->second.push_back(td); + return ok; +} + +const txn_data_list_t +txn_map::get_tdata_list(const std::string& xid) +{ + slock s(_mutex); + return get_tdata_list_nolock(xid); +} + +const txn_data_list_t +txn_map::get_tdata_list_nolock(const std::string& xid) +{ + xmap_itr itr = _map.find(xid); + if (itr == _map.end()) // not found in map + return _empty_data_list; + return itr->second; +} + +const txn_data_list_t +txn_map::get_remove_tdata_list(const std::string& xid) +{ + slock s(_mutex); + xmap_itr itr = _map.find(xid); + if (itr == _map.end()) // not found in map + return _empty_data_list; + txn_data_list_t list = itr->second; + _map.erase(itr); + return list; +} + +bool +txn_map::in_map(const std::string& xid) +{ + slock s(_mutex); + xmap_itr itr= _map.find(xid); + return itr != _map.end(); +} + +uint32_t +txn_map::enq_cnt() +{ + return cnt(true); +} + +uint32_t +txn_map::deq_cnt() +{ + return cnt(true); +} + +uint32_t +txn_map::cnt(const bool enq_flag) +{ + slock s(_mutex); + uint32_t c = 0; + for (xmap_itr i = _map.begin(); i != _map.end(); i++) + { + for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++) + { + if (j->enq_flag_ == enq_flag) + c++; + } + } + return c; +} + +int16_t +txn_map::is_txn_synced(const std::string& xid) +{ + slock s(_mutex); + xmap_itr itr = _map.find(xid); + if (itr == _map.end()) // not found in map + return TMAP_XID_NOT_FOUND; + bool is_synced = true; + for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++) + { + if (!litr->aio_compl_) + { + is_synced = false; + break; + } + } + return is_synced ? TMAP_SYNCED : TMAP_NOT_SYNCED; +} + +int16_t +txn_map::set_aio_compl(const std::string& xid, const uint64_t rid) +{ + slock s(_mutex); + xmap_itr itr = _map.find(xid); + if (itr == _map.end()) // xid not found in map + return TMAP_XID_NOT_FOUND; + for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++) + { + if (litr->rid_ == rid) + { + litr->aio_compl_ = true; + return TMAP_OK; // rid found + } + } + // xid present, but rid not found + return TMAP_RID_NOT_FOUND; +} + +bool +txn_map::data_exists(const std::string& xid, const uint64_t rid) +{ + bool found = false; + { + slock s(_mutex); + txn_data_list_t tdl = get_tdata_list_nolock(xid); + tdl_itr_t itr = tdl.begin(); + while (itr != tdl.end() && !found) + { + found = itr->rid_ == rid; + itr++; + } + } + return found; +} + +bool +txn_map::is_enq(const uint64_t rid) +{ + bool found = false; + { + slock s(_mutex); + for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++) + { + txn_data_list_t list = i->second; + for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++) + { + if (j->enq_flag_) + found = j->rid_ == rid; + else + found = j->drid_ == rid; + } + } + } + return found; +} + +void +txn_map::xid_list(std::vector<std::string>& xv) +{ + xv.clear(); + { + slock s(_mutex); + for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++) + xv.push_back(itr->first); + } +} + +}}} |