Diffstat (limited to 'storage/tokudb/PerconaFT/locktree/lock_request.cc')
-rw-r--r--  storage/tokudb/PerconaFT/locktree/lock_request.cc | 169
1 file changed, 121 insertions(+), 48 deletions(-)
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc
index 22b6da9afc4..943362e1b9d 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.cc
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -65,6 +65,7 @@ void lock_request::create(void) {
toku_cond_init(&m_wait_cond, nullptr);
m_start_test_callback = nullptr;
+ m_start_before_pending_test_callback = nullptr;
m_retry_test_callback = nullptr;
}
@@ -79,7 +80,7 @@ void lock_request::destroy(void) {
}
// set the lock request parameters. this API allows a lock request to be reused.
-void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
+void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
invariant(m_state != state::PENDING);
m_lt = lt;
m_txnid = txnid;
@@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT
m_state = state::INITIALIZED;
m_info = lt ? lt->get_lock_request_info() : nullptr;
m_big_txn = big_txn;
+ m_extra = extra;
}
// get rid of any stored left and right key copies and
@@ -173,6 +175,7 @@ int lock_request::start(void) {
m_state = state::PENDING;
m_start_time = toku_current_time_microsec() / 1000;
m_conflicting_txnid = conflicts.get(0);
+ if (m_start_before_pending_test_callback) m_start_before_pending_test_callback();
toku_mutex_lock(&m_info->mutex);
insert_into_lock_requests();
if (deadlock_exists(conflicts)) {
@@ -196,14 +199,32 @@ int lock_request::wait(uint64_t wait_time_ms) {
return wait(wait_time_ms, 0, nullptr);
}
-int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
+int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
+ void (*lock_wait_callback)(void *, TXNID, TXNID)) {
uint64_t t_now = toku_current_time_microsec();
uint64_t t_start = t_now;
uint64_t t_end = t_start + wait_time_ms * 1000;
toku_mutex_lock(&m_info->mutex);
+ // check again, this time locking out other retry calls
+ if (m_state == state::PENDING) {
+ GrowableArray<TXNID> conflicts_collector;
+ conflicts_collector.init();
+ retry(&conflicts_collector);
+ if (m_state == state::PENDING) {
+ report_waits(&conflicts_collector, lock_wait_callback);
+ }
+ conflicts_collector.deinit();
+ }
+
while (m_state == state::PENDING) {
+ // check if this thread is killed
+ if (killed_callback && killed_callback()) {
+ remove_from_lock_requests();
+ complete(DB_LOCK_NOTGRANTED);
+ continue;
+ }
// compute next wait time
uint64_t t_wait;
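For context on the hunk above: killed_callback is now polled at the top of every pass through the wait loop instead of only after a condition-variable timeout, so a killed session is noticed even when the waiter is woken by a retry broadcast. A minimal caller-side sketch follows; the flag and function name are hypothetical and not part of this patch.

#include <atomic>

// Illustrative only: what a server layer might supply as killed_callback.
// A nonzero return makes lock_request::wait() remove the pending request
// and complete it with DB_LOCK_NOTGRANTED instead of continuing to wait.
static std::atomic<bool> session_killed{false};

static int my_killed_callback(void) {
    return session_killed.load() ? 1 : 0;
}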
@@ -221,7 +242,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
invariant(r == 0 || r == ETIMEDOUT);
t_now = toku_current_time_microsec();
- if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
+ if (m_state == state::PENDING && t_now >= t_end) {
m_info->counters.timeout_count += 1;
// if we're still pending and we timed out, then remove our
@@ -273,14 +294,16 @@ TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}
-int lock_request::retry(void) {
+int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
+ invariant(m_state == state::PENDING);
int r;
- invariant(m_state == state::PENDING);
+ txnid_set conflicts;
+ conflicts.create();
if (m_type == type::WRITE) {
- r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
+ r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
} else {
- r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
+ r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
}
// if the acquisition succeeded then remove ourselves from the
@@ -290,59 +313,105 @@ int lock_request::retry(void) {
complete(r);
if (m_retry_test_callback) m_retry_test_callback(); // test callback
toku_cond_broadcast(&m_wait_cond);
+ } else {
+ m_conflicting_txnid = conflicts.get(0);
+ add_conflicts_to_waits(&conflicts, conflicts_collector);
}
+ conflicts.destroy();
return r;
}
-void lock_request::retry_all_lock_requests(locktree *lt) {
+void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();
- // if a thread reads this bit to be true, then it should go ahead and
- // take the locktree mutex and retry lock requests. we use this bit
- // to prevent every single thread from waiting on the locktree mutex
- // in order to retry requests, especially when no requests actually exist.
- //
- // it is important to note that this bit only provides an optimization.
- // it is not problematic for it to be true when it should be false,
- // but it can be problematic for it to be false when it should be true.
- // therefore, the lock request code must ensures that when lock requests
- // are added to this locktree, the bit is set.
- // see lock_request::insert_into_lock_requests()
- if (!info->should_retry_lock_requests) {
+ info->retry_want++;
+
+ // if there are no pending lock requests then there is nothing to do
+ // the unlocked data race on pending_is_empty is OK since lock requests
+ // are retried after being added to the pending set.
+ if (info->pending_is_empty)
return;
- }
toku_mutex_lock(&info->mutex);
- // let other threads know that they need not retry lock requests at this time.
- //
- // the motivation here is that if a bunch of threads have already released
- // their locks in the rangetree, then its probably okay for only one thread
- // to iterate over the list of requests and retry them. otherwise, at high
- // thread counts and a large number of pending lock requests, you could
- // end up wasting a lot of cycles.
- info->should_retry_lock_requests = false;
-
- size_t i = 0;
- while (i < info->pending_lock_requests.size()) {
- lock_request *request;
- int r = info->pending_lock_requests.fetch(i, &request);
- invariant_zero(r);
-
- // retry the lock request. if it didn't succeed,
- // move on to the next lock request. otherwise
- // the request is gone from the list so we may
- // read the i'th entry for the next one.
- r = request->retry();
- if (r != 0) {
- i++;
+ GrowableArray<TXNID> conflicts_collector;
+ conflicts_collector.init();
+
+ // here is the group retry algorithm.
+ // get the latest retry_want count and use it as the generation number of this retry operation.
+ // if this retry generation is > the last retry generation, then do the lock retries. otherwise,
+ // no lock retries are needed.
+ unsigned long long retry_gen = info->retry_want.load();
+ if (retry_gen > info->retry_done) {
+
+ // retry all of the pending lock requests.
+ for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
+ lock_request *request;
+ int r = info->pending_lock_requests.fetch(i, &request);
+ invariant_zero(r);
+
+ // retry this lock request. if it didn't succeed,
+ // move on to the next lock request. otherwise
+ // the request is gone from the list so we may
+ // read the i'th entry for the next one.
+ r = request->retry(&conflicts_collector);
+ if (r != 0) {
+ i++;
+ }
}
+ if (after_retry_all_test_callback) after_retry_all_test_callback();
+ info->retry_done = retry_gen;
+ }
+
+ toku_mutex_unlock(&info->mutex);
+
+ report_waits(&conflicts_collector, lock_wait_callback);
+ conflicts_collector.deinit();
+}
+
+void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
+ GrowableArray<TXNID> *wait_conflicts) {
+ size_t num_conflicts = conflicts->size();
+ for (size_t i = 0; i < num_conflicts; i++) {
+ wait_conflicts->push(m_txnid);
+ wait_conflicts->push(conflicts->get(i));
}
+}
+
+void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
+ void (*lock_wait_callback)(void *, TXNID, TXNID)) {
+ if (!lock_wait_callback)
+ return;
+ size_t num_conflicts = wait_conflicts->get_size();
+ for (size_t i = 0; i < num_conflicts; i += 2) {
+ TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
+ TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
+ (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
+ }
+}
+
+void *lock_request::get_extra(void) const {
+ return m_extra;
+}
- // future threads should only retry lock requests if some still exist
- info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
+void lock_request::kill_waiter(void) {
+ remove_from_lock_requests();
+ complete(DB_LOCK_NOTGRANTED);
+ toku_cond_broadcast(&m_wait_cond);
+}
+void lock_request::kill_waiter(locktree *lt, void *extra) {
+ lt_lock_request_info *info = lt->get_lock_request_info();
+ toku_mutex_lock(&info->mutex);
+ for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
+ lock_request *request;
+ int r = info->pending_lock_requests.fetch(i, &request);
+ if (r == 0 && request->get_extra() == extra) {
+ request->kill_waiter();
+ break;
+ }
+ }
toku_mutex_unlock(&info->mutex);
}
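The retry_want/retry_done pair above coalesces concurrent retry requests: every caller bumps retry_want, but only a caller that observes retry_want ahead of retry_done while holding the mutex walks the pending list, and it then records its generation in retry_done so that callers it already covered skip the pass. A standalone sketch of the same pattern, using std::atomic/std::mutex and placeholder names (Coalescer, request, do_work) that are not part of the patch:

#include <atomic>
#include <mutex>

// Illustrative sketch of the generation-counter coalescing used by
// retry_all_lock_requests(): many threads may ask for a retry pass, but a
// pass whose generation already covers a request makes that request a no-op.
struct Coalescer {
    std::atomic<unsigned long long> want{0};  // analogous to info->retry_want
    unsigned long long done = 0;              // analogous to info->retry_done (mutex-protected)
    std::mutex m;

    template <class F>
    void request(F do_work) {
        want++;                               // announce that a pass is wanted
        std::lock_guard<std::mutex> lk(m);
        unsigned long long gen = want.load(); // latest requested generation
        if (gen > done) {                     // not yet covered by an earlier pass
            do_work();                        // one pass serves every request up to gen
            done = gen;
        }
        // else: a pass that started after our increment already did the work
    }
};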
@@ -364,9 +433,7 @@ void lock_request::insert_into_lock_requests(void) {
invariant(r == DB_NOTFOUND);
r = m_info->pending_lock_requests.insert_at(this, idx);
invariant_zero(r);
-
- // ensure that this bit is true, now that at least one lock request is in the set
- m_info->should_retry_lock_requests = true;
+ m_info->pending_is_empty = false;
}
// remove this lock request from the locktree's set. must hold the mutex.
@@ -378,6 +445,8 @@ void lock_request::remove_from_lock_requests(void) {
invariant(request == this);
r = m_info->pending_lock_requests.delete_at(idx);
invariant_zero(r);
+ if (m_info->pending_lock_requests.size() == 0)
+ m_info->pending_is_empty = true;
}
int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
@@ -395,6 +464,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) {
m_start_test_callback = f;
}
+void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
+ m_start_before_pending_test_callback = f;
+}
+
void lock_request::set_retry_test_callback(void (*f)(void)) {
m_retry_test_callback = f;
}
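Taken together, the extra pointer stored by set() and matched by kill_waiter(lt, extra) let a server thread cancel one specific pending waiter without holding a reference to the lock_request itself. A hedged usage sketch, assuming the PerconaFT headers are available; the session argument and the acquire_range_lock name are made up for illustration:

#include "locktree/lock_request.h"  // include path is illustrative; use the real PerconaFT header

using namespace toku;  // locktree and lock_request live in the toku namespace

// Illustrative sketch of tagging a request with a per-session pointer so it
// can be killed later. Error handling is elided.
static void acquire_range_lock(locktree *lt, TXNID txnid,
                               const DBT *left, const DBT *right, void *session) {
    lock_request request;
    request.create();
    // tag the request with the session so kill_waiter() can find it
    request.set(lt, txnid, left, right, lock_request::type::WRITE,
                false /* big_txn */, session /* extra */);
    int r = request.start();
    if (r == DB_LOCK_NOTGRANTED)   // start() left the request pending on a conflict
        r = request.wait(50000 /* wait_time_ms */);
    request.destroy();
    // ... propagate r to the caller ...
}

// From a kill-query path on another thread:
//     lock_request::kill_waiter(lt, session);
// wakes the matching pending waiter and completes it with DB_LOCK_NOTGRANTED.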