summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp263
1 files changed, 263 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
new file mode 100644
index 0000000000..8336d36b80
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/journal/txn_map.h"
+
+#include "qpid/linearstore/journal/slock.h"
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+// return/error codes
+int16_t txn_map::TMAP_RID_NOT_FOUND = -2;
+int16_t txn_map::TMAP_XID_NOT_FOUND = -1;
+int16_t txn_map::TMAP_OK = 0;
+int16_t txn_map::TMAP_NOT_SYNCED = 0;
+int16_t txn_map::TMAP_SYNCED = 1;
+
+txn_data_t::txn_data_t(const uint64_t rid,
+ const uint64_t drid,
+ const uint64_t fid,
+ const uint64_t foffs,
+ const bool enq_flag,
+ const bool tpc_flag,
+ const bool commit_flag):
+ rid_(rid),
+ drid_(drid),
+ fid_(fid),
+ foffs_(foffs),
+ enq_flag_(enq_flag),
+ tpc_flag_(tpc_flag),
+ commit_flag_(commit_flag),
+ aio_compl_(false)
+{}
+
+txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) :
+ enqCnt(0U),
+ deqCnt(0U),
+ tpcCnt(0U),
+ abortCnt(0U),
+ commitCnt(0U),
+ rid(0ULL)
+{
+ for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) {
+ if (i->enq_flag_) {
+ ++enqCnt;
+ rid = i->rid_;
+ } else {
+ ++deqCnt;
+ if (i->commit_flag_) {
+ ++commitCnt;
+ } else {
+ ++abortCnt;
+ }
+ }
+ if (i->tpc_flag_) {
+ ++tpcCnt;
+ }
+ }
+ if (tpcCnt > 0 && tpcCnt != tdl.size()) {
+ throw jexception("Inconsistent 2PC count"); // TODO: complete exception details
+ }
+ if (abortCnt > 0 && commitCnt > 0) {
+ throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details
+ }
+}
+
+txn_map::txn_map():
+ _map()/*,
+ _pfid_txn_cnt()*/
+{}
+
+txn_map::~txn_map() {}
+
+bool
+txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td)
+{
+ bool ok = true;
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ {
+ txn_data_list_t list;
+ list.push_back(td);
+ std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+ if (!ret.second) // duplicate
+ ok = false;
+ }
+ else
+ itr->second.push_back(td);
+ return ok;
+}
+
+const txn_data_list_t
+txn_map::get_tdata_list(const std::string& xid)
+{
+ slock s(_mutex);
+ return get_tdata_list_nolock(xid);
+}
+
+const txn_data_list_t
+txn_map::get_tdata_list_nolock(const std::string& xid)
+{
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return _empty_data_list;
+ return itr->second;
+}
+
+const txn_data_list_t
+txn_map::get_remove_tdata_list(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return _empty_data_list;
+ txn_data_list_t list = itr->second;
+ _map.erase(itr);
+ return list;
+}
+
+bool
+txn_map::in_map(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr= _map.find(xid);
+ return itr != _map.end();
+}
+
+uint32_t
+txn_map::enq_cnt()
+{
+ return cnt(true);
+}
+
+uint32_t
+txn_map::deq_cnt()
+{
+ return cnt(true);
+}
+
+uint32_t
+txn_map::cnt(const bool enq_flag)
+{
+ slock s(_mutex);
+ uint32_t c = 0;
+ for (xmap_itr i = _map.begin(); i != _map.end(); i++)
+ {
+ for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++)
+ {
+ if (j->enq_flag_ == enq_flag)
+ c++;
+ }
+ }
+ return c;
+}
+
+int16_t
+txn_map::is_txn_synced(const std::string& xid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ return TMAP_XID_NOT_FOUND;
+ bool is_synced = true;
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ {
+ if (!litr->aio_compl_)
+ {
+ is_synced = false;
+ break;
+ }
+ }
+ return is_synced ? TMAP_SYNCED : TMAP_NOT_SYNCED;
+}
+
+int16_t
+txn_map::set_aio_compl(const std::string& xid, const uint64_t rid)
+{
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // xid not found in map
+ return TMAP_XID_NOT_FOUND;
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ {
+ if (litr->rid_ == rid)
+ {
+ litr->aio_compl_ = true;
+ return TMAP_OK; // rid found
+ }
+ }
+ // xid present, but rid not found
+ return TMAP_RID_NOT_FOUND;
+}
+
+bool
+txn_map::data_exists(const std::string& xid, const uint64_t rid)
+{
+ bool found = false;
+ {
+ slock s(_mutex);
+ txn_data_list_t tdl = get_tdata_list_nolock(xid);
+ tdl_itr_t itr = tdl.begin();
+ while (itr != tdl.end() && !found)
+ {
+ found = itr->rid_ == rid;
+ itr++;
+ }
+ }
+ return found;
+}
+
+bool
+txn_map::is_enq(const uint64_t rid)
+{
+ bool found = false;
+ {
+ slock s(_mutex);
+ for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
+ {
+ txn_data_list_t list = i->second;
+ for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++)
+ {
+ if (j->enq_flag_)
+ found = j->rid_ == rid;
+ else
+ found = j->drid_ == rid;
+ }
+ }
+ }
+ return found;
+}
+
+void
+txn_map::xid_list(std::vector<std::string>& xv)
+{
+ xv.clear();
+ {
+ slock s(_mutex);
+ for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ xv.push_back(itr->first);
+ }
+}
+
+}}}