Diffstat (limited to 'storage/tokudb/ha_tokudb.cc')
-rw-r--r--  storage/tokudb/ha_tokudb.cc  573
1 file changed, 271 insertions(+), 302 deletions(-)
diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index 0cef79ed32c..0c2310f6685 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -120,14 +120,6 @@ extern "C" {
#include "hatoku_defines.h"
#include "hatoku_cmp.h"
-static inline void *thd_data_get(THD *thd, int slot) {
- return thd->ha_data[slot].ha_ptr;
-}
-
-static inline void thd_data_set(THD *thd, int slot, void *data) {
- thd->ha_data[slot].ha_ptr = data;
-}
-
static inline uint get_key_parts(const KEY *key);
#undef PACKAGE
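
Note: the two helpers removed above poked directly into THD::ha_data by slot number. The rest of this patch switches every call site to the server's plugin-service accessors, which are keyed by the handlerton instead of a slot. A minimal sketch of the replacement pattern, using thd_get_ha_data()/thd_set_ha_data() as declared in mysql/plugin.h:

    // Fetch the per-connection TokuDB transaction state, creating it on
    // first use; the handlerton identifies the owner, not a raw slot index.
    tokudb_trx_data *trx =
        (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
    if (trx == NULL) {
        if (create_tokudb_trx_data_instance(&trx) == 0)
            thd_set_ha_data(thd, tokudb_hton, trx);
    }
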
@@ -144,8 +136,8 @@ static inline uint get_key_parts(const KEY *key);
#include "tokudb_buffer.h"
#include "tokudb_status.h"
#include "tokudb_card.h"
-#include "hatoku_hton.h"
#include "ha_tokudb.h"
+#include "hatoku_hton.h"
#include <mysql/plugin.h>
static const char *ha_tokudb_exts[] = {
@@ -477,10 +469,9 @@ typedef struct index_read_info {
DBT* orig_key;
} *INDEX_READ_INFO;
-
static int ai_poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
- if (context->thd->killed) {
+ if (thd_killed(context->thd)) {
sprintf(context->write_status_msg, "The process has been killed, aborting add index.");
return ER_ABORTING_CONNECTION;
}
@@ -495,7 +486,7 @@ static int ai_poll_fun(void *extra, float progress) {
static int loader_poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
- if (context->thd->killed) {
+ if (thd_killed(context->thd)) {
sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
return ER_ABORTING_CONNECTION;
}
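
The same service-API motivation applies to the poll callbacks: thd_killed() asks the server whether the statement has been killed instead of reading THD::killed directly. The general shape of such a loader progress callback, as a sketch:

    static int example_poll_fun(void *extra, float progress) {
        LOADER_CONTEXT context = (LOADER_CONTEXT) extra;
        if (thd_killed(context->thd)) {     // non-zero once the statement is killed
            sprintf(context->write_status_msg,
                    "The process has been killed, aborting.");
            return ER_ABORTING_CONNECTION;  // non-zero return aborts the loader
        }
        sprintf(context->write_status_msg,
                "Loading of data about %.1f%% done", progress * 100.0f);
        thd_proc_info(context->thd, context->write_status_msg);
        return 0;
    }
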
@@ -1016,8 +1007,7 @@ static uchar* pack_toku_field_blob(
static int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx) {
int error;
- tokudb_trx_data* trx = NULL;
- trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
+ tokudb_trx_data* trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
if (!trx) {
error = ENOMEM;
goto cleanup;
@@ -1259,6 +1249,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
tokudb_active_index = MAX_KEY;
invalidate_icp();
trx_handler_list.data = this;
+ in_rpl_write_rows = in_rpl_delete_rows = in_rpl_update_rows = false;
TOKUDB_HANDLER_DBUG_VOID_RETURN;
}
@@ -1614,8 +1605,7 @@ int ha_tokudb::initialize_share(
DB_TXN* txn = NULL;
bool do_commit = false;
THD* thd = ha_thd();
- tokudb_trx_data *trx = NULL;
- trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+ tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
if (thd_sql_command(thd) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
txn = trx->sub_sp_level;
}
@@ -1649,8 +1639,7 @@ int ha_tokudb::initialize_share(
#if WITH_PARTITION_STORAGE_ENGINE
// verify frm data for non-partitioned tables
- if (TOKU_PARTITION_WRITE_FRM_DATA ||
- IF_PARTITIONING(table->part_info, NULL) == NULL) {
+ if (TOKU_PARTITION_WRITE_FRM_DATA || table->part_info == NULL) {
error = verify_frm_data(table->s->path.str, txn);
if (error)
goto exit;
@@ -1727,7 +1716,7 @@ int ha_tokudb::initialize_share(
}
share->ref_length = ref_length;
- error = estimate_num_rows(share->file,&num_rows, txn);
+ error = estimate_num_rows(share->file, &num_rows, txn);
//
// estimate_num_rows should not fail under normal conditions
//
@@ -1937,7 +1926,6 @@ exit:
//
int ha_tokudb::estimate_num_rows(DB* db, uint64_t* num_rows, DB_TXN* txn) {
int error = ENOSYS;
- DBC* crsr = NULL;
bool do_commit = false;
DB_BTREE_STAT64 dict_stats;
DB_TXN* txn_to_use = NULL;
@@ -1951,21 +1939,12 @@ int ha_tokudb::estimate_num_rows(DB* db, uint64_t* num_rows, DB_TXN* txn) {
txn_to_use = txn;
}
- error = db->stat64(
- share->file,
- txn_to_use,
- &dict_stats
- );
+ error = db->stat64(db, txn_to_use, &dict_stats);
if (error) { goto cleanup; }
*num_rows = dict_stats.bt_ndata;
error = 0;
cleanup:
- if (crsr != NULL) {
- int r = crsr->c_close(crsr);
- assert(r==0);
- crsr = NULL;
- }
if (do_commit) {
commit_txn(txn_to_use, 0);
txn_to_use = NULL;
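
Besides the reformatting, this hunk fixes a real receiver mix-up: the old code called db->stat64() but passed share->file as the object, so the row estimate always consulted the primary dictionary regardless of which DB* was handed in; the never-opened crsr cursor and its cleanup block were dead code. The corrected call, for reference:

    DB_BTREE_STAT64 dict_stats;
    int r = db->stat64(db, txn_to_use, &dict_stats); // object and first argument must agree
    if (r == 0)
        *num_rows = dict_stats.bt_ndata;             // key/value pairs in this tree
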
@@ -3271,7 +3250,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
TOKUDB_HANDLER_DBUG_ENTER("%llu txn %p", (unsigned long long) rows, transaction);
#endif
THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
delay_updating_ai_metadata = true;
ai_metadata_update_required = false;
abort_loader = false;
@@ -3281,7 +3260,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
num_DBs_locked_in_bulk = true;
lock_count = 0;
- if (share->try_table_lock) {
+ if ((rows == 0 || rows > 1) && share->try_table_lock) {
if (get_prelock_empty(thd) && may_table_be_empty(transaction)) {
if (using_ignore || is_insert_ignore(thd) || thd->lex->duplicates != DUP_ERROR
|| table->s->next_number_key_offset) {
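
start_bulk_insert(rows) now only attempts the table-lock/loader path when the row estimate is 0 (meaning unknown) or greater than one. Illustrative calls, with values as the server typically passes them (a sketch of the semantics, not new API):

    ha->start_bulk_insert(0);    // unknown count (e.g. INSERT ... SELECT): loader considered
    ha->start_bulk_insert(500);  // multi-row batch: loader considered
    ha->start_bulk_insert(1);    // single-row INSERT: skip the table-lock attempt
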
@@ -3340,7 +3319,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
TOKUDB_HANDLER_DBUG_ENTER("");
int error = 0;
THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
bool using_loader = (loader != NULL);
if (ai_metadata_update_required) {
tokudb_pthread_mutex_lock(&share->mutex);
@@ -3352,17 +3331,17 @@ int ha_tokudb::end_bulk_insert(bool abort) {
ai_metadata_update_required = false;
loader_error = 0;
if (loader) {
- if (!abort_loader && !thd->killed) {
+ if (!abort_loader && !thd_killed(thd)) {
DBUG_EXECUTE_IF("tokudb_end_bulk_insert_sleep", {
- const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+ const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
thd_proc_info(thd, "DBUG sleep");
my_sleep(20000000);
- thd_proc_info(thd, old_proc_info);
+ thd_proc_info(thd, orig_proc_info);
});
error = loader->close(loader);
loader = NULL;
if (error) {
- if (thd->killed) {
+ if (thd_killed(thd)) {
my_error(ER_QUERY_INTERRUPTED, MYF(0));
}
goto cleanup;
@@ -3374,12 +3353,8 @@ int ha_tokudb::end_bulk_insert(bool abort) {
if (i == primary_key && !share->pk_has_string) {
continue;
}
- error = is_index_unique(
- &is_unique,
- transaction,
- share->key_file[i],
- &table->key_info[i]
- );
+ error = is_index_unique(&is_unique, transaction, share->key_file[i], &table->key_info[i],
+ DB_PRELOCKED_WRITE);
if (error) goto cleanup;
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
@@ -3419,6 +3394,7 @@ cleanup:
}
}
trx->stmt_progress.using_loader = false;
+ thd_proc_info(thd, 0);
TOKUDB_HANDLER_DBUG_RETURN(error ? error : loader_error);
}
@@ -3426,7 +3402,7 @@ int ha_tokudb::end_bulk_insert() {
return end_bulk_insert( false );
}
-int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) {
+int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info, int lock_flags) {
int error;
DBC* tmp_cursor1 = NULL;
DBC* tmp_cursor2 = NULL;
@@ -3434,7 +3410,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
uint64_t cnt = 0;
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
THD* thd = ha_thd();
- const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+ const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
memset(&key1, 0, sizeof(key1));
memset(&key2, 0, sizeof(key2));
memset(&val, 0, sizeof(val));
@@ -3442,49 +3418,23 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
memset(&packed_key2, 0, sizeof(packed_key2));
*is_unique = true;
- error = db->cursor(
- db,
- txn,
- &tmp_cursor1,
- DB_SERIALIZABLE
- );
+ error = db->cursor(db, txn, &tmp_cursor1, DB_SERIALIZABLE);
if (error) { goto cleanup; }
- error = db->cursor(
- db,
- txn,
- &tmp_cursor2,
- DB_SERIALIZABLE
- );
+ error = db->cursor(db, txn, &tmp_cursor2, DB_SERIALIZABLE);
if (error) { goto cleanup; }
-
- error = tmp_cursor1->c_get(
- tmp_cursor1,
- &key1,
- &val,
- DB_NEXT
- );
+ error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) { goto cleanup; }
- error = tmp_cursor2->c_get(
- tmp_cursor2,
- &key2,
- &val,
- DB_NEXT
- );
+ error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
if (error) { goto cleanup; }
- error = tmp_cursor2->c_get(
- tmp_cursor2,
- &key2,
- &val,
- DB_NEXT
- );
+ error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
@@ -3496,59 +3446,25 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
bool has_null1;
bool has_null2;
int cmp;
- place_key_into_mysql_buff(
- key_info,
- table->record[0],
- (uchar *) key1.data + 1
- );
- place_key_into_mysql_buff(
- key_info,
- table->record[1],
- (uchar *) key2.data + 1
- );
+ place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key1.data + 1);
+ place_key_into_mysql_buff(key_info, table->record[1], (uchar *) key2.data + 1);
- create_dbt_key_for_lookup(
- &packed_key1,
- key_info,
- key_buff,
- table->record[0],
- &has_null1
- );
- create_dbt_key_for_lookup(
- &packed_key2,
- key_info,
- key_buff2,
- table->record[1],
- &has_null2
- );
+ create_dbt_key_for_lookup(&packed_key1, key_info, key_buff, table->record[0], &has_null1);
+ create_dbt_key_for_lookup(&packed_key2, key_info, key_buff2, table->record[1], &has_null2);
if (!has_null1 && !has_null2) {
cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
if (cmp == 0) {
memcpy(key_buff, key1.data, key1.size);
- place_key_into_mysql_buff(
- key_info,
- table->record[0],
- (uchar *) key_buff + 1
- );
+ place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key_buff + 1);
*is_unique = false;
break;
}
}
- error = tmp_cursor1->c_get(
- tmp_cursor1,
- &key1,
- &val,
- DB_NEXT
- );
+ error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
if (error) { goto cleanup; }
- error = tmp_cursor2->c_get(
- tmp_cursor2,
- &key2,
- &val,
- DB_NEXT
- );
+ error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
if (error && (error != DB_NOTFOUND)) { goto cleanup; }
cnt++;
@@ -3560,7 +3476,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
share->rows,
key_info->name);
thd_proc_info(thd, status_msg);
- if (thd->killed) {
+ if (thd_killed(thd)) {
my_error(ER_QUERY_INTERRUPTED, MYF(0));
error = ER_QUERY_INTERRUPTED;
goto cleanup;
@@ -3571,7 +3487,7 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
error = 0;
cleanup:
- thd_proc_info(thd, old_proc_info);
+ thd_proc_info(thd, orig_proc_info);
if (tmp_cursor1) {
tmp_cursor1->c_close(tmp_cursor1);
tmp_cursor1 = NULL;
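
is_index_unique() gains a lock_flags parameter so callers can tell the two verification cursors whether the rows are already locked; DB_NEXT + lock_flags simply combines the cursor operation with that hint. The two call sites in this patch differ, as sketched:

    // end_bulk_insert(): the loader's transaction has already write-locked
    // the rows, so the scan passes DB_PRELOCKED_WRITE to skip re-locking.
    error = is_index_unique(&is_unique, transaction, share->key_file[i],
                            &table->key_info[i], DB_PRELOCKED_WRITE);
    // tokudb_add_index(): a hot (online) build passes 0 instead, because
    // this transaction does not yet hold locks on the scanned rows.
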
@@ -3646,12 +3562,27 @@ cleanup:
return error;
}
+static void maybe_do_unique_checks_delay(THD *thd) {
+ if (thd->slave_thread) {
+ uint64_t delay_ms = THDVAR(thd, rpl_unique_checks_delay);
+ if (delay_ms)
+ usleep(delay_ms * 1000);
+ }
+}
+
+static bool do_unique_checks(THD *thd, bool do_rpl_event) {
+ if (do_rpl_event && thd->slave_thread && opt_readonly && !THDVAR(thd, rpl_unique_checks))
+ return false;
+ else
+ return !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS);
+}
+
int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
- int error;
+ int error = 0;
//
// first do uniqueness checks
//
- if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
+ if (share->has_unique_keys && do_unique_checks(thd, in_rpl_write_rows)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
bool is_unique = false;
@@ -3664,13 +3595,18 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
if (!is_unique_key) {
continue;
}
+
+ maybe_do_unique_checks_delay(thd);
+
//
// if unique key, check uniqueness constraint
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
error = is_val_unique(&is_unique, record, &table->key_info[keynr], keynr, txn);
- if (error) { goto cleanup; }
+ if (error) {
+ goto cleanup;
+ }
if (!is_unique) {
error = DB_KEYEXIST;
last_dup_key = keynr;
@@ -3678,7 +3614,6 @@ int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
}
}
}
- error = 0;
cleanup:
return error;
}
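
do_unique_checks() folds two conditions together: the session's unique_checks setting (SET unique_checks=0 sets the OPTION_RELAXED_UNIQUE_CHECKS bit) and a new per-slave override. The rpl_unique_checks and rpl_unique_checks_delay session variables are declared elsewhere (hatoku_hton.cc); a plausible sketch of those declarations, assuming the standard MYSQL_THDVAR macros, with hypothetical values:

    // Hypothetical declarations, for illustration only.
    static MYSQL_THDVAR_BOOL(rpl_unique_checks, PLUGIN_VAR_THDLOCAL,
        "enable unique key checks on a replication slave",
        NULL, NULL, true /* default: keep checking */);
    static MYSQL_THDVAR_ULONGLONG(rpl_unique_checks_delay, PLUGIN_VAR_THDLOCAL,
        "milliseconds to sleep before each unique check on a slave",
        NULL, NULL, 0 /* def */, 0 /* min */, ~0ULL /* max */, 1 /* block */);
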
@@ -3781,15 +3716,8 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
tokudb_my_free(tmp_pk_val_data);
}
-//
// set the put flags for the main dictionary
-//
-void ha_tokudb::set_main_dict_put_flags(
- THD* thd,
- bool opt_eligible,
- uint32_t* put_flags
- )
-{
+void ha_tokudb::set_main_dict_put_flags(THD* thd, bool opt_eligible, uint32_t* put_flags) {
uint32_t old_prelock_flags = 0;
uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
bool in_hot_index = share->num_DBs > curr_num_DBs;
@@ -3809,8 +3737,7 @@ void ha_tokudb::set_main_dict_put_flags(
{
*put_flags = old_prelock_flags;
}
- else if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)
- && !is_replace_into(thd) && !is_insert_ignore(thd))
+ else if (!do_unique_checks(thd, in_rpl_write_rows | in_rpl_update_rows) && !is_replace_into(thd) && !is_insert_ignore(thd))
{
*put_flags = old_prelock_flags;
}
@@ -3832,22 +3759,18 @@ void ha_tokudb::set_main_dict_put_flags(
int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) {
int error = 0;
- uint32_t put_flags = mult_put_flags[primary_key];
- THD *thd = ha_thd();
uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);
-
assert(curr_num_DBs == 1);
-
+
+ uint32_t put_flags = mult_put_flags[primary_key];
+ THD *thd = ha_thd();
set_main_dict_put_flags(thd, true, &put_flags);
- error = share->file->put(
- share->file,
- txn,
- pk_key,
- pk_val,
- put_flags
- );
+ // for test, make unique checks have a very long duration
+ if ((put_flags & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+ maybe_do_unique_checks_delay(thd);
+ error = share->file->put(share->file, txn, pk_key, pk_val, put_flags);
if (error) {
last_dup_key = primary_key;
goto cleanup;
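
Here, and in the two similar hunks further down (insert_rows_to_dictionaries_mult() and update_row()), DB_OPFLAGS_MASK extracts the operation bits from the put flags, and a DB_NOOVERWRITE put is precisely the one that performs a uniqueness check, so the test-only delay sits in front of checked puts only:

    if ((put_flags & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
        maybe_do_unique_checks_delay(thd); // no-op unless running on a slave
                                           // with rpl_unique_checks_delay set
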
@@ -3861,14 +3784,18 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
int error = 0;
uint curr_num_DBs = share->num_DBs;
set_main_dict_put_flags(thd, true, &mult_put_flags[primary_key]);
- uint32_t i, flags = mult_put_flags[primary_key];
+ uint32_t flags = mult_put_flags[primary_key];
+
+ // for test, make unique checks have a very long duration
+ if ((flags & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+ maybe_do_unique_checks_delay(thd);
// the insert ignore optimization uses DB_NOOVERWRITE_NO_ERROR,
// which is not allowed with env->put_multiple.
// we have to insert the rows one by one in this case.
if (flags & DB_NOOVERWRITE_NO_ERROR) {
DB * src_db = share->key_file[primary_key];
- for (i = 0; i < curr_num_DBs; i++) {
+ for (uint32_t i = 0; i < curr_num_DBs; i++) {
DB * db = share->key_file[i];
if (i == primary_key) {
// if it's the primary key, insert the rows
@@ -3929,7 +3856,7 @@ out:
// error otherwise
//
int ha_tokudb::write_row(uchar * record) {
- TOKUDB_HANDLER_DBUG_ENTER("");
+ TOKUDB_HANDLER_DBUG_ENTER("%p", record);
DBT row, prim_key;
int error;
@@ -3967,10 +3894,7 @@ int ha_tokudb::write_row(uchar * record) {
if (share->has_auto_inc && record == table->record[0]) {
tokudb_pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
- table->field[share->ai_field_index]->key_type(),
- field_offset(table->field[share->ai_field_index], table),
- record
- );
+ table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), record);
if (curr_auto_inc > share->last_auto_increment) {
share->last_auto_increment = curr_auto_inc;
if (delay_updating_ai_metadata) {
@@ -4072,7 +3996,7 @@ int ha_tokudb::write_row(uchar * record) {
}
}
- trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
if (!error) {
added_rows++;
trx->stmt_progress.inserted++;
@@ -4129,7 +4053,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
THD* thd = ha_thd();
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
uint curr_num_DBs;
LINT_INIT(error);
@@ -4138,7 +4062,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
memset((void *) &prim_row, 0, sizeof(prim_row));
memset((void *) &old_prim_row, 0, sizeof(old_prim_row));
-
ha_statistic_increment(&SSV::ha_update_count);
#if MYSQL_VERSION_ID < 50600
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
@@ -4185,7 +4108,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
txn = using_ignore ? sub_trans : transaction;
-
if (hidden_primary_key) {
memset((void *) &prim_key, 0, sizeof(prim_key));
prim_key.data = (void *) current_ident;
@@ -4197,10 +4119,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row, &has_null);
}
- //
// do uniqueness checks
- //
- if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
+ if (share->has_unique_keys && do_unique_checks(thd, in_rpl_update_rows)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = (table->key_info[keynr].flags & HA_NOSAME) || (keynr == primary_key);
if (keynr == primary_key && !share->pk_has_string) {
@@ -4241,6 +4161,10 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
set_main_dict_put_flags(thd, false, &mult_put_flags[primary_key]);
+ // for test, make unique checks have a very long duration
+ if ((mult_put_flags[primary_key] & DB_OPFLAGS_MASK) == DB_NOOVERWRITE)
+ maybe_do_unique_checks_delay(thd);
+
error = db_env->update_multiple(
db_env,
share->key_file[primary_key],
@@ -4303,7 +4227,7 @@ int ha_tokudb::delete_row(const uchar * record) {
bool has_null;
THD* thd = ha_thd();
uint curr_num_DBs;
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
ha_statistic_increment(&SSV::ha_delete_count);
@@ -4459,6 +4383,20 @@ static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint k
return key_can_be_null && key_len > 0 && key[0] != 0;
}
+// Return true if bulk fetch can be used
+static bool tokudb_do_bulk_fetch(THD *thd) {
+ switch (thd_sql_command(thd)) {
+ case SQLCOM_SELECT:
+ case SQLCOM_CREATE_TABLE:
+ case SQLCOM_INSERT_SELECT:
+ case SQLCOM_REPLACE_SELECT:
+ case SQLCOM_DELETE:
+ return THDVAR(thd, bulk_fetch) != 0;
+ default:
+ return false;
+ }
+}
+
//
// Notification that a range query getting all elements that equal a key
// to take place. Will pre acquire read lock
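
tokudb_do_bulk_fetch(), added above, widens bulk fetching from plain SELECT to the other statements that stream large ranges through the handler, all gated by the bulk_fetch session variable. Both range-scan entry points later in this patch (prepare_index_key_scan() and prelock_range()) reduce to the same line:

    // replaces: doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
    doing_bulk_fetch = tokudb_do_bulk_fetch(thd);
    bulk_fetch_iteration = 0;
    rows_fetched_using_bulk_fetch = 0;
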
@@ -4467,7 +4405,7 @@ static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint k
// error otherwise
//
int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
- TOKUDB_HANDLER_DBUG_ENTER("");
+ TOKUDB_HANDLER_DBUG_ENTER("%p %u", key, key_len);
int error = 0;
DBT start_key, end_key;
THD* thd = ha_thd();
@@ -4491,7 +4429,7 @@ int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
range_lock_grabbed = true;
range_lock_grabbed_null = index_key_is_null(table, tokudb_active_index, key, key_len);
- doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
+ doing_bulk_fetch = tokudb_do_bulk_fetch(thd);
bulk_fetch_iteration = 0;
rows_fetched_using_bulk_fetch = 0;
error = 0;
@@ -4603,6 +4541,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
}
invalidate_bulk_fetch();
doing_bulk_fetch = false;
+ maybe_index_scan = false;
error = 0;
exit:
TOKUDB_HANDLER_DBUG_RETURN(error);
@@ -4870,7 +4809,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
int error = 0;
uint32_t flags = 0;
THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
struct smart_dbt_info info;
struct index_read_info ir_info;
@@ -5345,86 +5284,91 @@ cleanup:
}
int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_key_read) {
- int error = 0;
- uint32_t flags = SET_PRELOCK_FLAG(0);
- THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
- bool need_val;
+ int error = 0;
HANDLE_INVALID_CURSOR();
- // we need to read the val of what we retrieve if
- // we do NOT have a covering index AND we are using a clustering secondary
- // key
- need_val = (do_key_read == 0) &&
- (tokudb_active_index == primary_key ||
- key_is_clustering(&table->key_info[tokudb_active_index])
- );
-
- if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
- error = read_data_from_range_query_buff(buf, need_val, do_key_read);
- }
- else if (icp_went_out_of_range) {
- icp_went_out_of_range = false;
- error = HA_ERR_END_OF_FILE;
+ if (maybe_index_scan) {
+ maybe_index_scan = false;
+ if (!range_lock_grabbed) {
+ error = prepare_index_scan();
+ }
}
- else {
- invalidate_bulk_fetch();
- if (doing_bulk_fetch) {
- struct smart_dbt_bf_info bf_info;
- bf_info.ha = this;
- // you need the val if you have a clustering index and key_read is not 0;
- bf_info.direction = direction;
- bf_info.thd = ha_thd();
- bf_info.need_val = need_val;
- bf_info.buf = buf;
- bf_info.key_to_compare = key_to_compare;
- //
- // call c_getf_next with purpose of filling in range_query_buff
- //
- rows_fetched_using_bulk_fetch = 0;
- // it is expected that we can do ICP in the smart_dbt_bf_callback
- // as a result, it's possible we don't return any data because
- // none of the rows matched the index condition. Therefore, we need
- // this while loop. icp_out_of_range will be set if we hit a row that
- // the index condition states is out of our range. When that hits,
- // we know all the data in the buffer is the last data we will retrieve
- while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) {
- if (direction > 0) {
- error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
- } else {
- error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
- }
- }
- // if there is no data set and we went out of range,
- // then there is nothing to return
- if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) {
- icp_went_out_of_range = false;
- error = HA_ERR_END_OF_FILE;
- }
- if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
- bulk_fetch_iteration++;
- }
+
+ if (!error) {
+ uint32_t flags = SET_PRELOCK_FLAG(0);
- error = handle_cursor_error(error, HA_ERR_END_OF_FILE,tokudb_active_index);
- if (error) { goto cleanup; }
-
- //
- // now that range_query_buff is filled, read an element
- //
+ // we need to read the val of what we retrieve if
+ // we do NOT have a covering index AND we are using a clustering secondary
+ // key
+ bool need_val = (do_key_read == 0) &&
+ (tokudb_active_index == primary_key || key_is_clustering(&table->key_info[tokudb_active_index]));
+
+ if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
error = read_data_from_range_query_buff(buf, need_val, do_key_read);
}
+ else if (icp_went_out_of_range) {
+ icp_went_out_of_range = false;
+ error = HA_ERR_END_OF_FILE;
+ }
else {
- struct smart_dbt_info info;
- info.ha = this;
- info.buf = buf;
- info.keynr = tokudb_active_index;
+ invalidate_bulk_fetch();
+ if (doing_bulk_fetch) {
+ struct smart_dbt_bf_info bf_info;
+ bf_info.ha = this;
+ // you need the val if you have a clustering index and key_read is not 0;
+ bf_info.direction = direction;
+ bf_info.thd = ha_thd();
+ bf_info.need_val = need_val;
+ bf_info.buf = buf;
+ bf_info.key_to_compare = key_to_compare;
+ //
+ // call c_getf_next with purpose of filling in range_query_buff
+ //
+ rows_fetched_using_bulk_fetch = 0;
+ // it is expected that we can do ICP in the smart_dbt_bf_callback
+ // as a result, it's possible we don't return any data because
+ // none of the rows matched the index condition. Therefore, we need
+ // this while loop. icp_went_out_of_range will be set if we hit a row that
+ // the index condition states is out of our range. When that hits,
+ // we know all the data in the buffer is the last data we will retrieve
+ while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) {
+ if (direction > 0) {
+ error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
+ } else {
+ error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
+ }
+ }
+ // if there is no data set and we went out of range,
+ // then there is nothing to return
+ if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) {
+ icp_went_out_of_range = false;
+ error = HA_ERR_END_OF_FILE;
+ }
+ if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
+ bulk_fetch_iteration++;
+ }
- if (direction > 0) {
- error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
- } else {
- error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+ error = handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index);
+ if (error) { goto cleanup; }
+
+ //
+ // now that range_query_buff is filled, read an element
+ //
+ error = read_data_from_range_query_buff(buf, need_val, do_key_read);
+ }
+ else {
+ struct smart_dbt_info info;
+ info.ha = this;
+ info.buf = buf;
+ info.keynr = tokudb_active_index;
+
+ if (direction > 0) {
+ error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+ } else {
+ error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info);
+ }
+ error = handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index);
}
- error = handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index);
}
}
@@ -5436,12 +5380,15 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_
// read the full row by doing a point query into the
// main table.
//
-
if (!error && !do_key_read && (tokudb_active_index != primary_key) && !key_is_clustering(&table->key_info[tokudb_active_index])) {
error = read_full_row(buf);
}
- trx->stmt_progress.queried++;
- track_progress(thd);
+
+ if (!error) {
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
+ trx->stmt_progress.queried++;
+ track_progress(ha_thd());
+ }
cleanup:
return error;
}
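
Most of the get_next() hunk is re-indentation; the functional changes are a lazy locking step at the top and progress tracking that now runs only on success. A sketch of the added control flow, using only names from this patch:

    // index_first()/index_last() (below) now end with: maybe_index_scan = true;
    if (maybe_index_scan) {
        maybe_index_scan = false;
        if (!range_lock_grabbed)
            error = prepare_index_scan();  // whole-index range lock, taken only
    }                                      // if the caller actually keeps scanning
    if (!error) {
        // ... read from range_query_buff or the cursor, as before ...
    }
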
@@ -5501,7 +5448,7 @@ int ha_tokudb::index_first(uchar * buf) {
struct smart_dbt_info info;
uint32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
HANDLE_INVALID_CURSOR();
ha_statistic_increment(&SSV::ha_read_first_count);
@@ -5510,8 +5457,7 @@ int ha_tokudb::index_first(uchar * buf) {
info.buf = buf;
info.keynr = tokudb_active_index;
- error = cursor->c_getf_first(cursor, flags,
- SMART_DBT_CALLBACK(key_read), &info);
+ error = cursor->c_getf_first(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
error = handle_cursor_error(error,HA_ERR_END_OF_FILE,tokudb_active_index);
//
@@ -5521,9 +5467,11 @@ int ha_tokudb::index_first(uchar * buf) {
if (!error && !key_read && (tokudb_active_index != primary_key) && !key_is_clustering(&table->key_info[tokudb_active_index])) {
error = read_full_row(buf);
}
- trx->stmt_progress.queried++;
+ if (trx) {
+ trx->stmt_progress.queried++;
+ }
track_progress(thd);
-
+ maybe_index_scan = true;
cleanup:
TOKUDB_HANDLER_DBUG_RETURN(error);
}
@@ -5544,7 +5492,7 @@ int ha_tokudb::index_last(uchar * buf) {
struct smart_dbt_info info;
uint32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd();
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
HANDLE_INVALID_CURSOR();
ha_statistic_increment(&SSV::ha_read_last_count);
@@ -5553,8 +5501,7 @@ int ha_tokudb::index_last(uchar * buf) {
info.buf = buf;
info.keynr = tokudb_active_index;
- error = cursor->c_getf_last(cursor, flags,
- SMART_DBT_CALLBACK(key_read), &info);
+ error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK(key_read), &info);
error = handle_cursor_error(error,HA_ERR_END_OF_FILE,tokudb_active_index);
//
// still need to get entire contents of the row if operation done on
@@ -5568,6 +5515,7 @@ int ha_tokudb::index_last(uchar * buf) {
trx->stmt_progress.queried++;
}
track_progress(thd);
+ maybe_index_scan = true;
cleanup:
TOKUDB_HANDLER_DBUG_RETURN(error);
}
@@ -5635,7 +5583,7 @@ int ha_tokudb::rnd_next(uchar * buf) {
void ha_tokudb::track_progress(THD* thd) {
- tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
if (trx) {
ulonglong num_written = trx->stmt_progress.inserted + trx->stmt_progress.updated + trx->stmt_progress.deleted;
bool update_status =
@@ -5691,13 +5639,11 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
DBUG_RETURN(to);
}
-//
// Retrieves a row with based on the primary key saved in pos
// Returns:
// 0 on success
// HA_ERR_KEY_NOT_FOUND if not found
// error otherwise
-//
int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
TOKUDB_HANDLER_DBUG_ENTER("");
DBT db_pos;
@@ -5710,12 +5656,20 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
ha_statistic_increment(&SSV::ha_read_rnd_count);
tokudb_active_index = MAX_KEY;
+ // test rpl slave by inducing a delay before the point query
+ THD *thd = ha_thd();
+ if (thd->slave_thread && (in_rpl_delete_rows || in_rpl_update_rows)) {
+ uint64_t delay_ms = THDVAR(thd, rpl_lookup_rows_delay);
+ if (delay_ms)
+ usleep(delay_ms * 1000);
+ }
+
info.ha = this;
info.buf = buf;
info.keynr = primary_key;
error = share->file->getf_set(share->file, transaction,
- get_cursor_isolation_flags(lock.type, ha_thd()),
+ get_cursor_isolation_flags(lock.type, thd),
key, smart_dbt_callback_rowread_ptquery, &info);
if (error == DB_NOTFOUND) {
@@ -5727,8 +5681,8 @@ cleanup:
TOKUDB_HANDLER_DBUG_RETURN(error);
}
-int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_key) {
- TOKUDB_HANDLER_DBUG_ENTER("");
+int ha_tokudb::prelock_range(const key_range *start_key, const key_range *end_key) {
+ TOKUDB_HANDLER_DBUG_ENTER("%p %p", start_key, end_key);
THD* thd = ha_thd();
int error = 0;
@@ -5793,11 +5747,8 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
goto cleanup;
}
- //
// at this point, determine if we will be doing bulk fetch
- // as of now, only do it if we are doing a select
- //
- doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
+ doing_bulk_fetch = tokudb_do_bulk_fetch(thd);
bulk_fetch_iteration = 0;
rows_fetched_using_bulk_fetch = 0;
@@ -5812,7 +5763,7 @@ cleanup:
// Forward scans use read_range_first()/read_range_next().
//
int ha_tokudb::prepare_range_scan( const key_range *start_key, const key_range *end_key) {
- TOKUDB_HANDLER_DBUG_ENTER("");
+ TOKUDB_HANDLER_DBUG_ENTER("%p %p", start_key, end_key);
int error = prelock_range(start_key, end_key);
if (!error) {
range_lock_grabbed = true;
@@ -5826,7 +5777,7 @@ int ha_tokudb::read_range_first(
bool eq_range,
bool sorted)
{
- TOKUDB_HANDLER_DBUG_ENTER("");
+ TOKUDB_HANDLER_DBUG_ENTER("%p %p %u %u", start_key, end_key, eq_range, sorted);
int error = prelock_range(start_key, end_key);
if (error) { goto cleanup; }
range_lock_grabbed = true;
@@ -6225,12 +6176,11 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
}
int error = 0;
- tokudb_trx_data *trx = NULL;
- trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
if (!trx) {
error = create_tokudb_trx_data_instance(&trx);
if (error) { goto cleanup; }
- thd_data_set(thd, tokudb_hton->slot, trx);
+ thd_set_ha_data(thd, tokudb_hton, trx);
}
if (trx->all == NULL) {
trx->sp_level = NULL;
@@ -6304,7 +6254,7 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_HANDLER_TRACE("q %s", thd->query());
int error = 0;
- tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
DBUG_ASSERT(trx);
/*
@@ -6404,7 +6354,7 @@ uint32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, THD
lock (if we don't want to use MySQL table locks at all) or add locks
for many tables (like we do when we are using a MERGE handler).
- Tokudb DB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
+ TokuDB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
signals that we are doing WRITES, but we are still allowing other
readers and writers).
@@ -6426,34 +6376,25 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l
}
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
- // if creating a hot index
- if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
- rw_rdlock(&share->num_DBs_lock);
- if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
- lock_type = TL_WRITE_ALLOW_WRITE;
- }
- lock.type = lock_type;
- rw_unlock(&share->num_DBs_lock);
- }
-
- // 5.5 supports reads concurrent with alter table. just use the default lock type.
-#if MYSQL_VERSION_ID < 50500
- else if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX ||
- thd_sql_command(thd)== SQLCOM_ALTER_TABLE ||
- thd_sql_command(thd)== SQLCOM_DROP_INDEX) {
- // force alter table to lock out other readers
- lock_type = TL_WRITE;
- lock.type = lock_type;
- }
-#endif
- else {
- // If we are not doing a LOCK TABLE, then allow multiple writers
- if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
- !thd->in_lock_tables && thd_sql_command(thd) != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
+ enum_sql_command sql_command = (enum_sql_command) thd_sql_command(thd);
+ if (!thd->in_lock_tables) {
+ if (sql_command == SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
+ // hot indexing
+ rw_rdlock(&share->num_DBs_lock);
+ if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
+ lock_type = TL_WRITE_ALLOW_WRITE;
+ }
+ rw_unlock(&share->num_DBs_lock);
+ } else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
+ sql_command != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
+ // allow concurrent writes
lock_type = TL_WRITE_ALLOW_WRITE;
+ } else if (sql_command == SQLCOM_OPTIMIZE && lock_type == TL_READ_NO_INSERT) {
+ // hot optimize table
+ lock_type = TL_READ;
}
- lock.type = lock_type;
}
+ lock.type = lock_type;
}
*to++ = &lock;
if (tokudb_debug & TOKUDB_DEBUG_LOCK)
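
The store_lock() rewrite folds the old branches into one block guarded by !thd->in_lock_tables and adds a downgrade for hot OPTIMIZE TABLE. The resulting mapping, condensed into a hypothetical helper (the real code also rechecks share->num_DBs under num_DBs_lock before permitting online indexing):

    static thr_lock_type map_lock_sketch(enum_sql_command cmd, thr_lock_type t) {
        if (cmd == SQLCOM_CREATE_INDEX /* && online index creation enabled */)
            return TL_WRITE_ALLOW_WRITE;   // hot indexing
        if (t >= TL_WRITE_CONCURRENT_INSERT && t <= TL_WRITE &&
            cmd != SQLCOM_TRUNCATE)
            return TL_WRITE_ALLOW_WRITE;   // allow concurrent writers
        if (cmd == SQLCOM_OPTIMIZE && t == TL_READ_NO_INSERT)
            return TL_READ;                // hot optimize table
        return t;
    }
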
@@ -6909,7 +6850,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
- trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+ trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
if (trx && trx->sub_sp_level && thd_sql_command(thd) == SQLCOM_CREATE_TABLE) {
txn = trx->sub_sp_level;
}
@@ -6946,7 +6887,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
if (error) { goto cleanup; }
#if WITH_PARTITION_STORAGE_ENGINE
- if (TOKU_PARTITION_WRITE_FRM_DATA || IF_PARTITIONING(form->part_info, NULL) == NULL) {
+ if (TOKU_PARTITION_WRITE_FRM_DATA || form->part_info == NULL) {
error = write_frm_data(status_block, txn, form->s->path.str);
if (error) { goto cleanup; }
}
@@ -7099,7 +7040,7 @@ int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_nam
DB_TXN *parent_txn = NULL;
tokudb_trx_data *trx = NULL;
- trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+ trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
if (thd_sql_command(ha_thd()) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
parent_txn = trx->sub_sp_level;
}
@@ -7540,7 +7481,7 @@ int ha_tokudb::tokudb_add_index(
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT curr_pk_key, curr_pk_val;
- THD* thd = ha_thd();
+ THD* thd = ha_thd();
DB_LOADER* loader = NULL;
DB_INDEXER* indexer = NULL;
bool loader_save_space = get_load_save_space(thd);
@@ -7578,7 +7519,7 @@ int ha_tokudb::tokudb_add_index(
//
// status message to be shown in "show process list"
//
- const char *old_proc_info = tokudb_thd_get_proc_info(thd);
+ const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
thd_proc_info(thd, "Adding indexes");
@@ -7804,14 +7745,15 @@ int ha_tokudb::tokudb_add_index(
num_processed++;
if ((num_processed % 1000) == 0) {
- sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
+ sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.",
+ num_processed, (long long unsigned) share->rows);
thd_proc_info(thd, status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
thd_progress_report(thd, num_processed, (long long unsigned) share->rows);
#endif
- if (thd->killed) {
+ if (thd_killed(thd)) {
error = ER_ABORTING_CONNECTION;
goto cleanup;
}
@@ -7836,12 +7778,8 @@ int ha_tokudb::tokudb_add_index(
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_NOSAME) {
bool is_unique;
- error = is_index_unique(
- &is_unique,
- txn,
- share->key_file[curr_index],
- &key_info[i]
- );
+ error = is_index_unique(&is_unique, txn, share->key_file[curr_index], &key_info[i],
+ creating_hot_index ? 0 : DB_PRELOCKED_WRITE);
if (error) goto cleanup;
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
@@ -7899,7 +7837,7 @@ cleanup:
another transaction has accessed the table. \
To add indexes, make sure no transactions touch the table.", share->table_name);
}
- thd_proc_info(thd, old_proc_info);
+ thd_proc_info(thd, orig_proc_info);
TOKUDB_HANDLER_DBUG_RETURN(error ? error : loader_error);
}
@@ -8251,15 +8189,46 @@ void ha_tokudb::cleanup_txn(DB_TXN *txn) {
}
void ha_tokudb::add_to_trx_handler_list() {
- tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+ tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
trx->handlers = list_add(trx->handlers, &trx_handler_list);
}
void ha_tokudb::remove_from_trx_handler_list() {
- tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
+ tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
trx->handlers = list_delete(trx->handlers, &trx_handler_list);
}
+void ha_tokudb::rpl_before_write_rows() {
+ in_rpl_write_rows = true;
+}
+
+void ha_tokudb::rpl_after_write_rows() {
+ in_rpl_write_rows = false;
+}
+
+void ha_tokudb::rpl_before_delete_rows() {
+ in_rpl_delete_rows = true;
+}
+
+void ha_tokudb::rpl_after_delete_rows() {
+ in_rpl_delete_rows = false;
+}
+
+void ha_tokudb::rpl_before_update_rows() {
+ in_rpl_update_rows = true;
+}
+
+void ha_tokudb::rpl_after_update_rows() {
+ in_rpl_update_rows = false;
+}
+
+bool ha_tokudb::rpl_lookup_rows() {
+ if (!in_rpl_delete_rows && !in_rpl_update_rows)
+ return true;
+ else
+ return THDVAR(ha_thd(), rpl_lookup_rows);
+}
+
// table admin
#include "ha_tokudb_admin.cc"