diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-11-23 16:46:33 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-11-23 16:49:42 +0100 |
commit | 660a2928a535e36e5dda846677dce4ba96508cd7 (patch) | |
tree | ab73245a90d8f2e091825e43417c207379a2f1fc /storage/tokudb/PerconaFT/src | |
parent | d145d1b6ee42ffa7f9ca2ce02478a31a09f3fe99 (diff) | |
download | mariadb-git-660a2928a535e36e5dda846677dce4ba96508cd7.tar.gz |
Fix optimistic parallel replication for TokuDB.
Make TokuDB report row lock waits with thd_rpl_deadlock_check(). This allows
parallel replication to properly detect conflicts, and kill and retry the
offending transaction.
Diffstat (limited to 'storage/tokudb/PerconaFT/src')
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb-internal.h | 1 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb.cc | 7 | ||||
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb_row_lock.cc | 15 |
3 files changed, 19 insertions, 4 deletions
diff --git a/storage/tokudb/PerconaFT/src/ydb-internal.h b/storage/tokudb/PerconaFT/src/ydb-internal.h index d40f7795b0b..a1eb43a67c5 100644 --- a/storage/tokudb/PerconaFT/src/ydb-internal.h +++ b/storage/tokudb/PerconaFT/src/ydb-internal.h @@ -105,6 +105,7 @@ struct __toku_db_env_internal { TOKULOGGER logger; toku::locktree_manager ltm; lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock. + lock_wait_callback lock_wait_needed_callback; // Called when a lock request requires a wait. DB *directory; // Maps dnames to inames DB *persistent_environment; // Stores environment settings, can be used for upgrade diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc index d51ee81700f..f99b8dfa5da 100644 --- a/storage/tokudb/PerconaFT/src/ydb.cc +++ b/storage/tokudb/PerconaFT/src/ydb.cc @@ -1804,6 +1804,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) { return 0; } +static int +env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) { + env->i->lock_wait_needed_callback = callback; + return 0; +} + static void format_time(const time_t *timer, char *buf) { ctime_r(timer, buf); @@ -2704,6 +2710,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { USENV(get_lock_timeout); USENV(set_lock_timeout); USENV(set_lock_timeout_callback); + USENV(set_lock_wait_callback); USENV(set_redzone); USENV(log_flush); USENV(log_archive); diff --git a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc index 913e1a44faf..597e6311eb8 100644 --- a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc +++ b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc @@ -193,7 +193,10 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT toku::lock_request::type lock_type, toku::lock_request *request) { DB_TXN *txn_anc = txn_oldest_ancester(txn); TXNID txn_anc_id = txn_anc->id64(txn_anc); - request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc)); + uint64_t client_id; + void *client_extra; + txn->get_client_id(txn, &client_id, &client_extra); + request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc), client_extra); const int r = request->start(); if (r == 0) { @@ -221,7 +224,8 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) { uint64_t killed_time_msec = env->i->default_killed_time_msec; if (env->i->get_killed_time_callback) killed_time_msec = env->i->get_killed_time_callback(killed_time_msec); - const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback); + const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback, + env->i->lock_wait_needed_callback); if (r == 0) { db_txn_note_row_lock(db, txn_anc, left_key, right_key); } else if (r == DB_LOCK_NOTGRANTED) { @@ -248,7 +252,10 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) { // This lock request must succeed, so we do not want to wait toku::lock_request request; request.create(); - request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc)); + uint64_t client_id; + void *client_extra; + txn->get_client_id(txn, &client_id, &client_extra); + request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc), client_extra); int r = request.start(); invariant_zero(r); db_txn_note_row_lock(db, txn_anc, key, key); @@ -268,7 +275,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) { // all of our locks have been released, so first try to wake up // pending lock requests, then release our reference on the lt - toku::lock_request::retry_all_lock_requests(lt); + toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback); // Release our reference on this locktree toku::locktree_manager *ltm = &txn->mgrp->i->ltm; |