Diffstat (limited to 'storage/tokudb/ha_tokudb.cc')

-rw-r--r--   storage/tokudb/ha_tokudb.cc | 573
1 file changed, 271 insertions(+), 302 deletions(-)
diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index 0cef79ed32c..0c2310f6685 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -120,14 +120,6 @@ extern "C" {
 #include "hatoku_defines.h"
 #include "hatoku_cmp.h"
 
-static inline void *thd_data_get(THD *thd, int slot) {
-    return thd->ha_data[slot].ha_ptr;
-}
-
-static inline void thd_data_set(THD *thd, int slot, void *data) {
-    thd->ha_data[slot].ha_ptr = data;
-}
-
 static inline uint get_key_parts(const KEY *key);
 
 #undef PACKAGE
@@ -144,8 +136,8 @@ static inline uint get_key_parts(const KEY *key);
 #include "tokudb_buffer.h"
 #include "tokudb_status.h"
 #include "tokudb_card.h"
-#include "hatoku_hton.h"
 #include "ha_tokudb.h"
+#include "hatoku_hton.h"
 #include <mysql/plugin.h>
 
 static const char *ha_tokudb_exts[] = {
@@ -477,10 +469,9 @@ typedef struct index_read_info {
     DBT* orig_key;
 } *INDEX_READ_INFO;
 
-
 static int ai_poll_fun(void *extra, float progress) {
     LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
-    if (context->thd->killed) {
+    if (thd_killed(context->thd)) {
         sprintf(context->write_status_msg, "The process has been killed, aborting add index.");
         return ER_ABORTING_CONNECTION;
     }
@@ -495,7 +486,7 @@ static int ai_poll_fun(void *extra, float progress) {
 
 static int loader_poll_fun(void *extra, float progress) {
     LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
-    if (context->thd->killed) {
+    if (thd_killed(context->thd)) {
         sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
         return ER_ABORTING_CONNECTION;
     }
@@ -1016,8 +1007,7 @@ static uchar* pack_toku_field_blob(
 
 static int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx) {
     int error;
-    tokudb_trx_data* trx = NULL;
-    trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
+    tokudb_trx_data* trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
     if (!trx) {
         error = ENOMEM;
         goto cleanup;
@@ -1259,6 +1249,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
     tokudb_active_index = MAX_KEY;
     invalidate_icp();
     trx_handler_list.data = this;
+    in_rpl_write_rows = in_rpl_delete_rows = in_rpl_update_rows = false;
     TOKUDB_HANDLER_DBUG_VOID_RETURN;
 }
 
@@ -1614,8 +1605,7 @@ int ha_tokudb::initialize_share(
     DB_TXN* txn = NULL;
     bool do_commit = false;
     THD* thd = ha_thd();
-    tokudb_trx_data *trx = NULL;
-    trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+    tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
     if (thd_sql_command(thd) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
         txn = trx->sub_sp_level;
     }
@@ -1649,8 +1639,7 @@ int ha_tokudb::initialize_share(
 
 #if WITH_PARTITION_STORAGE_ENGINE
     // verify frm data for non-partitioned tables
-    if (TOKU_PARTITION_WRITE_FRM_DATA ||
-        IF_PARTITIONING(table->part_info, NULL) == NULL) {
+    if (TOKU_PARTITION_WRITE_FRM_DATA || table->part_info == NULL) {
         error = verify_frm_data(table->s->path.str, txn);
         if (error)
             goto exit;
@@ -1727,7 +1716,7 @@ int ha_tokudb::initialize_share(
     }
     share->ref_length = ref_length;
 
-    error = estimate_num_rows(share->file,&num_rows, txn);
+    error = estimate_num_rows(share->file, &num_rows, txn);
     //
     // estimate_num_rows should not fail under normal conditions
     //
@@ -1937,7 +1926,6 @@ exit:
 //
 int ha_tokudb::estimate_num_rows(DB* db, uint64_t* num_rows, DB_TXN* txn) {
     int error = ENOSYS;
-    DBC* crsr = NULL;
     bool do_commit = false;
     DB_BTREE_STAT64 dict_stats;
     DB_TXN* txn_to_use = NULL;
@@ -1951,21 +1939,12 @@ int ha_tokudb::estimate_num_rows(DB* db, uint64_t* num_rows, DB_TXN* txn) {
         txn_to_use = txn;
     }
 
-    error = db->stat64(
-        share->file,
-        txn_to_use,
-        &dict_stats
-        );
+    error = db->stat64(db, txn_to_use, &dict_stats);
     if (error) { goto cleanup; }
 
     *num_rows = dict_stats.bt_ndata;
     error = 0;
 cleanup:
-    if (crsr != NULL) {
-        int r = crsr->c_close(crsr);
-        assert(r==0);
-        crsr = NULL;
-    }
     if (do_commit) {
         commit_txn(txn_to_use, 0);
         txn_to_use = NULL;
@@ -3271,7 +3250,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
     TOKUDB_HANDLER_DBUG_ENTER("%llu txn %p", (unsigned long long) rows, transaction);
 #endif
     THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     delay_updating_ai_metadata = true;
     ai_metadata_update_required = false;
     abort_loader = false;
@@ -3281,7 +3260,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
         num_DBs_locked_in_bulk = true;
     lock_count = 0;
 
-    if (share->try_table_lock) {
+    if ((rows == 0 || rows > 1) && share->try_table_lock) {
         if (get_prelock_empty(thd) && may_table_be_empty(transaction)) {
             if (using_ignore || is_insert_ignore(thd) || thd->lex->duplicates != DUP_ERROR ||
                 table->s->next_number_key_offset) {
@@ -3340,7 +3319,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
     TOKUDB_HANDLER_DBUG_ENTER("");
     int error = 0;
     THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     bool using_loader = (loader != NULL);
     if (ai_metadata_update_required) {
         tokudb_pthread_mutex_lock(&share->mutex);
@@ -3352,17 +3331,17 @@ int ha_tokudb::end_bulk_insert(bool abort) {
     ai_metadata_update_required = false;
     loader_error = 0;
     if (loader) {
-        if (!abort_loader && !thd->killed) {
+        if (!abort_loader && !thd_killed(thd)) {
             DBUG_EXECUTE_IF("tokudb_end_bulk_insert_sleep", {
-                const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+                const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
                 thd_proc_info(thd, "DBUG sleep");
                 my_sleep(20000000);
-                thd_proc_info(thd, old_proc_info);
+                thd_proc_info(thd, orig_proc_info);
             });
             error = loader->close(loader);
             loader = NULL;
             if (error) {
-                if (thd->killed) {
+                if (thd_killed(thd)) {
                     my_error(ER_QUERY_INTERRUPTED, MYF(0));
                 }
                 goto cleanup;
@@ -3374,12 +3353,8 @@ int ha_tokudb::end_bulk_insert(bool abort) {
                     if (i == primary_key && !share->pk_has_string) {
                         continue;
                     }
-                    error = is_index_unique(
-                        &is_unique,
-                        transaction,
-                        share->key_file[i],
-                        &table->key_info[i]
-                        );
+                    error = is_index_unique(&is_unique, transaction, share->key_file[i], &table->key_info[i],
+                                            DB_PRELOCKED_WRITE);
                     if (error) goto cleanup;
                     if (!is_unique) {
                         error = HA_ERR_FOUND_DUPP_KEY;
@@ -3419,6 +3394,7 @@ cleanup:
         }
     }
     trx->stmt_progress.using_loader = false;
+    thd_proc_info(thd, 0);
     TOKUDB_HANDLER_DBUG_RETURN(error ? error : loader_error);
 }
 
@@ -3426,7 +3402,7 @@ int ha_tokudb::end_bulk_insert() {
     return end_bulk_insert( false );
 }
 
-int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) {
+int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info, int lock_flags) {
     int error;
     DBC* tmp_cursor1 = NULL;
     DBC* tmp_cursor2 = NULL;
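The hunks above retire TokuDB's private thd_data_get()/thd_data_set() helpers, which reached directly into thd->ha_data[slot], in favor of the server's plugin-service accessors thd_get_ha_data()/thd_set_ha_data() keyed by the handlerton. A minimal standalone sketch of that slot-per-engine pattern, using mock THD and handlerton types rather than the server's definitions:

#include <cassert>

// Mock stand-ins for the server's THD and handlerton; the real accessors
// live in the MySQL plugin service and hide the layout of THD::ha_data,
// which this sketch flattens to a bare pointer array.
struct handlerton { int slot; };

struct THD {
    void *ha_data[8]; // one opaque pointer per installed storage engine
};

static void *thd_get_ha_data(const THD *thd, const handlerton *hton) {
    return thd->ha_data[hton->slot];
}

static void thd_set_ha_data(THD *thd, const handlerton *hton, void *data) {
    thd->ha_data[hton->slot] = data;
}

int main() {
    THD thd = {};
    handlerton tokudb_hton = {3};
    int trx = 0; // stands in for a tokudb_trx_data instance
    thd_set_ha_data(&thd, &tokudb_hton, &trx);
    assert(thd_get_ha_data(&thd, &tokudb_hton) == &trx);
    return 0;
}

Going through the service functions keeps the engine source-compatible if the representation of THD::ha_data changes between server versions.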
@@ -3434,7 +3410,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
     uint64_t cnt = 0;
     char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
     THD* thd = ha_thd();
-    const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+    const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
     memset(&key1, 0, sizeof(key1));
     memset(&key2, 0, sizeof(key2));
     memset(&val, 0, sizeof(val));
@@ -3442,49 +3418,23 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
     memset(&packed_key2, 0, sizeof(packed_key2));
     *is_unique = true;
 
-    error = db->cursor(
-        db,
-        txn,
-        &tmp_cursor1,
-        DB_SERIALIZABLE
-        );
+    error = db->cursor(db, txn, &tmp_cursor1, DB_SERIALIZABLE);
     if (error) { goto cleanup; }
 
-    error = db->cursor(
-        db,
-        txn,
-        &tmp_cursor2,
-        DB_SERIALIZABLE
-        );
+    error = db->cursor(db, txn, &tmp_cursor2, DB_SERIALIZABLE);
     if (error) { goto cleanup; }
-
-    error = tmp_cursor1->c_get(
-        tmp_cursor1,
-        &key1,
-        &val,
-        DB_NEXT
-        );
+    error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
     if (error == DB_NOTFOUND) {
         *is_unique = true;
         error = 0;
         goto cleanup;
     }
     else if (error) { goto cleanup; }
-    error = tmp_cursor2->c_get(
-        tmp_cursor2,
-        &key2,
-        &val,
-        DB_NEXT
-        );
+    error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
     if (error) { goto cleanup; }
 
-    error = tmp_cursor2->c_get(
-        tmp_cursor2,
-        &key2,
-        &val,
-        DB_NEXT
-        );
+    error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
     if (error == DB_NOTFOUND) {
         *is_unique = true;
         error = 0;
@@ -3496,59 +3446,25 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
         bool has_null1;
         bool has_null2;
         int cmp;
-        place_key_into_mysql_buff(
-            key_info,
-            table->record[0],
-            (uchar *) key1.data + 1
-            );
-        place_key_into_mysql_buff(
-            key_info,
-            table->record[1],
-            (uchar *) key2.data + 1
-            );
+        place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key1.data + 1);
+        place_key_into_mysql_buff(key_info, table->record[1], (uchar *) key2.data + 1);
 
-        create_dbt_key_for_lookup(
-            &packed_key1,
-            key_info,
-            key_buff,
-            table->record[0],
-            &has_null1
-            );
-        create_dbt_key_for_lookup(
-            &packed_key2,
-            key_info,
-            key_buff2,
-            table->record[1],
-            &has_null2
-            );
+        create_dbt_key_for_lookup(&packed_key1, key_info, key_buff, table->record[0], &has_null1);
+        create_dbt_key_for_lookup(&packed_key2, key_info, key_buff2, table->record[1], &has_null2);
 
         if (!has_null1 && !has_null2) {
             cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
             if (cmp == 0) {
                 memcpy(key_buff, key1.data, key1.size);
-                place_key_into_mysql_buff(
-                    key_info,
-                    table->record[0],
-                    (uchar *) key_buff + 1
-                    );
+                place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key_buff + 1);
                 *is_unique = false;
                 break;
             }
         }
 
-        error = tmp_cursor1->c_get(
-            tmp_cursor1,
-            &key1,
-            &val,
-            DB_NEXT
-            );
+        error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
         if (error) { goto cleanup; }
-        error = tmp_cursor2->c_get(
-            tmp_cursor2,
-            &key2,
-            &val,
-            DB_NEXT
-            );
+        error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
         if (error && (error != DB_NOTFOUND)) { goto cleanup; }
 
         cnt++;
@@ -3560,7 +3476,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
                 share->rows,
                 key_info->name);
             thd_proc_info(thd, status_msg);
-            if (thd->killed) {
+            if (thd_killed(thd)) {
                 my_error(ER_QUERY_INTERRUPTED, MYF(0));
                 error = ER_QUERY_INTERRUPTED;
                 goto cleanup;
@@ -3571,7 +3487,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
     error = 0;
 
 cleanup:
-    thd_proc_info(thd, old_proc_info);
+    thd_proc_info(thd, orig_proc_info);
     if (tmp_cursor1) {
         tmp_cursor1->c_close(tmp_cursor1);
         tmp_cursor1 = NULL;
@@ -3646,12 +3562,27 @@ cleanup:
     return error;
 }
 
+static void maybe_do_unique_checks_delay(THD *thd) {
+    if (thd->slave_thread) {
+        uint64_t delay_ms = THDVAR(thd, rpl_unique_checks_delay);
+        if (delay_ms)
+            usleep(delay_ms * 1000);
+    }
+}
+
+static bool do_unique_checks(THD *thd, bool do_rpl_event) {
+    if (do_rpl_event && thd->slave_thread && opt_readonly && !THDVAR(thd, rpl_unique_checks))
+        return false;
+    else
+        return !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS);
+}
+
 int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
-    int error;
+    int error = 0;
     //
     // first do uniqueness checks
     //
-    if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
+    if (share->has_unique_keys && do_unique_checks(thd, in_rpl_write_rows)) {
        for (uint keynr = 0; keynr < table_share->keys; keynr++) {
            bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
            bool is_unique = false;
@@ -3664,13 +3595,18 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
             if (!is_unique_key) {
                 continue;
             }
+
+            maybe_do_unique_checks_delay(thd);
+
             //
             // if unique key, check uniqueness constraint
             // but, we do not need to check it if the key has a null
             // and we do not need to check it if unique_checks is off
             //
             error = is_val_unique(&is_unique, record, &table->key_info[keynr], keynr, txn);
-            if (error) { goto cleanup; }
+            if (error) {
+                goto cleanup;
+            }
             if (!is_unique) {
                 error = DB_KEYEXIST;
                 last_dup_key = keynr;
@@ -3678,7 +3614,6 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
             }
         }
     }
-    error = 0;
 cleanup:
     return error;
 }
@@ -3781,15 +3716,8 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
     tokudb_my_free(tmp_pk_val_data);
 }
 
-//
 // set the put flags for the main dictionary
-//
-void ha_tokudb::set_main_dict_put_flags(
-    THD* thd,
-    bool opt_eligible,
-    uint32_t* put_flags
-    )
-{
+void ha_tokudb::set_main_dict_put_flags(THD* thd, bool opt_eligible, uint32_t* put_flags) {
     uint32_t old_prelock_flags = 0;
     uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
     bool in_hot_index = share->num_DBs > curr_num_DBs;
@@ -3809,8 +3737,7 @@ void ha_tokudb::set_main_dict_put_flags(
     {
         *put_flags = old_prelock_flags;
     }
-    else if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)
-            && !is_replace_into(thd) && !is_insert_ignore(thd))
+    else if (!do_unique_checks(thd, in_rpl_write_rows | in_rpl_update_rows) && !is_replace_into(thd) && !is_insert_ignore(thd))
     {
         *put_flags = old_prelock_flags;
     }
@@ -3832,22 +3759,18 @@ void ha_tokudb::set_main_dict_put_flags(
 
 int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) {
     int error = 0;
-    uint32_t put_flags = mult_put_flags[primary_key];
-    THD *thd = ha_thd();
     uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
-
     assert(curr_num_DBs == 1);
-
+
+    uint32_t put_flags = mult_put_flags[primary_key];
+    THD *thd = ha_thd();
     set_main_dict_put_flags(thd, true, &put_flags);
 
-    error = share->file->put(
-        share->file,
-        txn,
-        pk_key,
-        pk_val,
-        put_flags
-        );
+    // for test, make unique checks have a very long duration
+    if ((put_flags & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+        maybe_do_unique_checks_delay(thd);
+
+    error = share->file->put(share->file, txn, pk_key, pk_val, put_flags);
     if (error) {
         last_dup_key = primary_key;
         goto cleanup;
@@ -3861,14 +3784,18 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
     int error = 0;
     uint curr_num_DBs = share->num_DBs;
     set_main_dict_put_flags(thd, true, &mult_put_flags[primary_key]);
-    uint32_t i, flags = mult_put_flags[primary_key];
+    uint32_t flags = mult_put_flags[primary_key];
+
+    // for test, make unique checks have a very long duration
+    if ((flags & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+        maybe_do_unique_checks_delay(thd);
 
     // the insert ignore optimization uses DB_NOOVERWRITE_NO_ERROR,
     // which is not allowed with env->put_multiple.
     // we have to insert the rows one by one in this case.
     if (flags & DB_NOOVERWRITE_NO_ERROR) {
         DB * src_db = share->key_file[primary_key];
-        for (i = 0; i < curr_num_DBs; i++) {
+        for (uint32_t i = 0; i < curr_num_DBs; i++) {
             DB * db = share->key_file[i];
             if (i == primary_key) {
                 // if it's the primary key, insert the rows
@@ -3929,7 +3856,7 @@ out:
 //      error otherwise
 //
 int ha_tokudb::write_row(uchar * record) {
-    TOKUDB_HANDLER_DBUG_ENTER("");
+    TOKUDB_HANDLER_DBUG_ENTER("%p", record);
 
     DBT row, prim_key;
     int error;
@@ -3967,10 +3894,7 @@ int ha_tokudb::write_row(uchar * record) {
     if (share->has_auto_inc && record == table->record[0]) {
         tokudb_pthread_mutex_lock(&share->mutex);
         ulonglong curr_auto_inc = retrieve_auto_increment(
-            table->field[share->ai_field_index]->key_type(),
-            field_offset(table->field[share->ai_field_index], table),
-            record
-            );
+            table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), record);
         if (curr_auto_inc > share->last_auto_increment) {
             share->last_auto_increment = curr_auto_inc;
             if (delay_updating_ai_metadata) {
@@ -4072,7 +3996,7 @@ int ha_tokudb::write_row(uchar * record) {
         }
     }
 
-    trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     if (!error) {
         added_rows++;
         trx->stmt_progress.inserted++;
@@ -4129,7 +4053,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     THD* thd = ha_thd();
     DB_TXN* sub_trans = NULL;
     DB_TXN* txn = NULL;
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     uint curr_num_DBs;
 
     LINT_INIT(error);
@@ -4138,7 +4062,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     memset((void *) &prim_row, 0, sizeof(prim_row));
     memset((void *) &old_prim_row, 0, sizeof(old_prim_row));
 
-
     ha_statistic_increment(&SSV::ha_update_count);
 #if MYSQL_VERSION_ID < 50600
     if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
@@ -4185,7 +4108,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     }
 
     txn = using_ignore ? sub_trans : transaction;
-
     if (hidden_primary_key) {
         memset((void *) &prim_key, 0, sizeof(prim_key));
         prim_key.data = (void *) current_ident;
@@ -4197,10 +4119,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
         create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row, &has_null);
     }
 
-    //
     // do uniqueness checks
-    //
-    if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
+    if (share->has_unique_keys && do_unique_checks(thd, in_rpl_update_rows)) {
         for (uint keynr = 0; keynr < table_share->keys; keynr++) {
             bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
             if (keynr == primary_key && !share->pk_has_string) {
@@ -4241,6 +4161,10 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
 
     set_main_dict_put_flags(thd, false, &mult_put_flags[primary_key]);
 
+    // for test, make unique checks have a very long duration
+    if ((mult_put_flags[primary_key] & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+        maybe_do_unique_checks_delay(thd);
+
     error = db_env->update_multiple(
         db_env,
         share->key_file[primary_key],
@@ -4303,7 +4227,7 @@ int ha_tokudb::delete_row(const uchar * record) {
     bool has_null;
     THD* thd = ha_thd();
     uint curr_num_DBs;
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
 
     ha_statistic_increment(&SSV::ha_delete_count);
 
@@ -4459,6 +4383,20 @@ static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint k
     return key_can_be_null && key_len > 0 && key[0] != 0;
 }
 
+// Return true if bulk fetch can be used
+static bool tokudb_do_bulk_fetch(THD *thd) {
+    switch (thd_sql_command(thd)) {
+    case SQLCOM_SELECT:
+    case SQLCOM_CREATE_TABLE:
+    case SQLCOM_INSERT_SELECT:
+    case SQLCOM_REPLACE_SELECT:
+    case SQLCOM_DELETE:
+        return THDVAR(thd, bulk_fetch) != 0;
+    default:
+        return false;
+    }
+}
+
 //
 // Notification that a range query getting all elements that equal a key
 //  to take place. Will pre acquire read lock
@@ -4467,7 +4405,7 @@ static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint k
 //      error otherwise
 //
 int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
-    TOKUDB_HANDLER_DBUG_ENTER("");
+    TOKUDB_HANDLER_DBUG_ENTER("%p %u", key, key_len);
     int error = 0;
     DBT start_key, end_key;
     THD* thd = ha_thd();
@@ -4491,7 +4429,7 @@ int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
 
     range_lock_grabbed = true;
     range_lock_grabbed_null = index_key_is_null(table, tokudb_active_index, key, key_len);
-    doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
+    doing_bulk_fetch = tokudb_do_bulk_fetch(thd);
     bulk_fetch_iteration = 0;
     rows_fetched_using_bulk_fetch = 0;
     error = 0;
@@ -4603,6 +4541,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
     }
     invalidate_bulk_fetch();
     doing_bulk_fetch = false;
+    maybe_index_scan = false;
     error = 0;
 exit:
     TOKUDB_HANDLER_DBUG_RETURN(error);
@@ -4870,7 +4809,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
     int error = 0;
     uint32_t flags = 0;
     THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
     struct smart_dbt_info info;
     struct index_read_info ir_info;
 
@@ -5345,86 +5284,91 @@ cleanup:
 }
 
 int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_key_read) {
-    int error = 0;
-    uint32_t flags = SET_PRELOCK_FLAG(0);
-    THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
-    bool need_val;
+    int error = 0;
     HANDLE_INVALID_CURSOR();
 
-    // we need to read the val of what we retrieve if
-    // we do NOT have a covering index AND we are using a clustering secondary
-    // key
-    need_val = (do_key_read == 0) &&
-        (tokudb_active_index == primary_key ||
-         key_is_clustering(&table->key_info[tokudb_active_index])
-        );
-
-    if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
-        error = read_data_from_range_query_buff(buf, need_val, do_key_read);
-    }
-    else if (icp_went_out_of_range) {
-        icp_went_out_of_range = false;
-        error = HA_ERR_END_OF_FILE;
+    if (maybe_index_scan) {
+        maybe_index_scan = false;
+        if (!range_lock_grabbed) {
+            error = prepare_index_scan();
+        }
     }
-    else {
-        invalidate_bulk_fetch();
-        if (doing_bulk_fetch) {
-            struct smart_dbt_bf_info bf_info;
-            bf_info.ha = this;
-            // you need the val if you have a clustering index and key_read is not 0;
-            bf_info.direction = direction;
-            bf_info.thd = ha_thd();
-            bf_info.need_val = need_val;
-            bf_info.buf = buf;
-            bf_info.key_to_compare = key_to_compare;
-            //
-            // call c_getf_next with purpose of filling in range_query_buff
-            //
-            rows_fetched_using_bulk_fetch = 0;
-            // it is expected that we can do ICP in the smart_dbt_bf_callback
-            // as a result, it's possible we don't return any data because
-            // none of the rows matched the index condition. Therefore, we need
-            // this while loop. icp_out_of_range will be set if we hit a row that
-            // the index condition states is out of our range. When that hits,
-            // we know all the data in the buffer is the last data we will retrieve
-            while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) {
-                if (direction > 0) {
-                    error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
-                } else {
-                    error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
-                }
-            }
-            // if there is no data set and we went out of range,
-            // then there is nothing to return
-            if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) {
-                icp_went_out_of_range = false;
-                error = HA_ERR_END_OF_FILE;
-            }
-            if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
-                bulk_fetch_iteration++;
-            }
+
+    if (!error) {
+        uint32_t flags = SET_PRELOCK_FLAG(0);
 
-            error = handle_cursor_error(error, HA_ERR_END_OF_FILE,tokudb_active_index);
-            if (error) { goto cleanup; }
-
-            //
-            // now that range_query_buff is filled, read an element
-            //
+        // we need to read the val of what we retrieve if
+        // we do NOT have a covering index AND we are using a clustering secondary
+        // key
+        bool need_val = (do_key_read == 0) &&
+            (tokudb_active_index == primary_key || key_is_clustering(&table->key_info[tokudb_active_index]));
+
+        if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
            error = read_data_from_range_query_buff(buf, need_val, do_key_read);
        }
+        else if (icp_went_out_of_range) {
+            icp_went_out_of_range = false;
+            error = HA_ERR_END_OF_FILE;
+        }
        else {
-            struct smart_dbt_info info;
-            info.ha = this;
-            info.buf = buf;
-            info.keynr = tokudb_active_index;
+            invalidate_bulk_fetch();
+            if (doing_bulk_fetch) {
+                struct smart_dbt_bf_info bf_info;
+                bf_info.ha = this;
+                // you need the val if you have a clustering index and key_read is not 0;
+                bf_info.direction = direction;
+                bf_info.thd = ha_thd();
+                bf_info.need_val = need_val;
+                bf_info.buf = buf;
+                bf_info.key_to_compare = key_to_compare;
+                //
+                // call c_getf_next with purpose of filling in range_query_buff
+                //
+                rows_fetched_using_bulk_fetch = 0;
+                // it is expected that we can do ICP in the smart_dbt_bf_callback
+                // as a result, it's possible we don't return any data because
+                // none of the rows matched the index condition. Therefore, we need
+                // this while loop. icp_out_of_range will be set if we hit a row that
+                // the index condition states is out of our range. When that hits,
+                // we know all the data in the buffer is the last data we will retrieve
+                while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) {
+                    if (direction > 0) {
+                        error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
+                    } else {
+                        error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
+                    }
+                }
+                // if there is no data set and we went out of range,
+                // then there is nothing to return
+                if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) {
+                    icp_went_out_of_range = false;
+                    error = HA_ERR_END_OF_FILE;
+                }
+                if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
+                    bulk_fetch_iteration++;
+                }
 
-            if (direction > 0) {
-                error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
-            } else {
-                error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+                error = handle_cursor_error(error, HA_ERR_END_OF_FILE,tokudb_active_index);
+                if (error) { goto cleanup; }
+
+                //
+                // now that range_query_buff is filled, read an element
+                //
+                error = read_data_from_range_query_buff(buf, need_val, do_key_read);
+            }
+            else {
+                struct smart_dbt_info info;
+                info.ha = this;
+                info.buf = buf;
+                info.keynr = tokudb_active_index;
+
+                if (direction > 0) {
+                    error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+                } else {
+                    error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+                }
+                error = handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index);
            }
-            error = handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index);
        }
    }
@@ -5436,12 +5380,15 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_
     // read the full row by doing a point query into the
     // main table.
     //
-
     if (!error && !do_key_read && (tokudb_active_index != primary_key) && !key_is_clustering(&table->key_info[tokudb_active_index])) {
         error = read_full_row(buf);
     }
-    trx->stmt_progress.queried++;
-    track_progress(thd);
+
+    if (!error) {
+        tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
+        trx->stmt_progress.queried++;
+        track_progress(ha_thd());
+    }
 cleanup:
     return error;
 }
@@ -5501,7 +5448,7 @@ int ha_tokudb::index_first(uchar * buf) {
     struct smart_dbt_info info;
     uint32_t flags = SET_PRELOCK_FLAG(0);
     THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
     HANDLE_INVALID_CURSOR();
 
     ha_statistic_increment(&SSV::ha_read_first_count);
@@ -5510,8 +5457,7 @@ int ha_tokudb::index_first(uchar * buf) {
     info.buf = buf;
     info.keynr = tokudb_active_index;
 
-    error = cursor->c_getf_first(cursor, flags,
-        SMART_DBT_CALLBACK(key_read), &info);
+    error = cursor->c_getf_first(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
     error = handle_cursor_error(error,HA_ERR_END_OF_FILE,tokudb_active_index);
 
     //
@@ -5521,9 +5467,11 @@ int ha_tokudb::index_first(uchar * buf) {
     if (!error && !key_read && (tokudb_active_index != primary_key) && !key_is_clustering(&table->key_info[tokudb_active_index])) {
         error = read_full_row(buf);
     }
-    trx->stmt_progress.queried++;
+    if (trx) {
+        trx->stmt_progress.queried++;
+    }
     track_progress(thd);
-
+    maybe_index_scan = true;
 cleanup:
     TOKUDB_HANDLER_DBUG_RETURN(error);
 }
@@ -5544,7 +5492,7 @@ int ha_tokudb::index_last(uchar * buf) {
     struct smart_dbt_info info;
     uint32_t flags = SET_PRELOCK_FLAG(0);
     THD* thd = ha_thd();
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
     HANDLE_INVALID_CURSOR();
 
     ha_statistic_increment(&SSV::ha_read_last_count);
@@ -5553,8 +5501,7 @@ int ha_tokudb::index_last(uchar * buf) {
     info.buf = buf;
     info.keynr = tokudb_active_index;
 
-    error = cursor->c_getf_last(cursor, flags,
-        SMART_DBT_CALLBACK(key_read), &info);
+    error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
     error = handle_cursor_error(error,HA_ERR_END_OF_FILE,tokudb_active_index);
     //
     // still need to get entire contents of the row if operation done on
@@ -5568,6 +5515,7 @@ int ha_tokudb::index_last(uchar * buf) {
         trx->stmt_progress.queried++;
     }
     track_progress(thd);
+    maybe_index_scan = true;
 cleanup:
     TOKUDB_HANDLER_DBUG_RETURN(error);
 }
@@ -5635,7 +5583,7 @@ int ha_tokudb::rnd_next(uchar * buf) {
 
 
 void ha_tokudb::track_progress(THD* thd) {
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     if (trx) {
         ulonglong num_written = trx->stmt_progress.inserted + trx->stmt_progress.updated + trx->stmt_progress.deleted;
         bool update_status =
@@ -5691,13 +5639,11 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
     DBUG_RETURN(to);
 }
 
-//
 // Retrieves a row with based on the primary key saved in pos
 // Returns:
 //      0 on success
 //      HA_ERR_KEY_NOT_FOUND if not found
 //      error otherwise
-//
 int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
     TOKUDB_HANDLER_DBUG_ENTER("");
     DBT db_pos;
@@ -5710,12 +5656,20 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
     ha_statistic_increment(&SSV::ha_read_rnd_count);
     tokudb_active_index = MAX_KEY;
 
+    // test rpl slave by inducing a delay before the point query
+    THD *thd = ha_thd();
+    if (thd->slave_thread && (in_rpl_delete_rows || in_rpl_update_rows)) {
+        uint64_t delay_ms = THDVAR(thd, rpl_lookup_rows_delay);
+        if (delay_ms)
+            usleep(delay_ms * 1000);
+    }
+
     info.ha = this;
     info.buf = buf;
     info.keynr = primary_key;
 
     error = share->file->getf_set(share->file, transaction,
-            get_cursor_isolation_flags(lock.type, ha_thd()),
+            get_cursor_isolation_flags(lock.type, thd),
             key, smart_dbt_callback_rowread_ptquery, &info);
 
     if (error == DB_NOTFOUND) {
@@ -5727,8 +5681,8 @@ cleanup:
     TOKUDB_HANDLER_DBUG_RETURN(error);
 }
 
-int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_key) {
-    TOKUDB_HANDLER_DBUG_ENTER("");
+int ha_tokudb::prelock_range(const key_range *start_key, const key_range *end_key) {
+    TOKUDB_HANDLER_DBUG_ENTER("%p %p", start_key, end_key);
     THD* thd = ha_thd();
 
     int error = 0;
@@ -5793,11 +5747,8 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
         goto cleanup;
     }
 
-    //
     // at this point, determine if we will be doing bulk fetch
-    // as of now, only do it if we are doing a select
-    //
-    doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
+    doing_bulk_fetch = tokudb_do_bulk_fetch(thd);
     bulk_fetch_iteration = 0;
     rows_fetched_using_bulk_fetch = 0;
 
@@ -5812,7 +5763,7 @@ cleanup:
 //  Forward scans use read_range_first()/read_range_next().
 //
 int ha_tokudb::prepare_range_scan( const key_range *start_key, const key_range *end_key) {
-    TOKUDB_HANDLER_DBUG_ENTER("");
+    TOKUDB_HANDLER_DBUG_ENTER("%p %p", start_key, end_key);
     int error = prelock_range(start_key, end_key);
     if (!error) {
         range_lock_grabbed = true;
@@ -5826,7 +5777,7 @@ int ha_tokudb::read_range_first(
     bool eq_range,
     bool sorted)
 {
-    TOKUDB_HANDLER_DBUG_ENTER("");
+    TOKUDB_HANDLER_DBUG_ENTER("%p %p %u %u", start_key, end_key, eq_range, sorted);
     int error = prelock_range(start_key, end_key);
     if (error) { goto cleanup; }
     range_lock_grabbed = true;
@@ -6225,12 +6176,11 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
     }
 
     int error = 0;
-    tokudb_trx_data *trx = NULL;
-    trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     if (!trx) {
         error = create_tokudb_trx_data_instance(&trx);
         if (error) { goto cleanup; }
-        thd_data_set(thd, tokudb_hton->slot, trx);
+        thd_set_ha_data(thd, tokudb_hton, trx);
     }
     if (trx->all == NULL) {
         trx->sp_level = NULL;
@@ -6304,7 +6254,7 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
         TOKUDB_HANDLER_TRACE("q %s", thd->query());
 
     int error = 0;
-    tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     DBUG_ASSERT(trx);
 
     /*
@@ -6404,7 +6354,7 @@ uint32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, THD
   lock (if we don't want to use MySQL table locks at all) or add locks
   for many tables (like we do when we are using a MERGE handler).
 
-  Tokudb DB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
+  TokuDB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
   signals that we are doing WRITES, but we are still allowing other
   reader's and writer's.
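The rnd_pos() hunk above uses the same idiom as all of the new rpl_*_delay test knobs: the session variable is expressed in milliseconds, while usleep() takes microseconds. A trivial standalone sketch of the conversion (in the engine, delay_ms comes from THDVAR; the helper name here is illustrative):

#include <cstdint>
#include <unistd.h>

static void maybe_delay_ms(uint64_t delay_ms) {
    if (delay_ms)
        usleep(delay_ms * 1000); // 1 ms == 1000 us
}

int main() {
    maybe_delay_ms(0); // no-op when the variable is unset
    maybe_delay_ms(2); // ~2 ms pause, enough to widen race windows in tests
    return 0;
}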
@@ -6426,34 +6376,25 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l
     }
 
     if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
-        // if creating a hot index
-        if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
-            rw_rdlock(&share->num_DBs_lock);
-            if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
-                lock_type = TL_WRITE_ALLOW_WRITE;
-            }
-            lock.type = lock_type;
-            rw_unlock(&share->num_DBs_lock);
-        }
-
-        // 5.5 supports reads concurrent with alter table. just use the default lock type.
-#if MYSQL_VERSION_ID < 50500
-        else if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX ||
-                 thd_sql_command(thd)== SQLCOM_ALTER_TABLE ||
-                 thd_sql_command(thd)== SQLCOM_DROP_INDEX) {
-            // force alter table to lock out other readers
-            lock_type = TL_WRITE;
-            lock.type = lock_type;
-        }
-#endif
-        else {
-            // If we are not doing a LOCK TABLE, then allow multiple writers
-            if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
-                !thd->in_lock_tables && thd_sql_command(thd) != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
+        enum_sql_command sql_command = (enum_sql_command) thd_sql_command(thd);
+        if (!thd->in_lock_tables) {
+            if (sql_command == SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
+                // hot indexing
+                rw_rdlock(&share->num_DBs_lock);
+                if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
+                    lock_type = TL_WRITE_ALLOW_WRITE;
+                }
+                rw_unlock(&share->num_DBs_lock);
+            } else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
+                       sql_command != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
+                // allow concurrent writes
                 lock_type = TL_WRITE_ALLOW_WRITE;
+            } else if (sql_command == SQLCOM_OPTIMIZE && lock_type == TL_READ_NO_INSERT) {
+                // hot optimize table
+                lock_type = TL_READ;
             }
-            lock.type = lock_type;
         }
+        lock.type = lock_type;
     }
     *to++ = &lock;
     if (tokudb_debug & TOKUDB_DEBUG_LOCK)
@@ -6909,7 +6850,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
     newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
     if (newname == NULL){ error = ENOMEM; goto cleanup;}
 
-    trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+    trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
     if (trx && trx->sub_sp_level && thd_sql_command(thd) == SQLCOM_CREATE_TABLE) {
         txn = trx->sub_sp_level;
     }
@@ -6946,7 +6887,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
     if (error) { goto cleanup; }
 
 #if WITH_PARTITION_STORAGE_ENGINE
-    if (TOKU_PARTITION_WRITE_FRM_DATA || IF_PARTITIONING(form->part_info, NULL) == NULL) {
+    if (TOKU_PARTITION_WRITE_FRM_DATA || form->part_info == NULL) {
         error = write_frm_data(status_block, txn, form->s->path.str);
         if (error) { goto cleanup; }
     }
@@ -7099,7 +7040,7 @@ int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_nam
     DB_TXN *parent_txn = NULL;
     tokudb_trx_data *trx = NULL;
 
-    trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
     if (thd_sql_command(ha_thd()) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
         parent_txn = trx->sub_sp_level;
     }
@@ -7540,7 +7481,7 @@ int ha_tokudb::tokudb_add_index(
     DBC* tmp_cursor = NULL;
     int cursor_ret_val = 0;
     DBT curr_pk_key, curr_pk_val;
-    THD* thd = ha_thd(); 
+    THD* thd = ha_thd();
     DB_LOADER* loader = NULL;
     DB_INDEXER* indexer = NULL;
     bool loader_save_space = get_load_save_space(thd);
@@ -7578,7 +7519,7 @@ int ha_tokudb::tokudb_add_index(
     //
     // status message to be shown in "show process list"
     //
-    const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+    const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
     char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
     ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
     thd_proc_info(thd, "Adding indexes");
@@ -7804,14 +7745,15 @@ int ha_tokudb::tokudb_add_index(
             num_processed++;
 
             if ((num_processed % 1000) == 0) {
-                sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
+                sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.",
+                        num_processed, (long long unsigned) share->rows);
                 thd_proc_info(thd, status_msg);
 
 #ifdef HA_TOKUDB_HAS_THD_PROGRESS
                 thd_progress_report(thd, num_processed, (long long unsigned) share->rows);
 #endif
 
-                if (thd->killed) {
+                if (thd_killed(thd)) {
                     error = ER_ABORTING_CONNECTION;
                     goto cleanup;
                 }
@@ -7836,12 +7778,8 @@ int ha_tokudb::tokudb_add_index(
     for (uint i = 0; i < num_of_keys; i++, curr_index++) {
         if (key_info[i].flags & HA_NOSAME) {
             bool is_unique;
-            error = is_index_unique(
-                &is_unique,
-                txn,
-                share->key_file[curr_index],
-                &key_info[i]
-                );
+            error = is_index_unique(&is_unique, txn, share->key_file[curr_index], &key_info[i],
+                                    creating_hot_index ? 0 : DB_PRELOCKED_WRITE);
             if (error) goto cleanup;
             if (!is_unique) {
                 error = HA_ERR_FOUND_DUPP_KEY;
@@ -7899,7 +7837,7 @@ cleanup:
                 another transaction has accessed the table. \
                 To add indexes, make sure no transactions touch the table.", share->table_name);
     }
-    thd_proc_info(thd, old_proc_info);
+    thd_proc_info(thd, orig_proc_info);
     TOKUDB_HANDLER_DBUG_RETURN(error ? error : loader_error);
 }
 
@@ -8251,15 +8189,46 @@ void ha_tokudb::cleanup_txn(DB_TXN *txn) {
 }
 
 void ha_tokudb::add_to_trx_handler_list() {
-    tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+    tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
     trx->handlers = list_add(trx->handlers, &trx_handler_list);
 }
 
 void ha_tokudb::remove_from_trx_handler_list() {
-    tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+    tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
     trx->handlers = list_delete(trx->handlers, &trx_handler_list);
 }
 
+void ha_tokudb::rpl_before_write_rows() {
+    in_rpl_write_rows = true;
+}
+
+void ha_tokudb::rpl_after_write_rows() {
+    in_rpl_write_rows = false;
+}
+
+void ha_tokudb::rpl_before_delete_rows() {
+    in_rpl_delete_rows = true;
+}
+
+void ha_tokudb::rpl_after_delete_rows() {
+    in_rpl_delete_rows = false;
+}
+
+void ha_tokudb::rpl_before_update_rows() {
+    in_rpl_update_rows = true;
+}
+
+void ha_tokudb::rpl_after_update_rows() {
+    in_rpl_update_rows = false;
+}
+
+bool ha_tokudb::rpl_lookup_rows() {
+    if (!in_rpl_delete_rows && !in_rpl_update_rows)
+        return true;
+    else
+        return THDVAR(ha_thd(), rpl_lookup_rows);
+}
+
 // table admin
 #include "ha_tokudb_admin.cc"
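The new rpl_before_*/rpl_after_* hooks bracket row-event application, and rpl_lookup_rows() lets a slave skip the point lookup that normally precedes an update or delete when tokudb_rpl_lookup_rows is off. A runnable mock of the flag protocol; the RAII guard is our illustration of the intended bracketing, not the server's actual calling convention:

#include <cassert>

// Mock handler carrying the three event flags added by this commit.
struct mock_handler {
    bool in_rpl_write_rows = false;
    bool in_rpl_delete_rows = false;
    bool in_rpl_update_rows = false;
    bool rpl_lookup_rows_var = false; // stands in for THDVAR(thd, rpl_lookup_rows)

    void rpl_before_write_rows() { in_rpl_write_rows = true; }
    void rpl_after_write_rows() { in_rpl_write_rows = false; }
    void rpl_before_delete_rows() { in_rpl_delete_rows = true; }
    void rpl_after_delete_rows() { in_rpl_delete_rows = false; }

    // outside delete/update row events the handler must always read back the row
    bool rpl_lookup_rows() const {
        if (!in_rpl_delete_rows && !in_rpl_update_rows)
            return true;
        return rpl_lookup_rows_var;
    }
};

// RAII guard so the flag cannot leak past the event even on early return.
struct write_rows_guard {
    mock_handler &h;
    explicit write_rows_guard(mock_handler &h_) : h(h_) { h.rpl_before_write_rows(); }
    ~write_rows_guard() { h.rpl_after_write_rows(); }
};

int main() {
    mock_handler h;
    {
        write_rows_guard g(h);
        assert(h.in_rpl_write_rows); // write_row() sees the event context
    }
    assert(!h.in_rpl_write_rows);

    h.rpl_before_delete_rows();
    assert(!h.rpl_lookup_rows()); // delete-row events may skip the read-back
    h.rpl_after_delete_rows();
    assert(h.rpl_lookup_rows());
    return 0;
}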