author    | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-11-23 16:46:33 +0100
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-11-23 16:49:42 +0100
commit    | 660a2928a535e36e5dda846677dce4ba96508cd7 (patch)
tree      | ab73245a90d8f2e091825e43417c207379a2f1fc
parent    | d145d1b6ee42ffa7f9ca2ce02478a31a09f3fe99 (diff)
download  | mariadb-git-660a2928a535e36e5dda846677dce4ba96508cd7.tar.gz
Fix optimistic parallel replication for TokuDB.
Make TokuDB report row lock waits with thd_rpl_deadlock_check(). This allows
parallel replication to properly detect conflicts, and kill and retry the
offending transaction.
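In outline, the mechanism is: PerconaFT gains a lock_wait_callback that fires whenever a lock request starts waiting on a conflicting transaction; the TokuDB handlerton maps both transaction ids back to their client THDs and hands them to the server's thd_rpl_deadlock_check(), which can kill the blocking transaction (for later retry) when the waiter must commit first in replication order. A condensed sketch of that reporting path, taken from the hatoku_hton.cc hunk below (tokudb_txn_id_to_thd() resolves a TXNID to the THD stored as the transaction's client_extra):

    extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);

    static void tokudb_lock_wait_needed_callback(
        void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid) {
        THD *requesting_thd;
        THD *blocking_thd;
        if (tokudb_txn_id_to_thd(requesting_txnid, &requesting_thd) &&
            tokudb_txn_id_to_thd(blocking_txnid, &blocking_thd)) {
            // The server compares replication commit order; a blocker that
            // must commit after the waiter is killed and retried.
            thd_rpl_deadlock_check(requesting_thd, blocking_thd);
        }
    }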
-rw-r--r-- | storage/tokudb/PerconaFT/buildheader/make_tdb.cc                           |  2
-rw-r--r-- | storage/tokudb/PerconaFT/ftcxx/db_env.hpp                                  | 12
-rw-r--r-- | storage/tokudb/PerconaFT/locktree/lock_request.cc                          | 45
-rw-r--r-- | storage/tokudb/PerconaFT/locktree/lock_request.h                           | 12
-rw-r--r-- | storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc |  2
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb-internal.h                                |  1
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb.cc                                        |  7
-rw-r--r-- | storage/tokudb/PerconaFT/src/ydb_row_lock.cc                               | 15
-rw-r--r-- | storage/tokudb/hatoku_hton.cc                                              | 87
9 files changed, 163 insertions, 20 deletions
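The new public API surface is small: one callback typedef and one DB_ENV method, as generated into db.h by the make_tdb.cc hunk, plus a single registration call at engine init (a sketch assembled from the hunks below):

    // Generated into db.h: called with (arg, requesting_txnid, blocking_txnid)
    // each time a lock request begins waiting on a conflicting transaction.
    typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid,
                                       uint64_t blocking_txnid);
    // int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback);

    // Registered once in tokudb_init_func():
    db_env->set_lock_wait_callback(db_env, tokudb_lock_wait_needed_callback);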
diff --git a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
index cadaa48ccea..0145d631839 100644
--- a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
+++ b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
@@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
     "int (*set_lock_timeout) (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
     "int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
     "int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
+    "int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback)",
     "int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
     "int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
     "DB* (*get_db_for_directory) (DB_ENV*)",
@@ -751,6 +752,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
     printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);
     printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
+    printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
     printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
     printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
     printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
diff --git a/storage/tokudb/PerconaFT/ftcxx/db_env.hpp b/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
index 071614b87e9..15b5ce55f72 100644
--- a/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
+++ b/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
@@ -202,6 +202,7 @@ namespace ftcxx {
         typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
         get_lock_wait_time_cb_func _get_lock_wait_time_cb;
         lock_timeout_callback _lock_timeout_callback;
+        lock_wait_callback _lock_wait_needed_callback;
         uint64_t (*_loader_memory_size_callback)(void);

         uint32_t _cachesize_gbytes;
@@ -231,6 +232,7 @@ namespace ftcxx {
             _lock_wait_time_msec(0),
             _get_lock_wait_time_cb(nullptr),
             _lock_timeout_callback(nullptr),
+            _lock_wait_needed_callback(nullptr),
             _loader_memory_size_callback(nullptr),
             _cachesize_gbytes(0),
             _cachesize_bytes(0),
@@ -296,6 +298,11 @@ namespace ftcxx {
                 handle_ft_retval(r);
             }

+            if (_lock_wait_needed_callback) {
+                r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
+                handle_ft_retval(r);
+            }
+
             if (_loader_memory_size_callback) {
                 env->set_loader_memory_size(env, _loader_memory_size_callback);
             }
@@ -419,6 +426,11 @@ namespace ftcxx {
             return *this;
         }

+        DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
+            _lock_wait_needed_callback = callback;
+            return *this;
+        }
+
         DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
             _loader_memory_size_callback = callback;
             return *this;
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc
index 1bc613533db..943362e1b9d 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.cc
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -199,7 +199,8 @@ int lock_request::wait(uint64_t wait_time_ms) {
     return wait(wait_time_ms, 0, nullptr);
 }

-int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
+int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
+                       void (*lock_wait_callback)(void *, TXNID, TXNID)) {
     uint64_t t_now = toku_current_time_microsec();
     uint64_t t_start = t_now;
     uint64_t t_end = t_start + wait_time_ms * 1000;
@@ -208,7 +209,13 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil

     // check again, this time locking out other retry calls
     if (m_state == state::PENDING) {
-        retry();
+        GrowableArray<TXNID> conflicts_collector;
+        conflicts_collector.init();
+        retry(&conflicts_collector);
+        if (m_state == state::PENDING) {
+            report_waits(&conflicts_collector, lock_wait_callback);
+        }
+        conflicts_collector.deinit();
     }

     while (m_state == state::PENDING) {
@@ -287,7 +294,7 @@ TXNID lock_request::get_conflicting_txnid(void) const {
     return m_conflicting_txnid;
 }

-int lock_request::retry(void) {
+int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
     invariant(m_state == state::PENDING);
     int r;

@@ -308,13 +315,14 @@ int lock_request::retry(void) {
         toku_cond_broadcast(&m_wait_cond);
     } else {
         m_conflicting_txnid = conflicts.get(0);
+        add_conflicts_to_waits(&conflicts, conflicts_collector);
     }
     conflicts.destroy();

     return r;
 }

-void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void)) {
+void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
     lt_lock_request_info *info = lt->get_lock_request_info();

     info->retry_want++;
@@ -327,6 +335,9 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
     toku_mutex_lock(&info->mutex);

+    GrowableArray<TXNID> conflicts_collector;
+    conflicts_collector.init();
+
     // here is the group retry algorithm.
     // get the latest retry_want count and use it as the generation number of this retry operation.
     // if this retry generation is > the last retry generation, then do the lock retries. otherwise,
@@ -344,7 +355,7 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
             // move on to the next lock request. otherwise
             // the request is gone from the list so we may
             // read the i'th entry for the next one.
-            r = request->retry();
+            r = request->retry(&conflicts_collector);
             if (r != 0) {
                 i++;
             }
@@ -354,6 +365,30 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
     }

     toku_mutex_unlock(&info->mutex);
+
+    report_waits(&conflicts_collector, lock_wait_callback);
+    conflicts_collector.deinit();
+}
+
+void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
+                                          GrowableArray<TXNID> *wait_conflicts) {
+    size_t num_conflicts = conflicts->size();
+    for (size_t i = 0; i < num_conflicts; i++) {
+        wait_conflicts->push(m_txnid);
+        wait_conflicts->push(conflicts->get(i));
+    }
+}
+
+void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
+                                void (*lock_wait_callback)(void *, TXNID, TXNID)) {
+    if (!lock_wait_callback)
+        return;
+    size_t num_conflicts = wait_conflicts->get_size();
+    for (size_t i = 0; i < num_conflicts; i += 2) {
+        TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
+        TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
+        (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
+    }
 }

 void *lock_request::get_extra(void) const {
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.h b/storage/tokudb/PerconaFT/locktree/lock_request.h
index ab69253bcec..1fa94ef5b96 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.h
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.h
@@ -89,7 +89,8 @@ public:
     // returns: The return code of locktree::acquire_[write,read]_lock()
     //          or simply DB_LOCK_NOTGRANTED if the wait time expired.
     int wait(uint64_t wait_time_ms);
-    int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void));
+    int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
+             void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr);

     // return: left end-point of the lock range
     const DBT *get_left_key(void) const;
@@ -109,7 +110,7 @@ public:
     // effect: Retries all of the lock requests for the given locktree.
     //         Any lock requests successfully restarted is completed and woken up.
     //         The rest remain pending.
-    static void retry_all_lock_requests(locktree *lt, void (*after_retry_test_callback)(void) = nullptr);
+    static void retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, void (*after_retry_test_callback)(void) = nullptr);

     void set_start_test_callback(void (*f)(void));
     void set_start_before_pending_test_callback(void (*f)(void));
@@ -162,7 +163,7 @@ private:

     // effect: tries again to acquire the lock described by this lock request
     // returns: 0 if retrying the request succeeded and is now complete
-    int retry(void);
+    int retry(GrowableArray<TXNID> *conflict_collector);

     void complete(int complete_r);

@@ -194,6 +195,11 @@ private:

     static int find_by_txnid(lock_request * const &request, const TXNID &txnid);

+    // Report list of conflicts to lock wait callback.
+    static void report_waits(GrowableArray<TXNID> *wait_conflicts,
+                             void (*lock_wait_callback)(void *, TXNID, TXNID));
+    void add_conflicts_to_waits(txnid_set *conflicts, GrowableArray<TXNID> *wait_conflicts);
+
     void (*m_start_test_callback)(void);
     void (*m_start_before_pending_test_callback)(void);
     void (*m_retry_test_callback)(void);
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc
index 92122861819..8f0d86c9f64 100644
--- a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc
@@ -87,7 +87,7 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrie
         buffer.destroy();

         // retry pending lock requests
-        lock_request::retry_all_lock_requests(lt, after_retry_all);
+        lock_request::retry_all_lock_requests(lt, nullptr, after_retry_all);
     }

     request.destroy();
diff --git a/storage/tokudb/PerconaFT/src/ydb-internal.h b/storage/tokudb/PerconaFT/src/ydb-internal.h
index d40f7795b0b..a1eb43a67c5 100644
--- a/storage/tokudb/PerconaFT/src/ydb-internal.h
+++ b/storage/tokudb/PerconaFT/src/ydb-internal.h
@@ -105,6 +105,7 @@ struct __toku_db_env_internal {
     TOKULOGGER logger;
     toku::locktree_manager ltm;
     lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
+    lock_wait_callback lock_wait_needed_callback; // Called when a lock request requires a wait.

     DB *directory; // Maps dnames to inames
     DB *persistent_environment; // Stores environment settings, can be used for upgrade
diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc
index d51ee81700f..f99b8dfa5da 100644
--- a/storage/tokudb/PerconaFT/src/ydb.cc
+++ b/storage/tokudb/PerconaFT/src/ydb.cc
@@ -1804,6 +1804,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
     return 0;
 }

+static int
+env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
+    env->i->lock_wait_needed_callback = callback;
+    return 0;
+}
+
 static void
 format_time(const time_t *timer, char *buf) {
     ctime_r(timer, buf);
@@ -2704,6 +2710,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
         USENV(get_lock_timeout);
         USENV(set_lock_timeout);
         USENV(set_lock_timeout_callback);
+        USENV(set_lock_wait_callback);
         USENV(set_redzone);
         USENV(log_flush);
         USENV(log_archive);
diff --git a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
index 913e1a44faf..597e6311eb8 100644
--- a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
+++ b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
@@ -193,7 +193,10 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
                              toku::lock_request::type lock_type, toku::lock_request *request) {
     DB_TXN *txn_anc = txn_oldest_ancester(txn);
     TXNID txn_anc_id = txn_anc->id64(txn_anc);
-    request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc));
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc), client_extra);

     const int r = request->start();
     if (r == 0) {
@@ -221,7 +224,8 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
     uint64_t killed_time_msec = env->i->default_killed_time_msec;
     if (env->i->get_killed_time_callback)
        killed_time_msec = env->i->get_killed_time_callback(killed_time_msec);
-    const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback);
+    const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback,
+                                env->i->lock_wait_needed_callback);
     if (r == 0) {
         db_txn_note_row_lock(db, txn_anc, left_key, right_key);
     } else if (r == DB_LOCK_NOTGRANTED) {
@@ -248,7 +252,10 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) {
     // This lock request must succeed, so we do not want to wait
     toku::lock_request request;
     request.create();
-    request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc));
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc), client_extra);
     int r = request.start();
     invariant_zero(r);
     db_txn_note_row_lock(db, txn_anc, key, key);
@@ -268,7 +275,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {

     // all of our locks have been released, so first try to wake up
     // pending lock requests, then release our reference on the lt
-    toku::lock_request::retry_all_lock_requests(lt);
+    toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);

     // Release our reference on this locktree
     toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc
index 67adc480914..1154840bf3c 100644
--- a/storage/tokudb/hatoku_hton.cc
+++ b/storage/tokudb/hatoku_hton.cc
@@ -55,6 +55,7 @@ static bool tokudb_show_status(
 static void tokudb_handle_fatal_signal(handlerton* hton, THD* thd, int sig);
 #endif
 static int tokudb_close_connection(handlerton* hton, THD* thd);
+static void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level);
 static int tokudb_commit(handlerton* hton, THD* thd, bool all);
 static int tokudb_rollback(handlerton* hton, THD* thd, bool all);
 #if TOKU_INCLUDE_XA
@@ -147,6 +148,11 @@ static void tokudb_lock_timeout_callback(
     const DBT* right_key,
     uint64_t blocking_txnid);

+static void tokudb_lock_wait_needed_callback(
+    void* arg,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid);
+
 #define ASSERT_MSGLEN 1024

 void toku_hton_assert_fail(
@@ -331,6 +337,7 @@ static int tokudb_init_func(void *p) {

     tokudb_hton->create = tokudb_create_handler;
     tokudb_hton->close_connection = tokudb_close_connection;
+    tokudb_hton->kill_query = tokudb_kill_query;

     tokudb_hton->savepoint_offset = sizeof(SP_INFO_T);
     tokudb_hton->savepoint_set = tokudb_savepoint;
@@ -532,6 +539,7 @@ static int tokudb_init_func(void *p) {
     db_env->set_lock_timeout_callback(db_env, tokudb_lock_timeout_callback);

     db_env->set_dir_per_db(db_env, tokudb::sysvars::dir_per_db);
+    db_env->set_lock_wait_callback(db_env, tokudb_lock_wait_needed_callback);

     db_env->set_loader_memory_size(
         db_env,
@@ -754,6 +762,12 @@ static int tokudb_close_connection(handlerton* hton, THD* thd) {
     return error;
 }

+void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level) {
+    TOKUDB_DBUG_ENTER("");
+    db_env->kill_waiter(db_env, thd);
+    DBUG_VOID_RETURN;
+}
+
 bool tokudb_flush_logs(handlerton * hton) {
     TOKUDB_DBUG_ENTER("");
     int error;
@@ -873,9 +887,9 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
             tokudb_sync_on_commit(thd, trx, this_txn) ? 0 : DB_TXN_NOSYNC;
         TOKUDB_TRACE_FOR_FLAGS(
             TOKUDB_DEBUG_TXN,
-            "commit trx %u txn %p syncflag %u",
+            "commit trx %u txn %p %" PRIu64 " syncflag %u",
             all,
-            this_txn,
+            this_txn, this_txn->id64(this_txn),
             syncflag);
         // test hook to induce a crash on a debug build
         DBUG_EXECUTE_IF("tokudb_crash_commit_before", DBUG_SUICIDE(););
@@ -904,9 +918,9 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
     if (this_txn) {
         TOKUDB_TRACE_FOR_FLAGS(
             TOKUDB_DEBUG_TXN,
-            "rollback %u txn %p",
+            "rollback %u txn %p %" PRIu64,
             all,
-            this_txn);
+            this_txn, this_txn->id64(this_txn));
         tokudb_cleanup_handlers(trx, this_txn);
         abort_txn_with_progress(this_txn, thd);
         *txn = NULL;
@@ -952,9 +966,9 @@ static int tokudb_xa_prepare(handlerton* hton, THD* thd, bool all) {
         uint32_t syncflag = tokudb_sync_on_prepare() ? 0 : DB_TXN_NOSYNC;
         TOKUDB_TRACE_FOR_FLAGS(
             TOKUDB_DEBUG_XA,
-            "doing txn prepare:%d:%p",
+            "doing txn prepare:%d:%p %" PRIu64,
             all,
-            txn);
+            txn, txn->id64(txn));
         // a TOKU_XA_XID is identical to a MYSQL_XID
         TOKU_XA_XID thd_xid;
         thd_get_xid(thd, (MYSQL_XID*) &thd_xid);
@@ -1570,7 +1584,9 @@ static int tokudb_search_txn_callback(
     void* extra) {

     uint64_t txn_id = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     struct tokudb_search_txn_extra* e =
         reinterpret_cast<struct tokudb_search_txn_extra*>(extra);
     if (e->match_txn_id == txn_id) {
@@ -1748,6 +1764,63 @@ static void tokudb_lock_timeout_callback(
     }
 }

+extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
+
+struct tokudb_search_txn_thd {
+    bool match_found;
+    uint64_t match_txn_id;
+    THD *match_client_thd;
+};
+
+static int tokudb_search_txn_thd_callback(
+    DB_TXN* txn,
+    iterate_row_locks_callback iterate_locks,
+    void* locks_extra,
+    void* extra) {
+
+    uint64_t txn_id = txn->id64(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    struct tokudb_search_txn_thd* e =
+        reinterpret_cast<struct tokudb_search_txn_thd*>(extra);
+    if (e->match_txn_id == txn_id) {
+        e->match_found = true;
+        e->match_client_thd = reinterpret_cast<THD *>(client_extra);
+        return 1;
+    }
+    return 0;
+}
+
+static bool tokudb_txn_id_to_thd(
+    uint64_t txnid,
+    THD **out_thd) {
+
+    struct tokudb_search_txn_thd e = {
+        false,
+        txnid,
+        0
+    };
+    db_env->iterate_live_transactions(db_env, tokudb_search_txn_thd_callback, &e);
+    if (e.match_found) {
+        *out_thd = e.match_client_thd;
+    }
+    return e.match_found;
+}
+
+static void tokudb_lock_wait_needed_callback(
+    void *arg,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid) {
+
+    THD *requesting_thd;
+    THD *blocking_thd;
+    if (tokudb_txn_id_to_thd(requesting_txnid, &requesting_thd) &&
+        tokudb_txn_id_to_thd(blocking_txnid, &blocking_thd)) {
+        thd_rpl_deadlock_check(requesting_thd, blocking_thd);
+    }
+}
+
 // Retrieves variables for information_schema.global_status.
 // Names (columnname) are automatically converted to upper case,
 // and prefixed with "TOKUDB_"
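A note on the design visible in lock_request.cc: in retry_all_lock_requests(), retry() runs while the locktree's lock_request_info mutex is held, so conflicts are only recorded there, as flattened (waiter, holder) TXNID pairs in a GrowableArray; report_waits() walks the pairs and invokes the callback only after the mutex is unlocked, presumably so the call-out into the server happens outside the locktree's critical section. A standalone illustration of that pair encoding (a sketch using std::vector in place of PerconaFT's GrowableArray):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    typedef uint64_t TXNID;

    // Like add_conflicts_to_waits(): push one (waiter, holder) pair per conflict.
    static void add_conflicts(std::vector<TXNID> *waits, TXNID waiter,
                              const std::vector<TXNID> &conflicts) {
        for (TXNID holder : conflicts) {
            waits->push_back(waiter);   // even index: blocked txn
            waits->push_back(holder);   // odd index: blocking txn
        }
    }

    // Like report_waits(): consume the flat array two entries at a time.
    static void report_waits(const std::vector<TXNID> &waits,
                             void (*cb)(void *, TXNID, TXNID)) {
        if (!cb)
            return;
        for (size_t i = 0; i + 1 < waits.size(); i += 2)
            cb(nullptr, waits[i], waits[i + 1]);
    }

    int main() {
        std::vector<TXNID> waits;
        add_conflicts(&waits, /*waiter=*/7, {3, 5});  // txn 7 waits on 3 and 5
        report_waits(waits, [](void *, TXNID a, TXNID b) {
            std::printf("txn %llu waits for txn %llu\n",
                        (unsigned long long)a, (unsigned long long)b);
        });
        return 0;
    }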