diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/locktree/lock_request.cc')
-rw-r--r-- | storage/tokudb/PerconaFT/locktree/lock_request.cc | 169 |
1 files changed, 121 insertions, 48 deletions
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc index 22b6da9afc4..943362e1b9d 100644 --- a/storage/tokudb/PerconaFT/locktree/lock_request.cc +++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc @@ -65,6 +65,7 @@ void lock_request::create(void) { toku_cond_init(&m_wait_cond, nullptr); m_start_test_callback = nullptr; + m_start_before_pending_test_callback = nullptr; m_retry_test_callback = nullptr; } @@ -79,7 +80,7 @@ void lock_request::destroy(void) { } // set the lock request parameters. this API allows a lock request to be reused. -void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) { +void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) { invariant(m_state != state::PENDING); m_lt = lt; m_txnid = txnid; @@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT m_state = state::INITIALIZED; m_info = lt ? lt->get_lock_request_info() : nullptr; m_big_txn = big_txn; + m_extra = extra; } // get rid of any stored left and right key copies and @@ -173,6 +175,7 @@ int lock_request::start(void) { m_state = state::PENDING; m_start_time = toku_current_time_microsec() / 1000; m_conflicting_txnid = conflicts.get(0); + if (m_start_before_pending_test_callback) m_start_before_pending_test_callback(); toku_mutex_lock(&m_info->mutex); insert_into_lock_requests(); if (deadlock_exists(conflicts)) { @@ -196,14 +199,32 @@ int lock_request::wait(uint64_t wait_time_ms) { return wait(wait_time_ms, 0, nullptr); } -int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) { +int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void), + void (*lock_wait_callback)(void *, TXNID, TXNID)) { uint64_t t_now = toku_current_time_microsec(); uint64_t t_start = t_now; uint64_t t_end = t_start + wait_time_ms * 1000; toku_mutex_lock(&m_info->mutex); + // check again, this time locking out other retry calls + if (m_state == state::PENDING) { + GrowableArray<TXNID> conflicts_collector; + conflicts_collector.init(); + retry(&conflicts_collector); + if (m_state == state::PENDING) { + report_waits(&conflicts_collector, lock_wait_callback); + } + conflicts_collector.deinit(); + } + while (m_state == state::PENDING) { + // check if this thread is killed + if (killed_callback && killed_callback()) { + remove_from_lock_requests(); + complete(DB_LOCK_NOTGRANTED); + continue; + } // compute next wait time uint64_t t_wait; @@ -221,7 +242,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil invariant(r == 0 || r == ETIMEDOUT); t_now = toku_current_time_microsec(); - if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) { + if (m_state == state::PENDING && t_now >= t_end) { m_info->counters.timeout_count += 1; // if we're still pending and we timed out, then remove our @@ -273,14 +294,16 @@ TXNID lock_request::get_conflicting_txnid(void) const { return m_conflicting_txnid; } -int lock_request::retry(void) { +int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) { + invariant(m_state == state::PENDING); int r; - invariant(m_state == state::PENDING); + txnid_set conflicts; + conflicts.create(); if (m_type == type::WRITE) { - r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn); + r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn); } else { - r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn); + r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn); } // if the acquisition succeeded then remove ourselves from the @@ -290,59 +313,105 @@ int lock_request::retry(void) { complete(r); if (m_retry_test_callback) m_retry_test_callback(); // test callback toku_cond_broadcast(&m_wait_cond); + } else { + m_conflicting_txnid = conflicts.get(0); + add_conflicts_to_waits(&conflicts, conflicts_collector); } + conflicts.destroy(); return r; } -void lock_request::retry_all_lock_requests(locktree *lt) { +void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) { lt_lock_request_info *info = lt->get_lock_request_info(); - // if a thread reads this bit to be true, then it should go ahead and - // take the locktree mutex and retry lock requests. we use this bit - // to prevent every single thread from waiting on the locktree mutex - // in order to retry requests, especially when no requests actually exist. - // - // it is important to note that this bit only provides an optimization. - // it is not problematic for it to be true when it should be false, - // but it can be problematic for it to be false when it should be true. - // therefore, the lock request code must ensures that when lock requests - // are added to this locktree, the bit is set. - // see lock_request::insert_into_lock_requests() - if (!info->should_retry_lock_requests) { + info->retry_want++; + + // if there are no pending lock requests than there is nothing to do + // the unlocked data race on pending_is_empty is OK since lock requests + // are retried after added to the pending set. + if (info->pending_is_empty) return; - } toku_mutex_lock(&info->mutex); - // let other threads know that they need not retry lock requests at this time. - // - // the motivation here is that if a bunch of threads have already released - // their locks in the rangetree, then its probably okay for only one thread - // to iterate over the list of requests and retry them. otherwise, at high - // thread counts and a large number of pending lock requests, you could - // end up wasting a lot of cycles. - info->should_retry_lock_requests = false; - - size_t i = 0; - while (i < info->pending_lock_requests.size()) { - lock_request *request; - int r = info->pending_lock_requests.fetch(i, &request); - invariant_zero(r); - - // retry the lock request. if it didn't succeed, - // move on to the next lock request. otherwise - // the request is gone from the list so we may - // read the i'th entry for the next one. - r = request->retry(); - if (r != 0) { - i++; + GrowableArray<TXNID> conflicts_collector; + conflicts_collector.init(); + + // here is the group retry algorithm. + // get the latest retry_want count and use it as the generation number of this retry operation. + // if this retry generation is > the last retry generation, then do the lock retries. otherwise, + // no lock retries are needed. + unsigned long long retry_gen = info->retry_want.load(); + if (retry_gen > info->retry_done) { + + // retry all of the pending lock requests. + for (size_t i = 0; i < info->pending_lock_requests.size(); ) { + lock_request *request; + int r = info->pending_lock_requests.fetch(i, &request); + invariant_zero(r); + + // retry this lock request. if it didn't succeed, + // move on to the next lock request. otherwise + // the request is gone from the list so we may + // read the i'th entry for the next one. + r = request->retry(&conflicts_collector); + if (r != 0) { + i++; + } } + if (after_retry_all_test_callback) after_retry_all_test_callback(); + info->retry_done = retry_gen; + } + + toku_mutex_unlock(&info->mutex); + + report_waits(&conflicts_collector, lock_wait_callback); + conflicts_collector.deinit(); +} + +void lock_request::add_conflicts_to_waits(txnid_set *conflicts, + GrowableArray<TXNID> *wait_conflicts) { + size_t num_conflicts = conflicts->size(); + for (size_t i = 0; i < num_conflicts; i++) { + wait_conflicts->push(m_txnid); + wait_conflicts->push(conflicts->get(i)); } +} + +void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts, + void (*lock_wait_callback)(void *, TXNID, TXNID)) { + if (!lock_wait_callback) + return; + size_t num_conflicts = wait_conflicts->get_size(); + for (size_t i = 0; i < num_conflicts; i += 2) { + TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i); + TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1); + (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid); + } +} + +void *lock_request::get_extra(void) const { + return m_extra; +} - // future threads should only retry lock requests if some still exist - info->should_retry_lock_requests = info->pending_lock_requests.size() > 0; +void lock_request::kill_waiter(void) { + remove_from_lock_requests(); + complete(DB_LOCK_NOTGRANTED); + toku_cond_broadcast(&m_wait_cond); +} +void lock_request::kill_waiter(locktree *lt, void *extra) { + lt_lock_request_info *info = lt->get_lock_request_info(); + toku_mutex_lock(&info->mutex); + for (size_t i = 0; i < info->pending_lock_requests.size(); i++) { + lock_request *request; + int r = info->pending_lock_requests.fetch(i, &request); + if (r == 0 && request->get_extra() == extra) { + request->kill_waiter(); + break; + } + } toku_mutex_unlock(&info->mutex); } @@ -364,9 +433,7 @@ void lock_request::insert_into_lock_requests(void) { invariant(r == DB_NOTFOUND); r = m_info->pending_lock_requests.insert_at(this, idx); invariant_zero(r); - - // ensure that this bit is true, now that at least one lock request is in the set - m_info->should_retry_lock_requests = true; + m_info->pending_is_empty = false; } // remove this lock request from the locktree's set. must hold the mutex. @@ -378,6 +445,8 @@ void lock_request::remove_from_lock_requests(void) { invariant(request == this); r = m_info->pending_lock_requests.delete_at(idx); invariant_zero(r); + if (m_info->pending_lock_requests.size() == 0) + m_info->pending_is_empty = true; } int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) { @@ -395,6 +464,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) { m_start_test_callback = f; } +void lock_request::set_start_before_pending_test_callback(void (*f)(void)) { + m_start_before_pending_test_callback = f; +} + void lock_request::set_retry_test_callback(void (*f)(void)) { m_retry_test_callback = f; } |