summaryrefslogtreecommitdiff
path: root/storage/tokudb/PerconaFT/locktree/lock_request.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/locktree/lock_request.cc')
-rw-r--r-- storage/tokudb/PerconaFT/locktree/lock_request.cc 402
1 files changed, 402 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc
new file mode 100644
index 00000000000..22b6da9afc4
--- /dev/null
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -0,0 +1,402 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "portability/toku_race_tools.h"
+
+#include "ft/txn/txn.h"
+#include "locktree/locktree.h"
+#include "locktree/lock_request.h"
+#include "util/dbt.h"
+
+namespace toku {
+
+// initialize a lock request's internals.
+//
+// puts the request into the UNINITIALIZED state, not yet bound to a
+// transaction, key range or locktree; set() supplies those before the
+// request is started. the wait condition variable initialized here is
+// released again by destroy().
+void lock_request::create(void) {
+    m_txnid = TXNID_NONE;
+    m_conflicting_txnid = TXNID_NONE;
+    m_start_time = 0;
+    m_left_key = nullptr;
+    m_right_key = nullptr;
+    toku_init_dbt(&m_left_key_copy);
+    toku_init_dbt(&m_right_key_copy);
+
+    m_type = type::UNKNOWN;
+    m_lt = nullptr;
+    // set() overwrites this flag; give it a defined value until then so
+    // that it is never read while indeterminate.
+    m_big_txn = false;
+
+    m_complete_r = 0;
+    m_state = state::UNINITIALIZED;
+    m_info = nullptr;
+
+    toku_cond_init(&m_wait_cond, nullptr);
+
+    // test hooks are off unless a test installs them
+    m_start_test_callback = nullptr;
+    m_retry_test_callback = nullptr;
+}
+
+// tear down a lock request: mark it DESTROYED and release the resources
+// it owns (the wait condition variable and both key copies).
+//
+// the request must be neither pending nor already destroyed.
+void lock_request::destroy(void) {
+    invariant(m_state != state::PENDING);
+    invariant(m_state != state::DESTROYED);
+
+    m_state = state::DESTROYED;
+
+    toku_cond_destroy(&m_wait_cond);
+    toku_destroy_dbt(&m_right_key_copy);
+    toku_destroy_dbt(&m_left_key_copy);
+}
+
+// (re)configure this lock request. a request that is not pending may be
+// set any number of times, which is what allows it to be reused.
+//
+// lt:         the locktree to lock in; may be null, in which case no
+//             lock request info is attached.
+// txnid:      the transaction asking for the lock.
+// left_key,
+// right_key:  endpoints of the requested range. only the pointers are
+//             stored here; copy_keys() clones the contents later.
+// lock_type:  read or write.
+// big_txn:    passed through to the locktree when acquiring.
+void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
+    invariant(m_state != state::PENDING);
+
+    // drop any key copies left over from a previous use of this request
+    toku_destroy_dbt(&m_left_key_copy);
+    toku_destroy_dbt(&m_right_key_copy);
+
+    m_txnid = txnid;
+    m_type = lock_type;
+    m_big_txn = big_txn;
+    m_left_key = left_key;
+    m_right_key = right_key;
+
+    m_lt = lt;
+    if (lt != nullptr) {
+        m_info = lt->get_lock_request_info();
+    } else {
+        m_info = nullptr;
+    }
+
+    m_state = state::INITIALIZED;
+}
+
+// clone each non-infinite endpoint key into this request's own storage
+// and repoint the endpoint at that clone. infinite endpoints are left
+// pointing wherever they already point.
+//
+// nothing is freed here; set() is what releases earlier copies.
+void lock_request::copy_keys() {
+    const bool left_is_finite = !toku_dbt_is_infinite(m_left_key);
+    if (left_is_finite) {
+        toku_clone_dbt(&m_left_key_copy, *m_left_key);
+        m_left_key = &m_left_key_copy;
+    }
+
+    const bool right_is_finite = !toku_dbt_is_infinite(m_right_key);
+    if (right_is_finite) {
+        toku_clone_dbt(&m_right_key_copy, *m_right_key);
+        m_right_key = &m_right_key_copy;
+    }
+}
+
+// ask the locktree which transactions conflict with this pending lock
+// request's type and key range. the answer is written into *conflicts.
+void lock_request::get_conflicts(txnid_set *conflicts) {
+    invariant(m_state == state::PENDING);
+    m_lt->get_conflicts(m_type == type::WRITE, m_txnid, m_left_key, m_right_key, conflicts);
+}
+
+// build a wait-for-graph for this lock request (A) and the given conflict set.
+// for each transaction B that blocks A's lock request:
+//   if B is itself blocked (it has a pending lock request) then
+//     add (A,B) to the WFG and, if B is new to the graph, fill in the
+//     WFG from B as well.
+//
+// looks up other pending requests with find_lock_request(), which
+// requires the mutex to be held.
+void lock_request::build_wait_graph(wfg *wait_graph, const txnid_set &conflicts) {
+    const size_t n = conflicts.size();
+    for (size_t idx = 0; idx != n; ++idx) {
+        const TXNID blocker_txnid = conflicts.get(idx);
+        lock_request *const blocker = find_lock_request(blocker_txnid);
+        invariant(blocker_txnid != m_txnid);
+        invariant(blocker != this);
+
+        // a blocker with no pending request is not waiting on anyone,
+        // so it contributes no edge
+        if (blocker == nullptr) {
+            continue;
+        }
+
+        const bool seen_before = wait_graph->node_exists(blocker_txnid);
+        wait_graph->add_edge(m_txnid, blocker_txnid);
+        if (seen_before) {
+            // already expanded; do not recurse again
+            continue;
+        }
+
+        // recursively build the wait for graph rooted at the blocking
+        // request, given its own set of lock conflicts.
+        txnid_set blocker_conflicts;
+        blocker_conflicts.create();
+        blocker->get_conflicts(&blocker_conflicts);
+        blocker->build_wait_graph(wait_graph, blocker_conflicts);
+        blocker_conflicts.destroy();
+    }
+}
+
+// builds a temporary wait-for-graph from this request and the given
+// conflict set and looks for a cycle through this request's txnid.
+//
+// returns: true if the current set of lock requests contains
+//          a deadlock, false otherwise.
+bool lock_request::deadlock_exists(const txnid_set &conflicts) {
+    wfg graph;
+    graph.create();
+    build_wait_graph(&graph, conflicts);
+
+    const bool has_cycle = graph.cycle_exists_from_txnid(m_txnid);
+
+    graph.destroy();
+    return has_cycle;
+}
+
+// try to acquire the lock described by this lock request.
+//
+// returns: the locktree's result when the lock is granted or fails
+//          outright, in which case the request is completed with it;
+//          DB_LOCK_DEADLOCK if waiting would deadlock (also completed);
+//          DB_LOCK_NOTGRANTED if the request is now pending and the
+//          caller should wait() on it.
+int lock_request::start(void) {
+    txnid_set conflicts;
+    conflicts.create();
+
+    int r;
+    if (m_type != type::WRITE) {
+        invariant(m_type == type::READ);
+        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
+    } else {
+        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
+    }
+
+    if (r == DB_LOCK_NOTGRANTED) {
+        // the lock was not granted: take private copies of the keys,
+        // become pending and remember when we started (milliseconds)
+        // and the first conflicting txnid.
+        copy_keys();
+        m_state = state::PENDING;
+        m_start_time = toku_current_time_microsec() / 1000;
+        m_conflicting_txnid = conflicts.get(0);
+
+        // publish ourselves in the set of lock requests and check for
+        // a deadlock. if there is one, withdraw and fail instead.
+        toku_mutex_lock(&m_info->mutex);
+        insert_into_lock_requests();
+        const bool deadlocked = deadlock_exists(conflicts);
+        if (deadlocked) {
+            remove_from_lock_requests();
+            r = DB_LOCK_DEADLOCK;
+        }
+        toku_mutex_unlock(&m_info->mutex);
+
+        // test callback
+        if (m_start_test_callback != nullptr) {
+            m_start_test_callback();
+        }
+    }
+
+    // anything other than "still not granted" is a final answer
+    const bool resolved = (r != DB_LOCK_NOTGRANTED);
+    if (resolved) {
+        complete(r);
+    }
+
+    conflicts.destroy();
+    return r;
+}
+
+// sleep on the lock request until it becomes resolved or the wait time
+// has elapsed. equivalent to the three-argument wait() with no killed
+// polling interval and no killed callback.
+int lock_request::wait(uint64_t wait_time_ms) {
+    const uint64_t no_killed_time_ms = 0;
+    int (*const no_killed_callback)(void) = nullptr;
+    return wait(wait_time_ms, no_killed_time_ms, no_killed_callback);
+}
+
+// sleep on the lock request until it is resolved, the wait time elapses,
+// or the killed callback says to give up.
+//
+// wait_time_ms:    maximum total time to wait, in milliseconds.
+// killed_time_ms:  if nonzero, wake up at least this often (milliseconds)
+//                  so the killed callback gets checked; if zero, sleep
+//                  straight through to the deadline.
+// killed_callback: optional. checked after a wakeup while the request is
+//                  still pending; a nonzero return abandons the wait.
+//
+// returns: the value the request was completed with. a wait that times
+//          out or is killed completes the request with DB_LOCK_NOTGRANTED.
+int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
+    // all t_* values are absolute times in microseconds. the additions
+    // below saturate rather than wrap, so that a huge timeout means
+    // "wait practically forever" instead of "deadline already passed".
+    const uint64_t t_max = ~static_cast<uint64_t>(0);
+
+    uint64_t t_now = toku_current_time_microsec();
+    const uint64_t t_start = t_now;
+    const uint64_t t_end = wait_time_ms > (t_max - t_start) / 1000
+        ? t_max
+        : t_start + wait_time_ms * 1000;
+
+    toku_mutex_lock(&m_info->mutex);
+
+    while (m_state == state::PENDING) {
+
+        // compute next wait time: the deadline, or the next killed
+        // check if that comes first. a killed interval too large to
+        // represent necessarily lies beyond the deadline.
+        uint64_t t_wait = t_end;
+        if (killed_time_ms != 0 && killed_time_ms <= (t_max - t_now) / 1000) {
+            const uint64_t t_poll = t_now + killed_time_ms * 1000;
+            if (t_poll < t_end)
+                t_wait = t_poll;
+        }
+        struct timespec ts = {};
+        ts.tv_sec = t_wait / 1000000;
+        ts.tv_nsec = (t_wait % 1000000) * 1000;
+        int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts);
+        invariant(r == 0 || r == ETIMEDOUT);
+
+        t_now = toku_current_time_microsec();
+        if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
+            // NOTE(review): a wait abandoned via killed_callback is
+            // counted as a timeout here too - confirm that is intended.
+            m_info->counters.timeout_count += 1;
+
+            // if we're still pending and we timed out, then remove our
+            // request from the set of lock requests and fail.
+            remove_from_lock_requests();
+
+            // complete sets m_state to COMPLETE, breaking us out of the loop
+            complete(DB_LOCK_NOTGRANTED);
+        }
+    }
+
+    // account for this wait; waits of one second or more are also
+    // tallied as long waits.
+    uint64_t t_real_end = toku_current_time_microsec();
+    uint64_t duration = t_real_end - t_start;
+    m_info->counters.wait_count += 1;
+    m_info->counters.wait_time += duration;
+    if (duration >= 1000000) {
+        m_info->counters.long_wait_count += 1;
+        m_info->counters.long_wait_time += duration;
+    }
+    toku_mutex_unlock(&m_info->mutex);
+
+    invariant(m_state == state::COMPLETE);
+    return m_complete_r;
+}
+
+// mark this lock request finished, recording complete_r as its result.
+// the result is stored before the state flips to COMPLETE.
+void lock_request::complete(int complete_r) {
+    this->m_complete_r = complete_r;
+    this->m_state = state::COMPLETE;
+}
+
+// returns: the left endpoint of the requested key range, as currently
+//          stored (the caller's key, or this request's own copy of it).
+const DBT *lock_request::get_left_key(void) const {
+    return this->m_left_key;
+}
+
+// returns: the right endpoint of the requested key range, as currently
+//          stored (the caller's key, or this request's own copy of it).
+const DBT *lock_request::get_right_key(void) const {
+    return this->m_right_key;
+}
+
+// returns: the id of the transaction this lock request was set up for.
+TXNID lock_request::get_txnid(void) const {
+    return this->m_txnid;
+}
+
+// returns: the time, in milliseconds, at which start() made this request
+//          pending; 0 if it has not been pending since create().
+uint64_t lock_request::get_start_time(void) const {
+    return this->m_start_time;
+}
+
+// returns: the first conflicting txnid recorded by start() when the
+//          request became pending; TXNID_NONE if none was recorded
+//          since create().
+TXNID lock_request::get_conflicting_txnid(void) const {
+    return this->m_conflicting_txnid;
+}
+
+// try once more to acquire the lock for this pending request.
+//
+// on success the request is removed from the set of lock requests,
+// completed, and its waiter is woken up. removal from the set requires
+// the mutex, so the caller must hold it.
+//
+// returns: 0 if the lock was acquired, otherwise the locktree's error.
+int lock_request::retry(void) {
+    invariant(m_state == state::PENDING);
+
+    const bool wants_write = (m_type == type::WRITE);
+    const int r = wants_write
+        ? m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn)
+        : m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
+
+    // still blocked (or failed): leave the request pending
+    if (r != 0) {
+        return r;
+    }
+
+    // the acquisition succeeded: remove ourselves from the set of lock
+    // requests, complete, and signal the waiting thread.
+    remove_from_lock_requests();
+    complete(r);
+    if (m_retry_test_callback != nullptr) {
+        m_retry_test_callback(); // test callback
+    }
+    toku_cond_broadcast(&m_wait_cond);
+
+    return r;
+}
+
+// retry every pending lock request of the given locktree, completing and
+// waking up each one whose lock can now be granted.
+void lock_request::retry_all_lock_requests(locktree *lt) {
+    lt_lock_request_info *info = lt->get_lock_request_info();
+
+    // this bit is read without the mutex. a thread that reads it as true
+    // goes on to take the mutex and retry lock requests; a thread that
+    // reads it as false skips all of that. it exists so that not every
+    // thread has to queue up on the mutex just to retry requests,
+    // especially when no requests actually exist.
+    //
+    // the bit is purely an optimization. it is harmless for it to be true
+    // when it should be false, but it can be a problem for it to be false
+    // when it should be true. therefore the lock request code must ensure
+    // that the bit is set whenever a lock request is added to this
+    // locktree. see lock_request::insert_into_lock_requests()
+    const bool worth_retrying = info->should_retry_lock_requests;
+    if (!worth_retrying) {
+        return;
+    }
+
+    toku_mutex_lock(&info->mutex);
+
+    // tell other threads that they need not retry lock requests right now.
+    //
+    // if a bunch of threads have just released their locks, it is
+    // probably fine for only one of them to walk the list of requests
+    // and retry them. otherwise, at high thread counts and with many
+    // pending lock requests, a lot of cycles could be wasted.
+    info->should_retry_lock_requests = false;
+
+    for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
+        lock_request *request;
+        int r = info->pending_lock_requests.fetch(i, &request);
+        invariant_zero(r);
+
+        // a request that succeeds removes itself from the list, so the
+        // next candidate slides into slot i. only step forward when the
+        // request is still there.
+        if (request->retry() != 0) {
+            i++;
+        }
+    }
+
+    // future threads should only retry lock requests if some still exist
+    info->should_retry_lock_requests = (info->pending_lock_requests.size() != 0);
+
+    toku_mutex_unlock(&info->mutex);
+}
+
+// look up the pending lock request of the given txnid. must hold the mutex.
+//
+// returns: the matching request, or null if that txnid has none pending.
+lock_request *lock_request::find_lock_request(const TXNID &txnid) {
+    lock_request *found = nullptr;
+    const int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(txnid, &found, nullptr);
+    return r == 0 ? found : nullptr;
+}
+
+// add this lock request to the locktree's set of pending requests.
+// must hold the mutex.
+//
+// no request with the same txnid may already be in the set.
+void lock_request::insert_into_lock_requests(void) {
+    uint32_t insert_idx;
+    lock_request *existing;
+    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(m_txnid, &existing, &insert_idx);
+    invariant(r == DB_NOTFOUND);
+
+    // the failed lookup still reports the index to insert at
+    r = m_info->pending_lock_requests.insert_at(this, insert_idx);
+    invariant_zero(r);
+
+    // at least one lock request is in the set now, so retrying is worthwhile
+    m_info->should_retry_lock_requests = true;
+}
+
+// take this lock request out of the locktree's set of pending requests.
+// must hold the mutex.
+//
+// the request must be in the set, filed under its own txnid.
+void lock_request::remove_from_lock_requests(void) {
+    uint32_t found_idx;
+    lock_request *found;
+    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(m_txnid, &found, &found_idx);
+    invariant_zero(r);
+    invariant(found == this);
+
+    r = m_info->pending_lock_requests.delete_at(found_idx);
+    invariant_zero(r);
+}
+
+// three-way comparison of a lock request's txnid against a txnid, used
+// as the comparator when searching the set of pending lock requests.
+//
+// returns: -1 if the request's txnid is smaller, 0 if equal, 1 if larger.
+int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
+    const TXNID candidate = request->m_txnid;
+    if (candidate == txnid) {
+        return 0;
+    }
+    return candidate < txnid ? -1 : 1;
+}
+
+// test hook: install a callback that start() invokes after a request
+// that was not granted has been through its deadlock check. pass null
+// to remove it.
+void lock_request::set_start_test_callback(void (*f)(void)) {
+    this->m_start_test_callback = f;
+}
+
+// test hook: install a callback that retry() invokes after a successful
+// retry, just before the waiter is woken up. pass null to remove it.
+void lock_request::set_retry_test_callback(void (*f)(void)) {
+    this->m_retry_test_callback = f;
+}
+
+} /* namespace toku */