author    | Sergei Golubchik <serg@mariadb.org> | 2016-06-30 16:38:05 +0200
committer | Sergei Golubchik <serg@mariadb.org> | 2016-06-30 16:38:05 +0200
commit    | 932646b1ff6a8f5815a961340a9e1ee4702f5b44 (patch)
tree      | 5bc42ace8ae1f7e4d00baffd468bdb7564e851f1 /storage/tokudb/ha_tokudb.cc
parent    | 0bb30f3603b519780eaf3fe0527b1c6af285229a (diff)
parent    | 33492ec8d4e2077cf8e07d0628a959d8729bd1f9 (diff)
Merge branch '10.1' into 10.2
Diffstat (limited to 'storage/tokudb/ha_tokudb.cc')
-rw-r--r-- | storage/tokudb/ha_tokudb.cc | 3083
1 file changed, 1896 insertions, 1187 deletions
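The bulk of this diff replaces the free functions get_share()/free_share() with reference-counted TOKUDB_SHARE methods (get_share/addref/release) guarded by a single _open_tables hash and mutex. A minimal sketch of that reference-counting pattern follows; all names here (Share, get, release, the std::map cache) are illustrative stand-ins, not the actual TokuDB classes or MySQL hash API:

    #include <cassert>
    #include <map>
    #include <mutex>
    #include <string>

    class Share {
    public:
        enum State { CLOSED, OPENED, ERROR_STATE };

        // Find or create the share for a table under the global cache lock
        // and bump its refcount, mirroring TOKUDB_SHARE::get_share(...,
        // create_new) in the diff below.
        static Share* get(const std::string& name, bool create_new) {
            std::lock_guard<std::mutex> guard(cache_mutex);
            auto it = cache.find(name);
            if (it == cache.end()) {
                if (!create_new) return nullptr;
                it = cache.emplace(name, new Share()).first;
            }
            it->second->use_count++;
            return it->second;
        }

        // Drop one reference; the last releaser tears down per-table state
        // and flips the share back to CLOSED (the real release() closes the
        // key_file[] dictionaries and frees cardinality data here).
        void release() {
            std::lock_guard<std::mutex> guard(mutex);
            assert(use_count > 0);
            if (--use_count == 0 && state == OPENED) {
                state = CLOSED;  // dictionary teardown would happen here
            }
        }

        State state = CLOSED;

    private:
        static std::mutex cache_mutex;
        static std::map<std::string, Share*> cache;
        std::mutex mutex;
        int use_count = 0;
    };

    std::mutex Share::cache_mutex;
    std::map<std::string, Share*> Share::cache;

The design point visible in the diff: lookup, creation, and refcount bump all happen under one lock, so the old OPENING/CLOSING condition-variable states become unnecessary.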
diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc index 2e3542e6eb4..79d85a9b77c 100644 --- a/storage/tokudb/ha_tokudb.cc +++ b/storage/tokudb/ha_tokudb.cc @@ -23,60 +23,31 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." -#ifdef USE_PRAGMA_IMPLEMENTATION -#pragma implementation // gcc: Class implementation -#endif - -#include <my_config.h> -extern "C" { -#include "stdint.h" -#define __STDC_FORMAT_MACROS -#include "inttypes.h" -#if defined(_WIN32) -#include "misc.h" -#endif -} - -#define MYSQL_SERVER 1 -#include "mysql_version.h" -#include "sql_table.h" -#include "handler.h" -#include "table.h" -#include "log.h" -#include "sql_class.h" -#include "sql_show.h" -#include "discover.h" - -#if (50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) || (50700 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50799) -#include <binlog.h> -#endif - -#include "db.h" -#include "toku_os.h" -#include "hatoku_defines.h" +#include "hatoku_hton.h" #include "hatoku_cmp.h" +#include "tokudb_buffer.h" +#include "tokudb_status.h" +#include "tokudb_card.h" +#include "ha_tokudb.h" -static inline uint get_key_parts(const KEY *key); - -#undef PACKAGE -#undef VERSION -#undef HAVE_DTRACE -#undef _DTRACE_VERSION -/* We define DTRACE after mysql_priv.h in case it disabled dtrace in the main server */ -#ifdef HAVE_DTRACE -#define _DTRACE_VERSION 1 +#if TOKU_INCLUDE_EXTENDED_KEYS +static inline uint get_ext_key_parts(const KEY *key) { +#if (50609 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) || \ + (50700 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50799) + return key->actual_key_parts; +#elif defined(MARIADB_BASE_VERSION) + return key->ext_key_parts; #else +#error +#endif +} #endif -#include "tokudb_buffer.h" -#include "tokudb_status.h" -#include "tokudb_card.h" -#include "ha_tokudb.h" -#include "hatoku_hton.h" -#include <mysql/plugin.h> +HASH TOKUDB_SHARE::_open_tables; +tokudb::thread::mutex_t TOKUDB_SHARE::_open_tables_mutex; -static const char *ha_tokudb_exts[] = { +static const char* ha_tokudb_exts[] = { ha_tokudb_ext, NullS }; @@ -84,10 +55,15 @@ static const char *ha_tokudb_exts[] = { // // This offset is calculated starting from AFTER the NULL bytes // -static inline uint32_t get_fixed_field_size(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, uint keynr) { +static inline uint32_t get_fixed_field_size( + KEY_AND_COL_INFO* kc_info, + TABLE_SHARE* table_share, + uint keynr) { + uint offset = 0; for (uint i = 0; i < table_share->fields; i++) { - if (is_fixed_field(kc_info, i) && !bitmap_is_set(&kc_info->key_filters[keynr],i)) { + if (is_fixed_field(kc_info, i) && + !bitmap_is_set(&kc_info->key_filters[keynr], i)) { offset += kc_info->field_lengths[i]; } } @@ -95,10 +71,15 @@ static inline uint32_t get_fixed_field_size(KEY_AND_COL_INFO* kc_info, TABLE_SHA } -static inline uint32_t get_len_of_offsets(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, uint keynr) { +static inline uint32_t get_len_of_offsets( + KEY_AND_COL_INFO* kc_info, + TABLE_SHARE* table_share, + uint keynr) { + uint len = 0; for (uint i = 0; i < table_share->fields; i++) { - if (is_variable_field(kc_info, i) && !bitmap_is_set(&kc_info->key_filters[keynr],i)) { + if (is_variable_field(kc_info, i) && + !bitmap_is_set(&kc_info->key_filters[keynr], i)) { len += kc_info->num_offset_bytes; } } @@ -106,32 +87,36 @@ static inline uint32_t get_len_of_offsets(KEY_AND_COL_INFO* kc_info, TABLE_SHARE } -static int 
allocate_key_and_col_info ( TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info) { +static int allocate_key_and_col_info( + TABLE_SHARE* table_share, + KEY_AND_COL_INFO* kc_info) { + int error; // // initialize all of the bitmaps // for (uint i = 0; i < MAX_KEY + 1; i++) { - error = bitmap_init( - &kc_info->key_filters[i], - NULL, - table_share->fields, - false - ); + error = + bitmap_init( + &kc_info->key_filters[i], + NULL, + table_share->fields, + false); if (error) { goto exit; } } - + // // create the field lengths // - kc_info->multi_ptr = tokudb_my_multi_malloc(MYF(MY_WME+MY_ZEROFILL), - &kc_info->field_types, (uint)(table_share->fields * sizeof (uint8_t)), - &kc_info->field_lengths, (uint)(table_share->fields * sizeof (uint16_t)), - &kc_info->length_bytes, (uint)(table_share->fields * sizeof (uint8_t)), - &kc_info->blob_fields, (uint)(table_share->fields * sizeof (uint32_t)), - NullS); + kc_info->multi_ptr = tokudb::memory::multi_malloc( + MYF(MY_WME+MY_ZEROFILL), + &kc_info->field_types, (uint)(table_share->fields * sizeof (uint8_t)), + &kc_info->field_lengths, (uint)(table_share->fields * sizeof (uint16_t)), + &kc_info->length_bytes, (uint)(table_share->fields * sizeof (uint8_t)), + &kc_info->blob_fields, (uint)(table_share->fields * sizeof (uint32_t)), + NullS); if (kc_info->multi_ptr == NULL) { error = ENOMEM; goto exit; @@ -141,7 +126,7 @@ exit: for (uint i = 0; MAX_KEY + 1; i++) { bitmap_free(&kc_info->key_filters[i]); } - tokudb_my_free(kc_info->multi_ptr); + tokudb::memory::free(kc_info->multi_ptr); } return error; } @@ -150,136 +135,306 @@ static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) { for (uint i = 0; i < MAX_KEY+1; i++) { bitmap_free(&kc_info->key_filters[i]); } - + for (uint i = 0; i < MAX_KEY+1; i++) { - tokudb_my_free(kc_info->cp_info[i]); + tokudb::memory::free(kc_info->cp_info[i]); kc_info->cp_info[i] = NULL; // 3144 } - tokudb_my_free(kc_info->multi_ptr); + tokudb::memory::free(kc_info->multi_ptr); kc_info->field_types = NULL; kc_info->field_lengths = NULL; kc_info->length_bytes = NULL; kc_info->blob_fields = NULL; } -void TOKUDB_SHARE::init(void) { - use_count = 0; - thr_lock_init(&lock); - tokudb_pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST); - my_rwlock_init(&num_DBs_lock, 0); - tokudb_pthread_cond_init(&m_openclose_cond, NULL); - m_state = CLOSED; -} -void TOKUDB_SHARE::destroy(void) { - assert(m_state == CLOSED); - thr_lock_delete(&lock); - tokudb_pthread_mutex_destroy(&mutex); - rwlock_destroy(&num_DBs_lock); - tokudb_pthread_cond_destroy(&m_openclose_cond); - tokudb_my_free(rec_per_key); - rec_per_key = NULL; -} +uchar* TOKUDB_SHARE::hash_get_key( + TOKUDB_SHARE* share, + size_t* length, + TOKUDB_UNUSED(my_bool not_used)) { -// MUST have tokudb_mutex locked on input -static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) { - TOKUDB_SHARE *share = NULL; + *length = share->_full_table_name.length(); + return (uchar *) share->_full_table_name.c_ptr(); +} +void TOKUDB_SHARE::hash_free_element(TOKUDB_SHARE* share) { + share->destroy(); + delete share; +} +void TOKUDB_SHARE::static_init() { + my_hash_init( + &_open_tables, + table_alias_charset, + 32, + 0, + 0, + (my_hash_get_key)hash_get_key, + (my_hash_free_key)hash_free_element, 0); +} +void TOKUDB_SHARE::static_destroy() { + my_hash_free(&_open_tables); +} +const char* TOKUDB_SHARE::get_state_string(share_state_t state) { + static const char* state_string[] = { + "CLOSED", + "OPENED", + "ERROR" + }; + assert_always(state == CLOSED || state == OPENED || state == 
ERROR); + return state_string[state]; +} +void* TOKUDB_SHARE::operator new(size_t sz) { + return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); +} +void TOKUDB_SHARE::operator delete(void* p) { + tokudb::memory::free(p); +} +TOKUDB_SHARE::TOKUDB_SHARE() : + _num_DBs_lock(), + _mutex() { +} +void TOKUDB_SHARE::init(const char* table_name) { + _use_count = 0; + thr_lock_init(&_thr_lock); + _state = CLOSED; + _row_delta_activity = 0; + _allow_auto_analysis = true; + + _full_table_name.append(table_name); + + String tmp_dictionary_name; + tokudb_split_dname( + table_name, + _database_name, + _table_name, + tmp_dictionary_name); + + TOKUDB_SHARE_DBUG_ENTER("file[%s]:state[%s]:use_count[%d]", + _full_table_name.ptr(), + get_state_string(_state), + _use_count); + TOKUDB_SHARE_DBUG_VOID_RETURN(); +} +void TOKUDB_SHARE::destroy() { + TOKUDB_SHARE_DBUG_ENTER("file[%s]:state[%s]:use_count[%d]", + _full_table_name.ptr(), + get_state_string(_state), + _use_count); + + assert_always(_use_count == 0); + assert_always( + _state == TOKUDB_SHARE::CLOSED || _state == TOKUDB_SHARE::ERROR); + thr_lock_delete(&_thr_lock); + TOKUDB_SHARE_DBUG_VOID_RETURN(); +} +TOKUDB_SHARE* TOKUDB_SHARE::get_share( + const char* table_name, + TABLE_SHARE* table_share, + THR_LOCK_DATA* data, + bool create_new) { + + _open_tables_mutex.lock(); int error = 0; uint length = (uint) strlen(table_name); - if (!(share = (TOKUDB_SHARE *) my_hash_search(&tokudb_open_tables, (uchar *) table_name, length))) { - char *tmp_name; - + TOKUDB_SHARE* share = + (TOKUDB_SHARE*)my_hash_search( + &_open_tables, + (uchar*)table_name, + length); + + TOKUDB_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_SHARE, + "existing share[%s] %s:share[%p]", + table_name, + share == NULL ? "not found" : "found", + share); + + if (!share) { + if (create_new == false) + goto exit; // create share and fill it with all zeroes // hence, all pointers are initialized to NULL - share = (TOKUDB_SHARE *) tokudb_my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), - &share, sizeof(*share), - &tmp_name, length + 1, - NullS - ); - assert(share); - - share->init(); + share = new TOKUDB_SHARE; + assert_always(share); - share->table_name_length = length; - share->table_name = tmp_name; - strmov(share->table_name, table_name); + share->init(table_name); - error = my_hash_insert(&tokudb_open_tables, (uchar *) share); + error = my_hash_insert(&_open_tables, (uchar*)share); if (error) { free_key_and_col_info(&share->kc_info); + share->destroy(); + tokudb::memory::free((uchar*)share); + share = NULL; goto exit; } } + share->addref(); + + if (data) + thr_lock_data_init(&(share->_thr_lock), data, NULL); + exit: - if (error) { - share->destroy(); - tokudb_my_free((uchar *) share); - share = NULL; - } + _open_tables_mutex.unlock(); return share; } +void TOKUDB_SHARE::drop_share(TOKUDB_SHARE* share) { + TOKUDB_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_SHARE, + "share[%p]:file[%s]:state[%s]:use_count[%d]", + share, + share->_full_table_name.ptr(), + get_state_string(share->_state), + share->_use_count); + + _open_tables_mutex.lock(); + my_hash_delete(&_open_tables, (uchar*)share); + _open_tables_mutex.unlock(); +} +TOKUDB_SHARE::share_state_t TOKUDB_SHARE::addref() { + TOKUDB_SHARE_TRACE_FOR_FLAGS((TOKUDB_DEBUG_ENTER & TOKUDB_DEBUG_SHARE), + "file[%s]:state[%s]:use_count[%d]", + _full_table_name.ptr(), + get_state_string(_state), + _use_count); + + lock(); + _use_count++; + + return _state; +} +int TOKUDB_SHARE::release() { + TOKUDB_SHARE_DBUG_ENTER("file[%s]:state[%s]:use_count[%d]", + _full_table_name.ptr(), + 
get_state_string(_state), + _use_count); -static int free_share(TOKUDB_SHARE * share) { int error, result = 0; - tokudb_pthread_mutex_lock(&share->mutex); - DBUG_PRINT("info", ("share->use_count %u", share->use_count)); - if (!--share->use_count) { - share->m_state = TOKUDB_SHARE::CLOSING; - tokudb_pthread_mutex_unlock(&share->mutex); - - // - // number of open DB's may not be equal to number of keys we have because add_index - // may have added some. So, we loop through entire array and close any non-NULL value - // It is imperative that we reset a DB to NULL once we are done with it. - // - for (uint i = 0; i < sizeof(share->key_file)/sizeof(share->key_file[0]); i++) { - if (share->key_file[i]) { - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - TOKUDB_TRACE("dbclose:%p", share->key_file[i]); - } - error = share->key_file[i]->close(share->key_file[i], 0); - assert(error == 0); + _mutex.lock(); + assert_always(_use_count != 0); + _use_count--; + if (_use_count == 0 && _state == TOKUDB_SHARE::OPENED) { + // number of open DB's may not be equal to number of keys we have + // because add_index may have added some. So, we loop through entire + // array and close any non-NULL value. It is imperative that we reset + // a DB to NULL once we are done with it. + for (uint i = 0; i < sizeof(key_file)/sizeof(key_file[0]); i++) { + if (key_file[i]) { + TOKUDB_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_OPEN, + "dbclose:%p", + key_file[i]); + error = key_file[i]->close(key_file[i], 0); + assert_always(error == 0); if (error) { result = error; } - if (share->key_file[i] == share->file) - share->file = NULL; - share->key_file[i] = NULL; + if (key_file[i] == file) + file = NULL; + key_file[i] = NULL; } } - error = tokudb::close_status(&share->status_block); - assert(error == 0); + error = tokudb::metadata::close(&status_block); + assert_always(error == 0); - free_key_and_col_info(&share->kc_info); + free_key_and_col_info(&kc_info); - tokudb_pthread_mutex_lock(&tokudb_mutex); - tokudb_pthread_mutex_lock(&share->mutex); - share->m_state = TOKUDB_SHARE::CLOSED; - if (share->use_count > 0) { - tokudb_pthread_cond_broadcast(&share->m_openclose_cond); - tokudb_pthread_mutex_unlock(&share->mutex); - tokudb_pthread_mutex_unlock(&tokudb_mutex); - } else { - - my_hash_delete(&tokudb_open_tables, (uchar *) share); - - tokudb_pthread_mutex_unlock(&share->mutex); - tokudb_pthread_mutex_unlock(&tokudb_mutex); + if (_rec_per_key) { + tokudb::memory::free(_rec_per_key); + _rec_per_key = NULL; + _rec_per_keys = 0; + } - share->destroy(); - tokudb_my_free((uchar *) share); + for (uint i = 0; i < _keys; i++) { + tokudb::memory::free(_key_descriptors[i]._name); } - } else { - tokudb_pthread_mutex_unlock(&share->mutex); + tokudb::memory::free(_key_descriptors); + _keys = _max_key_parts = 0; _key_descriptors = NULL; + + _state = TOKUDB_SHARE::CLOSED; } + _mutex.unlock(); - return result; + TOKUDB_SHARE_DBUG_RETURN(result); +} +void TOKUDB_SHARE::update_row_count( + THD* thd, + uint64_t added, + uint64_t deleted, + uint64_t updated) { + + uint64_t delta = added + deleted + updated; + lock(); + if (deleted > added && _rows < (deleted - added)) { + _rows = 0; + } else { + _rows += added - deleted; + } + _row_delta_activity += delta; + if (_row_delta_activity == (uint64_t)~0) + _row_delta_activity = 1; + + ulonglong auto_threshold = tokudb::sysvars::auto_analyze(thd); + if (delta && auto_threshold > 0 && _allow_auto_analysis) { + ulonglong pct_of_rows_changed_to_trigger; + pct_of_rows_changed_to_trigger = ((_rows * auto_threshold) / 100); + if 
(_row_delta_activity >= pct_of_rows_changed_to_trigger) { + char msg[200]; + snprintf( + msg, + sizeof(msg), + "TokuDB: Auto %s background analysis for %s, delta_activity " + "%llu is greater than %llu percent of %llu rows.", + tokudb::sysvars::analyze_in_background(thd) > 0 ? + "scheduling" : "running", + full_table_name(), + _row_delta_activity, + auto_threshold, + (ulonglong)(_rows)); + + // analyze_standard will unlock _mutex regardless of success/failure + int ret = analyze_standard(thd, NULL); + if (ret == 0) { + sql_print_information("%s - succeeded.", msg); + } else { + sql_print_information( + "%s - failed, likely a job already running.", + msg); + } + } + } + unlock(); } +void TOKUDB_SHARE::set_cardinality_counts_in_table(TABLE* table) { + lock(); + uint32_t next_key_part = 0; + for (uint32_t i = 0; i < table->s->keys; i++) { + KEY* key = &table->key_info[i]; + bool is_unique_key = + (i == table->s->primary_key) || (key->flags & HA_NOSAME); + for (uint32_t j = 0; j < get_ext_key_parts(key); j++) { + if (j >= key->user_defined_key_parts) { + // MySQL 'hidden' keys, really needs deeper investigation + // into MySQL hidden keys vs TokuDB hidden keys + key->rec_per_key[j] = 1; + continue; + } + + assert_always(next_key_part < _rec_per_keys); + ulong val = _rec_per_key[next_key_part++]; + val = (val * tokudb::sysvars::cardinality_scale_percent) / 100; + if (val == 0 || _rows == 0 || + (is_unique_key && j == get_ext_key_parts(key) - 1)) { + val = 1; + } + key->rec_per_key[j] = val; + } + } + unlock(); +} #define HANDLE_INVALID_CURSOR() \ if (cursor == NULL) { \ @@ -288,7 +443,6 @@ static int free_share(TOKUDB_SHARE * share) { } const char *ha_tokudb::table_type() const { - extern const char *tokudb_hton_name; return tokudb_hton_name; } @@ -296,7 +450,7 @@ const char *ha_tokudb::index_type(uint inx) { return "BTREE"; } -/* +/* * returns NULL terminated file extension string */ const char **ha_tokudb::bas_ext() const { @@ -315,46 +469,23 @@ static inline bool is_replace_into(THD* thd) { return thd->lex->duplicates == DUP_REPLACE; } -static inline bool do_ignore_flag_optimization(THD* thd, TABLE* table, bool opt_eligible) { +static inline bool do_ignore_flag_optimization( + THD* thd, + TABLE* table, + bool opt_eligible) { + bool do_opt = false; - if (opt_eligible) { - if (is_replace_into(thd) || is_insert_ignore(thd)) { - uint pk_insert_mode = get_pk_insert_mode(thd); - if ((!table->triggers && pk_insert_mode < 2) || pk_insert_mode == 0) { - if (mysql_bin_log.is_open() && thd->variables.binlog_format != BINLOG_FORMAT_STMT) { - do_opt = false; - } else { - do_opt = true; - } - } - } + if (opt_eligible && + (is_replace_into(thd) || is_insert_ignore(thd)) && + tokudb::sysvars::pk_insert_mode(thd) == 1 && + !table->triggers && + !(mysql_bin_log.is_open() && + thd->variables.binlog_format != BINLOG_FORMAT_STMT)) { + do_opt = true; } return do_opt; } -static inline uint get_key_parts(const KEY *key) { -#if (50609 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) || \ - (50700 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50799) || \ - (100009 <= MYSQL_VERSION_ID) - return key->user_defined_key_parts; -#else - return key->key_parts; -#endif -} - -#if TOKU_INCLUDE_EXTENDED_KEYS -static inline uint get_ext_key_parts(const KEY *key) { -#if (50609 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) || \ - (50700 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50799) - return key->actual_key_parts; -#elif defined(MARIADB_BASE_VERSION) - return key->ext_key_parts; -#else -#error -#endif -} -#endif - ulonglong 
ha_tokudb::table_flags() const { return int_table_flags | HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; } @@ -365,11 +496,9 @@ ulonglong ha_tokudb::table_flags() const { // ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const { TOKUDB_HANDLER_DBUG_ENTER(""); - assert(table_share); - ulong flags = (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_KEYREAD_ONLY | HA_READ_RANGE); -#if defined(MARIADB_BASE_VERSION) || (50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) - flags |= HA_DO_INDEX_COND_PUSHDOWN; -#endif + assert_always(table_share); + ulong flags = (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | + HA_KEYREAD_ONLY | HA_READ_RANGE | HA_DO_INDEX_COND_PUSHDOWN); if (key_is_clustering(&table_share->key_info[idx])) { flags |= HA_CLUSTERED_INDEX; } @@ -434,13 +563,13 @@ static int loader_poll_fun(void *extra, float progress) { static void loader_ai_err_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) { LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra; - assert(context->ha); + assert_always(context->ha); context->ha->set_loader_error(err); } static void loader_dup_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) { LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra; - assert(context->ha); + assert_always(context->ha); context->ha->set_loader_error(err); if (err == DB_KEYEXIST) { context->ha->set_dup_value_for_pk(key); @@ -620,8 +749,7 @@ static ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar break; default: - DBUG_ASSERT(0); - unsigned_autoinc = 0; + assert_unreachable(); } if (signed_autoinc < 0) { @@ -632,21 +760,6 @@ static ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar unsigned_autoinc : (ulonglong) signed_autoinc; } -static inline bool -is_null_field( TABLE* table, Field* field, const uchar* record) { - uint null_offset; - bool ret_val; - if (!field->real_maybe_null()) { - ret_val = false; - goto exitpt; - } - null_offset = get_null_offset(table,field); - ret_val = (record[null_offset] & field->null_bit) ? true: false; - -exitpt: - return ret_val; -} - static inline ulong field_offset(Field* field, TABLE* table) { return((ulong) (field->ptr - table->record[0])); } @@ -692,29 +805,36 @@ static int filter_key_part_compare (const void* left, const void* right) { // if key, table have proper info set. I had to verify by checking // in the debugger. // -void set_key_filter(MY_BITMAP* key_filter, KEY* key, TABLE* table, bool get_offset_from_keypart) { +void set_key_filter( + MY_BITMAP* key_filter, + KEY* key, + TABLE* table, + bool get_offset_from_keypart) { + FILTER_KEY_PART_INFO parts[MAX_REF_PARTS]; uint curr_skip_index = 0; - for (uint i = 0; i < get_key_parts(key); i++) { + for (uint i = 0; i < key->user_defined_key_parts; i++) { // // horrendous hack due to bugs in mysql, basically // we cannot always reliably get the offset from the same source // - parts[i].offset = get_offset_from_keypart ? key->key_part[i].offset : field_offset(key->key_part[i].field, table); + parts[i].offset = + get_offset_from_keypart ? 
+ key->key_part[i].offset : + field_offset(key->key_part[i].field, table); parts[i].part_index = i; } qsort( parts, // start of array - get_key_parts(key), //num elements + key->user_defined_key_parts, //num elements sizeof(*parts), //size of each element - filter_key_part_compare - ); + filter_key_part_compare); for (uint i = 0; i < table->s->fields; i++) { Field* field = table->field[i]; uint curr_field_offset = field_offset(field, table); - if (curr_skip_index < get_key_parts(key)) { + if (curr_skip_index < key->user_defined_key_parts) { uint curr_skip_offset = 0; curr_skip_offset = parts[curr_skip_index].offset; if (curr_skip_offset == curr_field_offset) { @@ -830,7 +950,7 @@ static inline uchar* write_var_field( int2store(to_tokudb_offset_ptr,offset); break; default: - assert(false); + assert_unreachable(); break; } return to_tokudb_data + data_length; @@ -850,8 +970,7 @@ static inline uint32_t get_var_data_length( data_length = uint2korr(from_mysql); break; default: - assert(false); - break; + assert_unreachable(); } return data_length; } @@ -894,8 +1013,7 @@ static inline void unpack_var_field( int2store(to_mysql, from_tokudb_data_len); break; default: - assert(false); - break; + assert_unreachable(); } // // store the data @@ -928,7 +1046,7 @@ static uchar* pack_toku_field_blob( length = uint4korr(from_mysql); break; default: - assert(false); + assert_unreachable(); } if (length > 0) { @@ -940,7 +1058,9 @@ static uchar* pack_toku_field_blob( static int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx) { int error; - tokudb_trx_data* trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL)); + tokudb_trx_data* trx = (tokudb_trx_data *) tokudb::memory::malloc( + sizeof(*trx), + MYF(MY_ZEROFILL)); if (!trx) { error = ENOMEM; goto cleanup; @@ -1011,32 +1131,27 @@ static inline int tokudb_generate_row( void* old_ptr = dest_key->data; void* new_ptr = NULL; new_ptr = realloc(old_ptr, max_key_len); - assert(new_ptr); + assert_always(new_ptr); dest_key->data = new_ptr; dest_key->ulen = max_key_len; } buff = (uchar *)dest_key->data; - assert(buff != NULL && max_key_len > 0); - } - else { - assert(false); + assert_always(buff != NULL && max_key_len > 0); + } else { + assert_unreachable(); } - dest_key->size = pack_key_from_desc( - buff, - row_desc, - desc_size, - src_key, - src_val - ); - assert(dest_key->ulen >= dest_key->size); - if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY && !max_key_len) { + dest_key->size = pack_key_from_desc(buff, row_desc, desc_size, src_key, + src_val); + assert_always(dest_key->ulen >= dest_key->size); + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK_KEY)) && + !max_key_len) { max_key_len = max_key_size_from_desc(row_desc, desc_size); max_key_len += src_key->size; } if (max_key_len) { - assert(max_key_len >= dest_key->size); + assert_always(max_key_len >= dest_key->size); } row_desc += desc_size; @@ -1045,8 +1160,7 @@ static inline int tokudb_generate_row( if (dest_val != NULL) { if (!is_key_clustering(row_desc, desc_size) || src_val->size == 0) { dest_val->size = 0; - } - else { + } else { uchar* buff = NULL; if (dest_val->flags == 0) { dest_val->ulen = 0; @@ -1059,23 +1173,21 @@ static inline int tokudb_generate_row( void* old_ptr = dest_val->data; void* new_ptr = NULL; new_ptr = realloc(old_ptr, src_val->size); - assert(new_ptr); + assert_always(new_ptr); dest_val->data = new_ptr; dest_val->ulen = src_val->size; } buff = (uchar *)dest_val->data; - assert(buff != NULL); - } - else { - assert(false); + assert_always(buff != NULL); 
+ } else { + assert_unreachable(); } dest_val->size = pack_clustering_val_from_desc( buff, row_desc, desc_size, - src_val - ); - assert(dest_val->ulen >= dest_val->size); + src_val); + assert_always(dest_val->ulen >= dest_val->size); } } error = 0; @@ -1143,6 +1255,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t read_key = false; added_rows = 0; deleted_rows = 0; + updated_rows = 0; last_dup_key = UINT_MAX; using_ignore = false; using_ignore_no_key = false; @@ -1224,41 +1337,42 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) { static int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) { int error; char* newname = NULL; - newname = (char *)tokudb_my_malloc( - get_max_dict_name_path_length(name), - MYF(MY_WME)); + size_t newname_len = get_max_dict_name_path_length(name); + newname = (char*)tokudb::memory::malloc(newname_len, MYF(MY_WME)); if (newname == NULL) { error = ENOMEM; goto cleanup; } - make_name(newname, name, "status"); - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - TOKUDB_TRACE("open:%s", newname); - } + make_name(newname, newname_len, name, "status"); + TOKUDB_TRACE_FOR_FLAGS(TOKUDB_DEBUG_OPEN, "open:%s", newname); - error = tokudb::open_status(db_env, ptr, newname, txn); + error = tokudb::metadata::open(db_env, ptr, newname, txn); cleanup: - tokudb_my_free(newname); + tokudb::memory::free(newname); return error; } -int ha_tokudb::open_main_dictionary(const char* name, bool is_read_only, DB_TXN* txn) { +int ha_tokudb::open_main_dictionary( + const char* name, + bool is_read_only, + DB_TXN* txn) { + int error; char* newname = NULL; + size_t newname_len = 0; uint open_flags = (is_read_only ? DB_RDONLY : 0) | DB_THREAD; - assert(share->file == NULL); - assert(share->key_file[primary_key] == NULL); - - newname = (char *)tokudb_my_malloc( - get_max_dict_name_path_length(name), - MYF(MY_WME|MY_ZEROFILL) - ); + assert_always(share->file == NULL); + assert_always(share->key_file[primary_key] == NULL); + newname_len = get_max_dict_name_path_length(name); + newname = (char*)tokudb::memory::malloc( + newname_len, + MYF(MY_WME|MY_ZEROFILL)); if (newname == NULL) { error = ENOMEM; goto exit; } - make_name(newname, name, "main"); + make_name(newname, newname_len, name, "main"); error = db_create(&share->file, db_env, 0); if (error) { @@ -1266,14 +1380,24 @@ int ha_tokudb::open_main_dictionary(const char* name, bool is_read_only, DB_TXN* } share->key_file[primary_key] = share->file; - error = share->file->open(share->file, txn, newname, NULL, DB_BTREE, open_flags, 0); + error = + share->file->open( + share->file, + txn, + newname, + NULL, + DB_BTREE, + open_flags, + 0); if (error) { goto exit; } - - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - TOKUDB_HANDLER_TRACE("open:%s:file=%p", newname, share->file); - } + + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_OPEN, + "open:%s:file=%p", + newname, + share->file); error = 0; exit: @@ -1283,34 +1407,42 @@ exit: share->file, 0 ); - assert(r==0); + assert_always(r==0); share->file = NULL; share->key_file[primary_key] = NULL; } } - tokudb_my_free(newname); + tokudb::memory::free(newname); return error; } // -// Open a secondary table, the key will be a secondary index, the data will be a primary key +// Open a secondary table, the key will be a secondary index, the data will +// be a primary key // -int ha_tokudb::open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, bool is_read_only, DB_TXN* txn) { +int ha_tokudb::open_secondary_dictionary( + DB** ptr, + KEY* key_info, + const 
char* name, + bool is_read_only, + DB_TXN* txn) { + int error = ENOSYS; char dict_name[MAX_DICT_NAME_LEN]; uint open_flags = (is_read_only ? DB_RDONLY : 0) | DB_THREAD; char* newname = NULL; - uint newname_len = 0; - + size_t newname_len = 0; + sprintf(dict_name, "key-%s", key_info->name); newname_len = get_max_dict_name_path_length(name); - newname = (char *)tokudb_my_malloc(newname_len, MYF(MY_WME|MY_ZEROFILL)); + newname = + (char*)tokudb::memory::malloc(newname_len, MYF(MY_WME|MY_ZEROFILL)); if (newname == NULL) { error = ENOMEM; goto cleanup; } - make_name(newname, name, dict_name); + make_name(newname, newname_len, name, dict_name); if ((error = db_create(ptr, db_env, 0))) { @@ -1319,22 +1451,25 @@ int ha_tokudb::open_secondary_dictionary(DB** ptr, KEY* key_info, const char* na } - if ((error = (*ptr)->open(*ptr, txn, newname, NULL, DB_BTREE, open_flags, 0))) { + error = (*ptr)->open(*ptr, txn, newname, NULL, DB_BTREE, open_flags, 0); + if (error) { my_errno = error; goto cleanup; } - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - TOKUDB_HANDLER_TRACE("open:%s:file=%p", newname, *ptr); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_OPEN, + "open:%s:file=%p", + newname, + *ptr); cleanup: if (error) { if (*ptr) { int r = (*ptr)->close(*ptr, 0); - assert(r==0); + assert_always(r==0); *ptr = NULL; } } - tokudb_my_free(newname); + tokudb::memory::free(newname); return error; } @@ -1343,11 +1478,10 @@ static int initialize_col_pack_info(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* tabl // // set up the cp_info // - assert(kc_info->cp_info[keynr] == NULL); - kc_info->cp_info[keynr] = (COL_PACK_INFO *)tokudb_my_malloc( - table_share->fields*sizeof(COL_PACK_INFO), - MYF(MY_WME | MY_ZEROFILL) - ); + assert_always(kc_info->cp_info[keynr] == NULL); + kc_info->cp_info[keynr] = (COL_PACK_INFO*)tokudb::memory::malloc( + table_share->fields * sizeof(COL_PACK_INFO), + MYF(MY_WME | MY_ZEROFILL)); if (kc_info->cp_info[keynr] == NULL) { error = ENOMEM; goto exit; @@ -1396,12 +1530,18 @@ exit: // reset the kc_info state at keynr static void reset_key_and_col_info(KEY_AND_COL_INFO *kc_info, uint keynr) { bitmap_clear_all(&kc_info->key_filters[keynr]); - tokudb_my_free(kc_info->cp_info[keynr]); + tokudb::memory::free(kc_info->cp_info[keynr]); kc_info->cp_info[keynr] = NULL; kc_info->mcp_info[keynr] = (MULTI_COL_PACK_INFO) { 0, 0 }; } -static int initialize_key_and_col_info(TABLE_SHARE* table_share, TABLE* table, KEY_AND_COL_INFO* kc_info, uint hidden_primary_key, uint primary_key) { +static int initialize_key_and_col_info( + TABLE_SHARE* table_share, + TABLE* table, + KEY_AND_COL_INFO* kc_info, + uint hidden_primary_key, + uint primary_key) { + int error = 0; uint32_t curr_blob_field_index = 0; uint32_t max_var_bytes = 0; @@ -1420,7 +1560,7 @@ static int initialize_key_and_col_info(TABLE_SHARE* table_share, TABLE* table, K case toku_type_fixbinary: case toku_type_fixstring: pack_length = field->pack_length(); - assert(pack_length < 1<<16); + assert_always(pack_length < 1<<16); kc_info->field_types[i] = KEY_AND_COL_INFO::TOKUDB_FIXED_FIELD; kc_info->field_lengths[i] = (uint16_t)pack_length; kc_info->length_bytes[i] = 0; @@ -1436,11 +1576,12 @@ static int initialize_key_and_col_info(TABLE_SHARE* table_share, TABLE* table, K case toku_type_varbinary: kc_info->field_types[i] = KEY_AND_COL_INFO::TOKUDB_VARIABLE_FIELD; kc_info->field_lengths[i] = 0; - kc_info->length_bytes[i] = (uchar)((Field_varstring *)field)->length_bytes; + kc_info->length_bytes[i] = + (uchar)((Field_varstring*)field)->length_bytes; 
max_var_bytes += field->field_length; break; default: - assert(false); + assert_unreachable(); } } kc_info->num_blobs = curr_blob_field_index; @@ -1452,54 +1593,54 @@ static int initialize_key_and_col_info(TABLE_SHARE* table_share, TABLE* table, K // if (max_var_bytes < 256) { kc_info->num_offset_bytes = 1; - } - else { + } else { kc_info->num_offset_bytes = 2; } - for (uint i = 0; i < table_share->keys + tokudb_test(hidden_primary_key); i++) { + for (uint i = 0; + i < table_share->keys + tokudb_test(hidden_primary_key); + i++) { // // do the cluster/primary key filtering calculations // - if (! (i==primary_key && hidden_primary_key) ){ - if ( i == primary_key ) { + if (!(i==primary_key && hidden_primary_key)) { + if (i == primary_key) { set_key_filter( &kc_info->key_filters[primary_key], &table_share->key_info[primary_key], table, - true - ); - } - else { + true); + } else { set_key_filter( &kc_info->key_filters[i], &table_share->key_info[i], table, - true - ); + true); if (!hidden_primary_key) { set_key_filter( &kc_info->key_filters[i], &table_share->key_info[primary_key], table, - true - ); + true); } } } if (i == primary_key || key_is_clustering(&table_share->key_info[i])) { - error = initialize_col_pack_info(kc_info,table_share,i); + error = initialize_col_pack_info(kc_info, table_share, i); if (error) { goto exit; } } - } exit: return error; } -bool ha_tokudb::can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info, uint pk) { +bool ha_tokudb::can_replace_into_be_fast( + TABLE_SHARE* table_share, + KEY_AND_COL_INFO* kc_info, + uint pk) { + uint curr_num_DBs = table_share->keys + tokudb_test(hidden_primary_key); bool ret_val; if (curr_num_DBs == 1) { @@ -1510,7 +1651,7 @@ bool ha_tokudb::can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_I for (uint curr_index = 0; curr_index < table_share->keys; curr_index++) { if (curr_index == pk) continue; KEY* curr_key_info = &table_share->key_info[curr_index]; - for (uint i = 0; i < get_key_parts(curr_key_info); i++) { + for (uint i = 0; i < curr_key_info->user_defined_key_parts; i++) { uint16 curr_field_index = curr_key_info->key_part[i].field->field_index; if (!bitmap_is_set(&kc_info->key_filters[curr_index],curr_field_index)) { ret_val = false; @@ -1544,8 +1685,6 @@ int ha_tokudb::initialize_share(const char* name, int mode) { if (error) { goto exit; } } - DBUG_PRINT("info", ("share->use_count %u", share->use_count)); - share->m_initialize_count++; error = get_status(txn); if (error) { @@ -1574,43 +1713,64 @@ int ha_tokudb::initialize_share(const char* name, int mode) { goto exit; #endif - error = initialize_key_and_col_info( - table_share, - table, - &share->kc_info, - hidden_primary_key, - primary_key - ); + error = + initialize_key_and_col_info( + table_share, + table, + &share->kc_info, + hidden_primary_key, + primary_key); if (error) { goto exit; } - + error = open_main_dictionary(name, mode == O_RDONLY, txn); - if (error) { goto exit; } + if (error) { + goto exit; + } share->has_unique_keys = false; + share->_keys = table_share->keys; + share->_max_key_parts = table_share->key_parts; + share->_key_descriptors = + (TOKUDB_SHARE::key_descriptor_t*)tokudb::memory::malloc( + sizeof(TOKUDB_SHARE::key_descriptor_t) * share->_keys, + MYF(MY_ZEROFILL)); + /* Open other keys; These are part of the share structure */ for (uint i = 0; i < table_share->keys; i++) { + share->_key_descriptors[i]._parts = + table_share->key_info[i].user_defined_key_parts; + if (i == primary_key) { + 
share->_key_descriptors[i]._is_unique = true; + share->_key_descriptors[i]._name = tokudb::memory::strdup("primary", 0); + } else { + share->_key_descriptors[i]._is_unique = false; + share->_key_descriptors[i]._name = + tokudb::memory::strdup(table_share->key_info[i].name, 0); + } + if (table_share->key_info[i].flags & HA_NOSAME) { + share->_key_descriptors[i]._is_unique = true; share->has_unique_keys = true; } if (i != primary_key) { - error = open_secondary_dictionary( - &share->key_file[i], - &table_share->key_info[i], - name, - mode == O_RDONLY, - txn - ); + error = + open_secondary_dictionary( + &share->key_file[i], + &table_share->key_info[i], + name, + mode == O_RDONLY, + txn); if (error) { goto exit; } } } - share->replace_into_fast = can_replace_into_be_fast( - table_share, - &share->kc_info, - primary_key - ); - + share->replace_into_fast = + can_replace_into_be_fast( + table_share, + &share->kc_info, + primary_key); + share->pk_has_string = false; if (!hidden_primary_key) { // @@ -1618,8 +1778,9 @@ int ha_tokudb::initialize_share(const char* name, int mode) { // the "infinity byte" in keys, and for placing the DBT size in the first four bytes // ref_length = sizeof(uint32_t) + sizeof(uchar); - KEY_PART_INFO *key_part = table->key_info[primary_key].key_part; - KEY_PART_INFO *end = key_part + get_key_parts(&table->key_info[primary_key]); + KEY_PART_INFO* key_part = table->key_info[primary_key].key_part; + KEY_PART_INFO* end = + key_part + table->key_info[primary_key].user_defined_key_parts; for (; key_part != end; key_part++) { ref_length += key_part->field->max_packed_col_length(key_part->length); TOKU_TYPE toku_type = mysql_to_toku_type(key_part->field); @@ -1640,9 +1801,8 @@ int ha_tokudb::initialize_share(const char* name, int mode) { // estimate_num_rows should not fail under normal conditions // if (error == 0) { - share->rows = num_rows; - } - else { + share->set_row_count(num_rows, true); + } else { goto exit; } // @@ -1655,8 +1815,7 @@ int ha_tokudb::initialize_share(const char* name, int mode) { if (may_table_be_empty(txn)) { share->try_table_lock = true; - } - else { + } else { share->try_table_lock = false; } @@ -1665,12 +1824,22 @@ int ha_tokudb::initialize_share(const char* name, int mode) { init_hidden_prim_key_info(txn); // initialize cardinality info from the status dictionary - share->n_rec_per_key = tokudb::compute_total_key_parts(table_share); - share->rec_per_key = (uint64_t *) tokudb_my_realloc(share->rec_per_key, share->n_rec_per_key * sizeof (uint64_t), MYF(MY_FAE + MY_ALLOW_ZERO_PTR)); - error = tokudb::get_card_from_status(share->status_block, txn, share->n_rec_per_key, share->rec_per_key); - if (error) { - for (uint i = 0; i < share->n_rec_per_key; i++) - share->rec_per_key[i] = 0; + { + uint32_t rec_per_keys = tokudb::compute_total_key_parts(table_share); + uint64_t* rec_per_key = + (uint64_t*)tokudb::memory::malloc( + rec_per_keys * sizeof(uint64_t), + MYF(MY_FAE)); + error = + tokudb::get_card_from_status( + share->status_block, + txn, + rec_per_keys, + rec_per_key); + if (error) { + memset(rec_per_key, 0, sizeof(ulonglong) * rec_per_keys); + } + share->init_cardinality_counts(rec_per_keys, rec_per_key); } error = 0; @@ -1720,7 +1889,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { // the "+ 1" is for the first byte that states +/- infinity // multiply everything by 2 to account for clustered keys having a key and primary key together max_key_length = 2*(table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar)); - 
alloc_ptr = tokudb_my_multi_malloc(MYF(MY_WME), + alloc_ptr = tokudb::memory::multi_malloc( + MYF(MY_WME), &key_buff, max_key_length, &key_buff2, max_key_length, &key_buff3, max_key_length, @@ -1730,81 +1900,81 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { &primary_key_buff, (hidden_primary_key ? 0 : max_key_length), &fixed_cols_for_query, table_share->fields*sizeof(uint32_t), &var_cols_for_query, table_share->fields*sizeof(uint32_t), - NullS - ); + NullS); if (alloc_ptr == NULL) { ret_val = 1; goto exit; } - size_range_query_buff = get_tokudb_read_buf_size(thd); - range_query_buff = (uchar *)tokudb_my_malloc(size_range_query_buff, MYF(MY_WME)); + size_range_query_buff = tokudb::sysvars::read_buf_size(thd); + range_query_buff = + (uchar*)tokudb::memory::malloc(size_range_query_buff, MYF(MY_WME)); if (range_query_buff == NULL) { ret_val = 1; goto exit; } - alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields; - rec_buff = (uchar *) tokudb_my_malloc(alloced_rec_buff_length, MYF(MY_WME)); + alloced_rec_buff_length = table_share->rec_buff_length + + table_share->fields; + rec_buff = (uchar *) tokudb::memory::malloc( + alloced_rec_buff_length, + MYF(MY_WME)); if (rec_buff == NULL) { ret_val = 1; goto exit; } alloced_update_rec_buff_length = alloced_rec_buff_length; - rec_update_buff = (uchar *) tokudb_my_malloc(alloced_update_rec_buff_length, MYF(MY_WME)); + rec_update_buff = (uchar*)tokudb::memory::malloc( + alloced_update_rec_buff_length, + MYF(MY_WME)); if (rec_update_buff == NULL) { ret_val = 1; goto exit; } // lookup or create share - tokudb_pthread_mutex_lock(&tokudb_mutex); - share = get_share(name, table_share); - assert(share); + share = TOKUDB_SHARE::get_share(name, table_share, &lock, true); + assert_always(share); - thr_lock_data_init(&share->lock, &lock, NULL); - - tokudb_pthread_mutex_lock(&share->mutex); - tokudb_pthread_mutex_unlock(&tokudb_mutex); - share->use_count++; - while (share->m_state == TOKUDB_SHARE::OPENING || share->m_state == TOKUDB_SHARE::CLOSING) { - tokudb_pthread_cond_wait(&share->m_openclose_cond, &share->mutex); - } - if (share->m_state == TOKUDB_SHARE::CLOSED) { - share->m_state = TOKUDB_SHARE::OPENING; - tokudb_pthread_mutex_unlock(&share->mutex); + if (share->state() != TOKUDB_SHARE::OPENED) { + // means we're responsible for the transition to OPENED, ERROR or CLOSED ret_val = allocate_key_and_col_info(table_share, &share->kc_info); if (ret_val == 0) { ret_val = initialize_share(name, mode); } - tokudb_pthread_mutex_lock(&share->mutex); if (ret_val == 0) { - share->m_state = TOKUDB_SHARE::OPENED; + share->set_state(TOKUDB_SHARE::OPENED); } else { - share->m_state = TOKUDB_SHARE::ERROR; - share->m_error = ret_val; + free_key_and_col_info(&share->kc_info); + share->set_state(TOKUDB_SHARE::ERROR); } - tokudb_pthread_cond_broadcast(&share->m_openclose_cond); + share->unlock(); + } else { + // got an already OPENED instance + share->unlock(); } - if (share->m_state == TOKUDB_SHARE::ERROR) { - ret_val = share->m_error; - tokudb_pthread_mutex_unlock(&share->mutex); - free_share(share); + + if (share->state() == TOKUDB_SHARE::ERROR) { + share->release(); goto exit; - } else { - assert(share->m_state == TOKUDB_SHARE::OPENED); - tokudb_pthread_mutex_unlock(&share->mutex); } + assert_always(share->state() == TOKUDB_SHARE::OPENED); + ref_length = share->ref_length; // If second open - - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - TOKUDB_HANDLER_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d", - this, 
share, share->file, table, table->s, share->use_count); - } + + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_OPEN, + "tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d", + this, + share, + share->file, + table, + table->s, + share->use_count()); key_read = false; stats.block_size = 1<<20; // QQQ Tokudb DB block size @@ -1813,13 +1983,13 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { exit: if (ret_val) { - tokudb_my_free(range_query_buff); + tokudb::memory::free(range_query_buff); range_query_buff = NULL; - tokudb_my_free(alloc_ptr); + tokudb::memory::free(alloc_ptr); alloc_ptr = NULL; - tokudb_my_free(rec_buff); + tokudb::memory::free(rec_buff); rec_buff = NULL; - tokudb_my_free(rec_update_buff); + tokudb::memory::free(rec_update_buff); rec_update_buff = NULL; if (error) { @@ -1982,7 +2152,7 @@ int ha_tokudb::write_frm_data(DB* db, DB_TXN* txn, const char* frm_name) { error = 0; cleanup: - tokudb_my_free(frm_data); + tokudb::memory::free(frm_data); TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -1993,8 +2163,8 @@ int ha_tokudb::remove_frm_data(DB *db, DB_TXN *txn) { static int smart_dbt_callback_verify_frm (DBT const *key, DBT const *row, void *context) { DBT* stored_frm = (DBT *)context; stored_frm->size = row->size; - stored_frm->data = (uchar *)tokudb_my_malloc(row->size, MYF(MY_WME)); - assert(stored_frm->data); + stored_frm->data = (uchar *)tokudb::memory::malloc(row->size, MYF(MY_WME)); + assert_always(stored_frm->data); memcpy(stored_frm->data, row->data, row->size); return 0; } @@ -2046,8 +2216,8 @@ int ha_tokudb::verify_frm_data(const char* frm_name, DB_TXN* txn) { error = 0; cleanup: - tokudb_my_free(mysql_frm_data); - tokudb_my_free(stored_frm.data); + tokudb::memory::free(mysql_frm_data); + tokudb::memory::free(stored_frm.data); TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -2083,7 +2253,7 @@ int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn){ // // Closes a handle to a table. 
// -int ha_tokudb::close(void) { +int ha_tokudb::close() { TOKUDB_HANDLER_DBUG_ENTER(""); int r = __close(); TOKUDB_HANDLER_DBUG_RETURN(r); @@ -2091,13 +2261,12 @@ int ha_tokudb::close(void) { int ha_tokudb::__close() { TOKUDB_HANDLER_DBUG_ENTER(""); - if (tokudb_debug & TOKUDB_DEBUG_OPEN) - TOKUDB_HANDLER_TRACE("close:%p", this); - tokudb_my_free(rec_buff); - tokudb_my_free(rec_update_buff); - tokudb_my_free(blob_buff); - tokudb_my_free(alloc_ptr); - tokudb_my_free(range_query_buff); + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_OPEN, "close:%p", this); + tokudb::memory::free(rec_buff); + tokudb::memory::free(rec_update_buff); + tokudb::memory::free(blob_buff); + tokudb::memory::free(alloc_ptr); + tokudb::memory::free(range_query_buff); for (uint32_t i = 0; i < sizeof(mult_key_dbt_array)/sizeof(mult_key_dbt_array[0]); i++) { toku_dbt_array_destroy(&mult_key_dbt_array[i]); } @@ -2108,7 +2277,7 @@ int ha_tokudb::__close() { rec_update_buff = NULL; alloc_ptr = NULL; ha_tokudb::reset(); - int retval = free_share(share); + int retval = share->release(); TOKUDB_HANDLER_DBUG_RETURN(retval); } @@ -2120,8 +2289,11 @@ int ha_tokudb::__close() { // bool ha_tokudb::fix_rec_buff_for_blob(ulong length) { if (!rec_buff || (length > alloced_rec_buff_length)) { - uchar *newptr; - if (!(newptr = (uchar *) tokudb_my_realloc((void *) rec_buff, length, MYF(MY_ALLOW_ZERO_PTR)))) + uchar* newptr = (uchar*)tokudb::memory::realloc( + (void*)rec_buff, + length, + MYF(MY_ALLOW_ZERO_PTR)); + if (!newptr) return 1; rec_buff = newptr; alloced_rec_buff_length = length; @@ -2137,8 +2309,11 @@ bool ha_tokudb::fix_rec_buff_for_blob(ulong length) { // bool ha_tokudb::fix_rec_update_buff_for_blob(ulong length) { if (!rec_update_buff || (length > alloced_update_rec_buff_length)) { - uchar *newptr; - if (!(newptr = (uchar *) tokudb_my_realloc((void *) rec_update_buff, length, MYF(MY_ALLOW_ZERO_PTR)))) + uchar* newptr = (uchar*)tokudb::memory::realloc( + (void*)rec_update_buff, + length, + MYF(MY_ALLOW_ZERO_PTR)); + if (!newptr) return 1; rec_update_buff= newptr; alloced_update_rec_buff_length = length; @@ -2273,9 +2448,11 @@ int ha_tokudb::unpack_blobs( // // assert that num_bytes > 0 iff share->num_blobs > 0 // - assert( !((share->kc_info.num_blobs == 0) && (num_bytes > 0)) ); + assert_always( !((share->kc_info.num_blobs == 0) && (num_bytes > 0)) ); if (num_bytes > num_blob_bytes) { - ptr = (uchar *)tokudb_my_realloc((void *)blob_buff, num_bytes, MYF(MY_ALLOW_ZERO_PTR)); + ptr = (uchar*)tokudb::memory::realloc( + (void*)blob_buff, num_bytes, + MYF(MY_ALLOW_ZERO_PTR)); if (ptr == NULL) { error = ENOMEM; goto exit; @@ -2390,8 +2567,7 @@ int ha_tokudb::unpack_row( data_end_offset = uint2korr(var_field_offset_ptr); break; default: - assert(false); - break; + assert_unreachable(); } unpack_var_field( record + field_offset(field, table), @@ -2488,13 +2664,13 @@ exit: } uint32_t ha_tokudb::place_key_into_mysql_buff( - KEY* key_info, - uchar * record, - uchar* data - ) -{ - KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + get_key_parts(key_info); - uchar *pos = data; + KEY* key_info, + uchar* record, + uchar* data) { + + KEY_PART_INFO* key_part = key_info->key_part; + KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts; + uchar* pos = data; for (; key_part != end; key_part++) { if (key_part->field->null_bit) { @@ -2513,7 +2689,7 @@ uint32_t ha_tokudb::place_key_into_mysql_buff( // // HOPEFULLY TEMPORARY // - assert(table->s->db_low_byte_first); + assert_always(table->s->db_low_byte_first); #endif pos = 
unpack_toku_key_field( record + field_offset(key_part->field, table), @@ -2554,15 +2730,14 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) { } uint32_t ha_tokudb::place_key_into_dbt_buff( - KEY* key_info, - uchar * buff, - const uchar * record, - bool* has_null, - int key_length - ) -{ - KEY_PART_INFO *key_part = key_info->key_part; - KEY_PART_INFO *end = key_part + get_key_parts(key_info); + KEY* key_info, + uchar* buff, + const uchar* record, + bool* has_null, + int key_length) { + + KEY_PART_INFO* key_part = key_info->key_part; + KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts; uchar* curr_buff = buff; *has_null = false; for (; key_part != end && key_length > 0; key_part++) { @@ -2585,7 +2760,7 @@ uint32_t ha_tokudb::place_key_into_dbt_buff( // // HOPEFULLY TEMPORARY // - assert(table->s->db_low_byte_first); + assert_always(table->s->db_low_byte_first); #endif // // accessing field_offset(key_part->field) instead off key_part->offset @@ -2742,25 +2917,29 @@ DBT* ha_tokudb::create_dbt_key_for_lookup( // Returns: // the parameter key // -DBT *ha_tokudb::pack_key( - DBT * key, - uint keynr, - uchar * buff, - const uchar * key_ptr, - uint key_length, - int8_t inf_byte - ) -{ - TOKUDB_HANDLER_DBUG_ENTER("key %p %u:%2.2x inf=%d", key_ptr, key_length, key_length > 0 ? key_ptr[0] : 0, inf_byte); +DBT* ha_tokudb::pack_key( + DBT* key, + uint keynr, + uchar* buff, + const uchar* key_ptr, + uint key_length, + int8_t inf_byte) { + + TOKUDB_HANDLER_DBUG_ENTER( + "key %p %u:%2.2x inf=%d", + key_ptr, + key_length, + key_length > 0 ? key_ptr[0] : 0, + inf_byte); #if TOKU_INCLUDE_EXTENDED_KEYS if (keynr != primary_key && !tokudb_test(hidden_primary_key)) { DBUG_RETURN(pack_ext_key(key, keynr, buff, key_ptr, key_length, inf_byte)); } #endif - KEY *key_info = &table->key_info[keynr]; - KEY_PART_INFO *key_part = key_info->key_part; - KEY_PART_INFO *end = key_part + get_key_parts(key_info); - my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); + KEY* key_info = &table->key_info[keynr]; + KEY_PART_INFO* key_part = key_info->key_part; + KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts; + my_bitmap_map* old_map = dbug_tmp_use_all_columns(table, table->write_set); memset((void *) key, 0, sizeof(*key)); key->data = buff; @@ -2782,7 +2961,7 @@ DBT *ha_tokudb::pack_key( offset = 1; // Data is at key_ptr+1 } #if !defined(MARIADB_BASE_VERSION) - assert(table->s->db_low_byte_first); + assert_always(table->s->db_low_byte_first); #endif buff = pack_key_toku_key_field( buff, @@ -2802,31 +2981,30 @@ DBT *ha_tokudb::pack_key( } #if TOKU_INCLUDE_EXTENDED_KEYS -DBT *ha_tokudb::pack_ext_key( - DBT * key, - uint keynr, - uchar * buff, - const uchar * key_ptr, - uint key_length, - int8_t inf_byte - ) -{ +DBT* ha_tokudb::pack_ext_key( + DBT* key, + uint keynr, + uchar* buff, + const uchar* key_ptr, + uint key_length, + int8_t inf_byte) { + TOKUDB_HANDLER_DBUG_ENTER(""); // build a list of PK parts that are in the SK. we will use this list to build the // extended key if necessary. 
- KEY *pk_key_info = &table->key_info[primary_key]; - uint pk_parts = get_key_parts(pk_key_info); + KEY* pk_key_info = &table->key_info[primary_key]; + uint pk_parts = pk_key_info->user_defined_key_parts; uint pk_next = 0; struct { const uchar *key_ptr; KEY_PART_INFO *key_part; } pk_info[pk_parts]; - KEY *key_info = &table->key_info[keynr]; - KEY_PART_INFO *key_part = key_info->key_part; - KEY_PART_INFO *end = key_part + get_key_parts(key_info); - my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); + KEY* key_info = &table->key_info[keynr]; + KEY_PART_INFO* key_part = key_info->key_part; + KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts; + my_bitmap_map* old_map = dbug_tmp_use_all_columns(table, table->write_set); memset((void *) key, 0, sizeof(*key)); key->data = buff; @@ -2838,7 +3016,7 @@ DBT *ha_tokudb::pack_ext_key( for (; key_part != end && (int) key_length > 0; key_part++) { // if the SK part is part of the PK, then append it to the list. if (key_part->field->part_of_key.is_set(primary_key)) { - assert(pk_next < pk_parts); + assert_always(pk_next < pk_parts); pk_info[pk_next].key_ptr = key_ptr; pk_info[pk_next].key_part = key_part; pk_next++; @@ -2855,7 +3033,7 @@ DBT *ha_tokudb::pack_ext_key( offset = 1; // Data is at key_ptr+1 } #if !defined(MARIADB_BASE_VERSION) - assert(table->s->db_low_byte_first); + assert_always(table->s->db_low_byte_first); #endif buff = pack_key_toku_key_field( buff, @@ -2869,22 +3047,33 @@ DBT *ha_tokudb::pack_ext_key( } if (key_length > 0) { - assert(key_part == end); + assert_always(key_part == end); end = key_info->key_part + get_ext_key_parts(key_info); // pack PK in order of PK key parts - for (uint pk_index = 0; key_part != end && (int) key_length > 0 && pk_index < pk_parts; pk_index++) { + for (uint pk_index = 0; + key_part != end && (int) key_length > 0 && pk_index < pk_parts; + pk_index++) { uint i; for (i = 0; i < pk_next; i++) { - if (pk_info[i].key_part->fieldnr == pk_key_info->key_part[pk_index].fieldnr) + if (pk_info[i].key_part->fieldnr == + pk_key_info->key_part[pk_index].fieldnr) break; } if (i < pk_next) { const uchar *this_key_ptr = pk_info[i].key_ptr; KEY_PART_INFO *this_key_part = pk_info[i].key_part; - buff = pack_key_toku_key_field(buff, (uchar *) this_key_ptr, this_key_part->field, this_key_part->length); + buff = pack_key_toku_key_field( + buff, + (uchar*)this_key_ptr, + this_key_part->field, + this_key_part->length); } else { - buff = pack_key_toku_key_field(buff, (uchar *) key_ptr, key_part->field, key_part->length); + buff = pack_key_toku_key_field( + buff, + (uchar*)key_ptr, + key_part->field, + key_part->length); key_ptr += key_part->store_length; key_length -= key_part->store_length; key_part++; @@ -2907,18 +3096,22 @@ void ha_tokudb::init_hidden_prim_key_info(DB_TXN *txn) { if (!(share->status & STATUS_PRIMARY_KEY_INIT)) { int error = 0; DBC* c = NULL; - error = share->key_file[primary_key]->cursor(share->key_file[primary_key], txn, &c, 0); - assert(error == 0); + error = share->key_file[primary_key]->cursor( + share->key_file[primary_key], + txn, + &c, + 0); + assert_always(error == 0); DBT key,val; memset(&key, 0, sizeof(key)); memset(&val, 0, sizeof(val)); error = c->c_get(c, &key, &val, DB_LAST); if (error == 0) { - assert(key.size == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); + assert_always(key.size == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); share->auto_ident = hpk_char_to_num((uchar *)key.data); } error = c->c_close(c); - assert(error == 0); + assert_always(error == 0); share->status |= 
STATUS_PRIMARY_KEY_INIT; } TOKUDB_HANDLER_DBUG_VOID_RETURN; @@ -2939,11 +3132,11 @@ int ha_tokudb::get_status(DB_TXN* txn) { // open status.tokudb // if (!share->status_block) { - error = open_status_dictionary( - &share->status_block, - share->table_name, - txn - ); + error = + open_status_dictionary( + &share->status_block, + share->full_table_name(), + txn); if (error) { goto cleanup; } @@ -2958,7 +3151,7 @@ int ha_tokudb::get_status(DB_TXN* txn) { key.size = sizeof(curr_key); value.flags = DB_DBT_USERMEM; - assert(share->status_block); + assert_always(share->status_block); // // get version // @@ -3048,7 +3241,7 @@ cleanup: */ ha_rows ha_tokudb::estimate_rows_upper_bound() { TOKUDB_HANDLER_DBUG_ENTER(""); - DBUG_RETURN(share->rows + HA_TOKUDB_EXTRA_ROWS); + DBUG_RETURN(share->row_count() + HA_TOKUDB_EXTRA_ROWS); } // @@ -3112,8 +3305,8 @@ bool ha_tokudb::may_table_be_empty(DB_TXN *txn) { DBC* tmp_cursor = NULL; DB_TXN* tmp_txn = NULL; - const int empty_scan = THDVAR(ha_thd(), empty_scan); - if (empty_scan == TOKUDB_EMPTY_SCAN_DISABLED) + const int empty_scan = tokudb::sysvars::empty_scan(ha_thd()); + if (empty_scan == tokudb::sysvars::TOKUDB_EMPTY_SCAN_DISABLED) goto cleanup; if (txn == NULL) { @@ -3128,7 +3321,7 @@ bool ha_tokudb::may_table_be_empty(DB_TXN *txn) { if (error) goto cleanup; tmp_cursor->c_set_check_interrupt_callback(tmp_cursor, tokudb_killed_thd_callback, ha_thd()); - if (empty_scan == TOKUDB_EMPTY_SCAN_LR) + if (empty_scan == tokudb::sysvars::TOKUDB_EMPTY_SCAN_LR) error = tmp_cursor->c_getf_next(tmp_cursor, 0, smart_dbt_do_nothing, NULL); else error = tmp_cursor->c_getf_prev(tmp_cursor, 0, smart_dbt_do_nothing, NULL); @@ -3142,7 +3335,7 @@ bool ha_tokudb::may_table_be_empty(DB_TXN *txn) { cleanup: if (tmp_cursor) { int r = tmp_cursor->c_close(tmp_cursor); - assert(r == 0); + assert_always(r == 0); tmp_cursor = NULL; } if (tmp_txn) { @@ -3165,23 +3358,24 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ai_metadata_update_required = false; abort_loader = false; - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key); num_DBs_locked_in_bulk = true; lock_count = 0; if ((rows == 0 || rows > 1) && share->try_table_lock) { - if (get_prelock_empty(thd) && may_table_be_empty(transaction) && transaction != NULL) { + if (tokudb::sysvars::prelock_empty(thd) && + may_table_be_empty(transaction) && + transaction != NULL) { if (using_ignore || is_insert_ignore(thd) || thd->lex->duplicates != DUP_ERROR || table->s->next_number_key_offset) { acquire_table_lock(transaction, lock_write); - } - else { + } else { mult_dbt_flags[primary_key] = 0; if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) { mult_put_flags[primary_key] = DB_NOOVERWRITE; } - uint32_t loader_flags = (get_load_save_space(thd)) ? + uint32_t loader_flags = (tokudb::sysvars::load_save_space(thd)) ? 
LOADER_COMPRESS_INTERMEDIATES : 0; int error = db_env->create_loader( @@ -3196,7 +3390,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { loader_flags ); if (error) { - assert(loader == NULL); + assert_always(loader == NULL); goto exit_try_table_lock; } @@ -3204,18 +3398,18 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { lc.ha = this; error = loader->set_poll_function(loader, loader_poll_fun, &lc); - assert(!error); + assert_always(!error); error = loader->set_error_callback(loader, loader_dup_fun, &lc); - assert(!error); + assert_always(!error); trx->stmt_progress.using_loader = true; } } exit_try_table_lock: - tokudb_pthread_mutex_lock(&share->mutex); + share->lock(); share->try_table_lock = false; - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); } TOKUDB_HANDLER_DBUG_VOID_RETURN; } @@ -3232,9 +3426,9 @@ int ha_tokudb::end_bulk_insert(bool abort) { tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton); bool using_loader = (loader != NULL); if (ai_metadata_update_required) { - tokudb_pthread_mutex_lock(&share->mutex); + share->lock(); error = update_max_auto_inc(share->status_block, share->last_auto_increment); - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); if (error) { goto cleanup; } } delay_updating_ai_metadata = false; @@ -3285,7 +3479,7 @@ int ha_tokudb::end_bulk_insert(bool abort) { cleanup: if (num_DBs_locked_in_bulk) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } num_DBs_locked_in_bulk = false; lock_count = 0; @@ -3380,10 +3574,10 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in cnt++; if ((cnt % 10000) == 0) { sprintf( - status_msg, - "Verifying index uniqueness: Checked %llu of %llu rows in key-%s.", - (long long unsigned) cnt, - share->rows, + status_msg, + "Verifying index uniqueness: Checked %llu of %llu rows in key-%s.", + (long long unsigned) cnt, + share->row_count(), key_info->name); thd_proc_info(thd, status_msg); if (thd_killed(thd)) { @@ -3467,7 +3661,7 @@ int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint cleanup: if (tmp_cursor) { int r = tmp_cursor->c_close(tmp_cursor); - assert(r==0); + assert_always(r==0); tmp_cursor = NULL; } return error; @@ -3475,21 +3669,25 @@ cleanup: static void maybe_do_unique_checks_delay(THD *thd) { if (thd->slave_thread) { - uint64_t delay_ms = THDVAR(thd, rpl_unique_checks_delay); + uint64_t delay_ms = tokudb::sysvars::rpl_unique_checks_delay(thd); if (delay_ms) usleep(delay_ms * 1000); } } static bool need_read_only(THD *thd) { - return opt_readonly || !THDVAR(thd, rpl_check_readonly); -} + return opt_readonly || !tokudb::sysvars::rpl_check_readonly(thd); +} static bool do_unique_checks(THD *thd, bool do_rpl_event) { - if (do_rpl_event && thd->slave_thread && need_read_only(thd) && !THDVAR(thd, rpl_unique_checks)) + if (do_rpl_event && + thd->slave_thread && + need_read_only(thd) && + !tokudb::sysvars::rpl_unique_checks(thd)) { return false; - else + } else { return !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS); + } } int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) { @@ -3552,10 +3750,10 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { // //use for testing the packing of keys // - tmp_pk_key_data = (uchar *)tokudb_my_malloc(pk_key->size, MYF(MY_WME)); - assert(tmp_pk_key_data); - tmp_pk_val_data = (uchar *)tokudb_my_malloc(pk_val->size, MYF(MY_WME)); - assert(tmp_pk_val_data); + tmp_pk_key_data = 
(uchar*)tokudb::memory::malloc(pk_key->size, MYF(MY_WME)); + assert_always(tmp_pk_key_data); + tmp_pk_val_data = (uchar*)tokudb::memory::malloc(pk_val->size, MYF(MY_WME)); + assert_always(tmp_pk_val_data); memcpy(tmp_pk_key_data, pk_key->data, pk_key->size); memcpy(tmp_pk_val_data, pk_val->data, pk_val->size); tmp_pk_key.data = tmp_pk_key_data; @@ -3588,19 +3786,21 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { &tmp_pk_key, &tmp_pk_val ); - assert(tmp_num_bytes == key.size); + assert_always(tmp_num_bytes == key.size); cmp = memcmp(key_buff3,key_buff2,tmp_num_bytes); - assert(cmp == 0); + assert_always(cmp == 0); // // test key packing of clustering keys // if (key_is_clustering(&table->key_info[keynr])) { error = pack_row(&row, (const uchar *) record, keynr); - assert(error == 0); + assert_always(error == 0); uchar* tmp_buff = NULL; - tmp_buff = (uchar *)tokudb_my_malloc(alloced_rec_buff_length,MYF(MY_WME)); - assert(tmp_buff); + tmp_buff = (uchar*)tokudb::memory::malloc( + alloced_rec_buff_length, + MYF(MY_WME)); + assert_always(tmp_buff); row_desc = (uchar *)share->key_file[keynr]->descriptor->dbt.data; row_desc += (*(uint32_t *)row_desc); row_desc += (*(uint32_t *)row_desc); @@ -3612,10 +3812,10 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { desc_size, &tmp_pk_val ); - assert(tmp_num_bytes == row.size); + assert_always(tmp_num_bytes == row.size); cmp = memcmp(tmp_buff,rec_buff,tmp_num_bytes); - assert(cmp == 0); - tokudb_my_free(tmp_buff); + assert_always(cmp == 0); + tokudb::memory::free(tmp_buff); } } @@ -3623,12 +3823,12 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { // copy stuff back out // error = pack_row(pk_val, (const uchar *) record, primary_key); - assert(pk_val->size == tmp_pk_val.size); + assert_always(pk_val->size == tmp_pk_val.size); cmp = memcmp(pk_val->data, tmp_pk_val_data, pk_val->size); - assert( cmp == 0); + assert_always( cmp == 0); - tokudb_my_free(tmp_pk_key_data); - tokudb_my_free(tmp_pk_val_data); + tokudb::memory::free(tmp_pk_key_data); + tokudb::memory::free(tmp_pk_val_data); } // set the put flags for the main dictionary @@ -3675,7 +3875,7 @@ void ha_tokudb::set_main_dict_put_flags(THD* thd, bool opt_eligible, uint32_t* p int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) { int error = 0; uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key); - assert(curr_num_DBs == 1); + assert_always(curr_num_DBs == 1); uint32_t put_flags = mult_put_flags[primary_key]; THD *thd = ha_thd(); @@ -3807,33 +4007,36 @@ int ha_tokudb::write_row(uchar * record) { // of the auto inc field. 
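[test_row_packing above guards the handler's two independent packing paths by running the same record through both and asserting byte-for-byte equality (the direct create_dbt_key_from_table path against pack_key_from_key_buff, and pack_row against the clustering repack). A minimal, self-contained sketch of that round-trip check follows; the two length-prefixed packers are deliberately simplified stand-ins, not the handler's real packers:

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    // Path A: pack fields as <u32 length><bytes> in a single pass.
    static std::vector<uint8_t> pack_a(const std::vector<std::string>& fields) {
        std::vector<uint8_t> out;
        for (const std::string& f : fields) {
            uint32_t len = (uint32_t)f.size();
            const uint8_t* p = (const uint8_t*)&len;
            out.insert(out.end(), p, p + sizeof(len));
            out.insert(out.end(), f.begin(), f.end());
        }
        return out;
    }

    // Path B: same wire format, but sizes the buffer up front and
    // memcpy()s into it, mimicking the preallocated key_buff approach.
    static std::vector<uint8_t> pack_b(const std::vector<std::string>& fields) {
        size_t total = 0;
        for (const std::string& f : fields)
            total += sizeof(uint32_t) + f.size();
        std::vector<uint8_t> out(total);
        uint8_t* pos = out.data();
        for (const std::string& f : fields) {
            uint32_t len = (uint32_t)f.size();
            memcpy(pos, &len, sizeof(len));
            pos += sizeof(len);
            memcpy(pos, f.data(), f.size());
            pos += f.size();
        }
        return out;
    }

    int main() {
        std::vector<std::string> row = {"alpha", "", "omega"};
        std::vector<uint8_t> a = pack_a(row);
        std::vector<uint8_t> b = pack_b(row);
        // The equivalent of test_row_packing's assert_always(cmp == 0).
        assert(a.size() == b.size() &&
               memcmp(a.data(), b.data(), a.size()) == 0);
        return 0;
    }

The value of the pattern is that either packer can be rewritten independently; a debug build fails fast the moment their byte output diverges.]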
// if (share->has_auto_inc && record == table->record[0]) { - tokudb_pthread_mutex_lock(&share->mutex); + share->lock(); ulonglong curr_auto_inc = retrieve_auto_increment( - table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), record); + table->field[share->ai_field_index]->key_type(), + field_offset(table->field[share->ai_field_index], table), + record); if (curr_auto_inc > share->last_auto_increment) { share->last_auto_increment = curr_auto_inc; if (delay_updating_ai_metadata) { ai_metadata_update_required = true; - } - else { - update_max_auto_inc(share->status_block, share->last_auto_increment); + } else { + update_max_auto_inc( + share->status_block, + share->last_auto_increment); } } - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); } // // grab reader lock on numDBs_lock // if (!num_DBs_locked_in_bulk) { - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); num_DBs_locked = true; } else { lock_count++; if (lock_count >= 2000) { - rw_unlock(&share->num_DBs_lock); - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); + share->_num_DBs_lock.lock_read(); lock_count = 0; } } @@ -3863,10 +4066,8 @@ int ha_tokudb::write_row(uchar * record) { } } txn = create_sub_trans ? sub_trans : transaction; - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("txn %p", txn); - } - if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY) { + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_TXN, "txn %p", txn); + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK_KEY))) { test_row_packing(record,&prim_key,&row); } if (loader) { @@ -3875,8 +4076,7 @@ int ha_tokudb::write_row(uchar * record) { abort_loader = true; goto cleanup; } - } - else { + } else { error = do_uniqueness_checks(record, txn, thd); if (error) { // for #4633 @@ -3889,8 +4089,7 @@ int ha_tokudb::write_row(uchar * record) { // was found and this is a duplicate key, // so we set last_dup_key last_dup_key = primary_key; - } - else if (r != DB_NOTFOUND) { + } else if (r != DB_NOTFOUND) { // if some other error is returned, return that to the user. error = r; } @@ -3900,8 +4099,7 @@ int ha_tokudb::write_row(uchar * record) { if (curr_num_DBs == 1) { error = insert_row_to_main_dictionary(record,&prim_key, &row, txn); if (error) { goto cleanup; } - } - else { + } else { error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd); if (error) { goto cleanup; } } @@ -3919,7 +4117,7 @@ int ha_tokudb::write_row(uchar * record) { } cleanup: if (num_DBs_locked) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } if (error == DB_KEYEXIST) { error = HA_ERR_FOUND_DUPP_KEY; @@ -3989,7 +4187,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { // of the auto inc field. 
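[write_row above takes share->_num_DBs_lock for reading so the set of open dictionaries cannot change mid-insert, but during long runs it releases and immediately re-takes the lock every 2000 rows so a pending writer, such as hot CREATE INDEX which needs the exclusive side, is not starved indefinitely. A minimal sketch of that fairness pattern, assuming std::shared_mutex in place of the share's rwlock:

    #include <cstdint>
    #include <shared_mutex>

    // Reader that periodically yields a shared lock so pending writers
    // (e.g. online index creation) get a chance to acquire it exclusively.
    class PeriodicYieldingReader {
    public:
        explicit PeriodicYieldingReader(std::shared_mutex& m) : _m(m) {
            _m.lock_shared();
        }
        ~PeriodicYieldingReader() { _m.unlock_shared(); }

        // Call once per row written; every 2000 calls the shared lock is
        // dropped and immediately re-acquired, mirroring the lock_count
        // logic in ha_tokudb::write_row.
        void tick() {
            if (++_count >= 2000) {
                _m.unlock_shared();
                _m.lock_shared();
                _count = 0;
            }
        }

    private:
        std::shared_mutex& _m;
        uint64_t _count = 0;
    };

Because lock acquisition is not fair on all platforms, the explicit drop-and-retake is what actually lets a queued exclusive locker in.]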
// if (share->has_auto_inc && new_row == table->record[0]) { - tokudb_pthread_mutex_lock(&share->mutex); + share->lock(); ulonglong curr_auto_inc = retrieve_auto_increment( table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), @@ -4001,7 +4199,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { share->last_auto_increment = curr_auto_inc; } } - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); } // @@ -4009,7 +4207,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { // bool num_DBs_locked = false; if (!num_DBs_locked_in_bulk) { - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); num_DBs_locked = true; } curr_num_DBs = share->num_DBs; @@ -4100,6 +4298,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { last_dup_key = primary_key; } else if (!error) { + updated_rows++; trx->stmt_progress.updated++; track_progress(thd); } @@ -4107,7 +4306,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { cleanup: if (num_DBs_locked) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } if (error == DB_KEYEXIST) { error = HA_ERR_FOUND_DUPP_KEY; @@ -4150,7 +4349,7 @@ int ha_tokudb::delete_row(const uchar * record) { // bool num_DBs_locked = false; if (!num_DBs_locked_in_bulk) { - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); num_DBs_locked = true; } curr_num_DBs = share->num_DBs; @@ -4166,33 +4365,36 @@ int ha_tokudb::delete_row(const uchar * record) { goto cleanup; } - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("all %p stmt %p sub_sp_level %p transaction %p", trx->all, trx->stmt, trx->sub_sp_level, transaction); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "all %p stmt %p sub_sp_level %p transaction %p", + trx->all, + trx->stmt, + trx->sub_sp_level, + transaction); - error = db_env->del_multiple( - db_env, - share->key_file[primary_key], - transaction, - &prim_key, - &row, - curr_num_DBs, - share->key_file, - mult_key_dbt_array, - mult_del_flags - ); + error = + db_env->del_multiple( + db_env, + share->key_file[primary_key], + transaction, + &prim_key, + &row, + curr_num_DBs, + share->key_file, + mult_key_dbt_array, + mult_del_flags); if (error) { DBUG_PRINT("error", ("Got error %d", error)); - } - else { + } else { deleted_rows++; trx->stmt_progress.deleted++; track_progress(thd); } cleanup: if (num_DBs_locked) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -4287,11 +4489,16 @@ cleanup: TOKUDB_HANDLER_DBUG_RETURN(error); } -static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint key_len) { +static bool index_key_is_null( + TABLE* table, + uint keynr, + const uchar* key, + uint key_len) { + bool key_can_be_null = false; - KEY *key_info = &table->key_info[keynr]; - KEY_PART_INFO *key_part = key_info->key_part; - KEY_PART_INFO *end = key_part + get_key_parts(key_info); + KEY* key_info = &table->key_info[keynr]; + KEY_PART_INFO* key_part = key_info->key_part; + KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts; for (; key_part != end; key_part++) { if (key_part->null_bit) { key_can_be_null = true; @@ -4309,7 +4516,7 @@ static bool tokudb_do_bulk_fetch(THD *thd) { case SQLCOM_INSERT_SELECT: case SQLCOM_REPLACE_SELECT: case SQLCOM_DELETE: - return THDVAR(thd, bulk_fetch) != 0; + return tokudb::sysvars::bulk_fetch(thd) != 0; default: return false; } @@ -4361,7 +4568,7 @@ 
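[delete_row above hands the whole fan-out to db_env->del_multiple: one call removes the primary row and the derived entry from every secondary dictionary. A toy, map-based model of what that single call accomplishes is sketched below; derive_secondary_key is a deliberately trivial stand-in for the real key packer:

    #include <map>
    #include <string>
    #include <vector>

    // Toy model: the primary dictionary maps pk -> row; each secondary
    // dictionary maps a derived key -> empty value.
    using Dict = std::map<std::string, std::string>;

    // Stand-in for the real secondary-key packer: first "column" of the
    // row plus the primary key appended for uniqueness.
    static std::string derive_secondary_key(const std::string& pk,
                                            const std::string& row) {
        return row.substr(0, 1) + pk;
    }

    // Loop equivalent of del_multiple(): remove the row from the primary
    // dictionary and the derived entry from every secondary dictionary.
    static int delete_from_all(Dict& primary,
                               std::vector<Dict*>& secondaries,
                               const std::string& pk) {
        auto it = primary.find(pk);
        if (it == primary.end())
            return -1;  // DB_NOTFOUND analogue
        const std::string row = it->second;
        for (Dict* sec : secondaries)
            sec->erase(derive_secondary_key(pk, row));
        primary.erase(it);
        return 0;
    }

    int main() {
        Dict primary{{"k1", "apple"}}, sec;
        sec[derive_secondary_key("k1", "apple")] = "";
        std::vector<Dict*> secs{&sec};
        return delete_from_all(primary, secs, "k1");  // 0 on success
    }

Doing the fan-out inside the engine lets all deletes share one transaction and one decode of the primary row, which is why the handler issues a single call rather than looping over share->key_file itself.]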
cleanup: // if (cursor) { int r = cursor->c_close(cursor); - assert(r==0); + assert_always(r==0); cursor = NULL; remove_from_trx_handler_list(); } @@ -4404,7 +4611,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { if (cursor) { DBUG_PRINT("note", ("Closing active cursor")); int r = cursor->c_close(cursor); - assert(r==0); + assert_always(r==0); remove_from_trx_handler_list(); } active_index = keynr; @@ -4430,10 +4637,12 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { if (use_write_locks) { cursor_flags |= DB_RMW; } - if (get_disable_prefetching(thd)) { + if (tokudb::sysvars::disable_prefetching(thd)) { cursor_flags |= DBC_DISABLE_PREFETCHING; } - if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, cursor_flags))) { + if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], + transaction, &cursor, + cursor_flags))) { if (error == TOKUDB_MVCC_DICTIONARY_TOO_NEW) { error = HA_ERR_TABLE_DEF_CHANGED; my_error(ER_TABLE_DEF_CHANGED, MYF(0)); @@ -4478,7 +4687,7 @@ int ha_tokudb::index_end() { if (cursor) { DBUG_PRINT("enter", ("table: '%s'", table_share->table_name.str)); int r = cursor->c_close(cursor); - assert(r==0); + assert_always(r==0); cursor = NULL; remove_from_trx_handler_list(); last_cursor_error = 0; @@ -4546,7 +4755,7 @@ void ha_tokudb::extract_hidden_primary_key(uint keynr, DBT const *found_key) { int ha_tokudb::read_row_callback (uchar * buf, uint keynr, DBT const *row, DBT const *found_key) { - assert(keynr == primary_key); + assert_always(keynr == primary_key); return unpack_row(buf, row,found_key, keynr); } @@ -4672,7 +4881,7 @@ int ha_tokudb::read_full_row(uchar * buf) { // HA_ERR_END_OF_FILE if not found // error otherwise // -int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { +int ha_tokudb::index_next_same(uchar* buf, const uchar* key, uint keylen) { TOKUDB_HANDLER_DBUG_ENTER(""); ha_statistic_increment(&SSV::ha_read_next_count); @@ -4690,8 +4899,16 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { // // now do the comparison // - create_dbt_key_from_table(&found_key,tokudb_active_index,key_buff3,buf,&has_null); - cmp = tokudb_prefix_cmp_dbt_key(share->key_file[tokudb_active_index], &curr_key, &found_key); + create_dbt_key_from_table( + &found_key, + tokudb_active_index, + key_buff3,buf, + &has_null); + cmp = + tokudb_prefix_cmp_dbt_key( + share->key_file[tokudb_active_index], + &curr_key, + &found_key); if (cmp) { error = HA_ERR_END_OF_FILE; } @@ -4703,7 +4920,7 @@ cleanup: // -// According to InnoDB handlerton: Positions an index cursor to the index +// According to InnoDB handlerton: Positions an index cursor to the index // specified in keynr. Fetches the row if any // Parameters: // [out] buf - buffer for the returned row @@ -4719,10 +4936,20 @@ cleanup: // TODO: investigate this for correctness // error otherwise // -int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) { - TOKUDB_HANDLER_DBUG_ENTER("key %p %u:%2.2x find=%u", key, key_len, key ? key[0] : 0, find_flag); +int ha_tokudb::index_read( + uchar* buf, + const uchar* key, + uint key_len, + enum ha_rkey_function find_flag) { + + TOKUDB_HANDLER_DBUG_ENTER( + "key %p %u:%2.2x find=%u", + key, + key_len, + key ? 
key[0] : 0, + find_flag); invalidate_bulk_fetch(); - if (tokudb_debug & TOKUDB_DEBUG_INDEX_KEY) { + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_INDEX_KEY))) { TOKUDB_DBUG_DUMP("mysql key=", key, key_len); } DBT row; @@ -4730,14 +4957,17 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ int error = 0; uint32_t flags = 0; THD* thd = ha_thd(); - tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);; + tokudb_trx_data* trx = (tokudb_trx_data*)thd_get_ha_data(thd, tokudb_hton); struct smart_dbt_info info; struct index_read_info ir_info; HANDLE_INVALID_CURSOR(); - // if we locked a non-null key range and we now have a null key, then remove the bounds from the cursor - if (range_lock_grabbed && !range_lock_grabbed_null && index_key_is_null(table, tokudb_active_index, key, key_len)) { + // if we locked a non-null key range and we now have a null key, then + // remove the bounds from the cursor + if (range_lock_grabbed && + !range_lock_grabbed_null && + index_key_is_null(table, tokudb_active_index, key, key_len)) { range_lock_grabbed = range_lock_grabbed_null = false; cursor->c_remove_restriction(cursor); } @@ -4758,7 +4988,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ pack_key(&lookup_key, tokudb_active_index, key_buff3, key, key_len, COL_NEG_INF); DBT lookup_bound; pack_key(&lookup_bound, tokudb_active_index, key_buff4, key, key_len, COL_POS_INF); - if (tokudb_debug & TOKUDB_DEBUG_INDEX_KEY) { + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_INDEX_KEY))) { TOKUDB_DBUG_DUMP("tokudb key=", lookup_key.data, lookup_key.size); } ir_info.orig_key = &lookup_key; @@ -4815,8 +5045,8 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ if (!error && !key_read && tokudb_active_index != primary_key && !key_is_clustering(&table->key_info[tokudb_active_index])) { error = read_full_row(buf); } - - if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) { + + if (TOKUDB_UNLIKELY(error && TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_ERROR))) { TOKUDB_HANDLER_TRACE("error:%d:%d", error, find_flag); } trx->stmt_progress.queried++; @@ -4845,7 +5075,7 @@ int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val, bool d // if this is a covering index, this is all we need if (do_key_read) { - assert(!need_val); + assert_always(!need_val); extract_hidden_primary_key(tokudb_active_index, &curr_key); read_key_only(buf, tokudb_active_index, &curr_key); error = 0; @@ -4937,17 +5167,27 @@ exit: return error; } -static int -smart_dbt_bf_callback(DBT const *key, DBT const *row, void *context) { +static int smart_dbt_bf_callback( + DBT const* key, + DBT const* row, + void* context) { SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context; - return info->ha->fill_range_query_buf(info->need_val, key, row, info->direction, info->thd, info->buf, info->key_to_compare); + return + info->ha->fill_range_query_buf( + info->need_val, + key, + row, + info->direction, + info->thd, + info->buf, + info->key_to_compare); } -#if defined(MARIADB_BASE_VERSION) || (50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) -enum icp_result ha_tokudb::toku_handler_index_cond_check(Item* pushed_idx_cond) -{ +enum icp_result ha_tokudb::toku_handler_index_cond_check( + Item* pushed_idx_cond) { + enum icp_result res; - if (end_range ) { + if (end_range) { int cmp; #ifdef MARIADB_BASE_VERSION cmp = compare_key2(end_range); @@ -4957,37 +5197,36 @@ enum icp_result ha_tokudb::toku_handler_index_cond_check(Item* 
pushed_idx_cond) if (cmp > 0) { return ICP_OUT_OF_RANGE; } - } + } res = pushed_idx_cond->val_int() ? ICP_MATCH : ICP_NO_MATCH; return res; } -#endif // fill in the range query buf for bulk fetch int ha_tokudb::fill_range_query_buf( bool need_val, - DBT const *key, - DBT const *row, + DBT const* key, + DBT const* row, int direction, THD* thd, uchar* buf, - DBT* key_to_compare - ) { + DBT* key_to_compare) { + int error; // // first put the value into range_query_buf // - uint32_t size_remaining = size_range_query_buff - bytes_used_in_range_query_buff; + uint32_t size_remaining = + size_range_query_buff - bytes_used_in_range_query_buff; uint32_t size_needed; - uint32_t user_defined_size = get_tokudb_read_buf_size(thd); + uint32_t user_defined_size = tokudb::sysvars::read_buf_size(thd); uchar* curr_pos = NULL; if (key_to_compare) { int cmp = tokudb_prefix_cmp_dbt_key( share->key_file[tokudb_active_index], key_to_compare, - key - ); + key); if (cmp) { icp_went_out_of_range = true; error = 0; @@ -4995,26 +5234,38 @@ int ha_tokudb::fill_range_query_buf( } } -#if defined(MARIADB_BASE_VERSION) || (50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699) // if we have an index condition pushed down, we check it - if (toku_pushed_idx_cond && (tokudb_active_index == toku_pushed_idx_cond_keyno)) { + if (toku_pushed_idx_cond && + (tokudb_active_index == toku_pushed_idx_cond_keyno)) { unpack_key(buf, key, tokudb_active_index); - enum icp_result result = toku_handler_index_cond_check(toku_pushed_idx_cond); + enum icp_result result = + toku_handler_index_cond_check(toku_pushed_idx_cond); + // If we have reason to stop, we set icp_went_out_of_range and get out + // otherwise, if we simply see that the current key is no match, + // we tell the cursor to continue and don't store + // the key locally if (result == ICP_OUT_OF_RANGE || thd_killed(thd)) { icp_went_out_of_range = true; error = 0; + DEBUG_SYNC(ha_thd(), "tokudb_icp_asc_scan_out_of_range"); goto cleanup; - } - // otherwise, if we simply see that the current key is no match, - // we tell the cursor to continue and don't store - // the key locally - else if (result == ICP_NO_MATCH) { + } else if (result == ICP_NO_MATCH) { + // if we are performing a DESC ICP scan and have no end_range + // to compare to stop using ICP filtering as there isn't much more + // that we can do without going through contortions with remembering + // and comparing key parts. 
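[To summarize the control flow above: fill_range_query_buf evaluates the pushed index condition once per fetched key, and exactly three outcomes exist. ICP_OUT_OF_RANGE (or a killed thread) terminates the scan, ICP_NO_MATCH returns TOKUDB_CURSOR_CONTINUE so the cursor advances without buffering the key, and only ICP_MATCH lets the key be copied into range_query_buff. A compressed sketch of that state machine, with a caller-supplied evaluate() standing in for pushed_idx_cond->val_int() plus the end_range compare:

    #include <functional>
    #include <string>
    #include <vector>

    enum icp_result { ICP_NO_MATCH, ICP_MATCH, ICP_OUT_OF_RANGE };

    // Scan keys in order, applying an index-condition-pushdown filter the
    // way fill_range_query_buf does: matches are buffered, non-matches
    // are skipped, and the first out-of-range key ends the scan.
    std::vector<std::string> icp_scan(
            const std::vector<std::string>& keys,
            const std::function<icp_result(const std::string&)>& evaluate) {
        std::vector<std::string> buffered;
        for (const std::string& key : keys) {
            switch (evaluate(key)) {
            case ICP_OUT_OF_RANGE:
                return buffered;          // icp_went_out_of_range analogue
            case ICP_NO_MATCH:
                continue;                 // TOKUDB_CURSOR_CONTINUE analogue
            case ICP_MATCH:
                buffered.push_back(key);  // copy into range_query_buff
                break;
            }
        }
        return buffered;
    }

The DESC-scan special case in the hunk below exists because a descending scan with no end_range has nothing to compare against, so the safe fallback is to cancel the pushed condition and filter in the server layer instead.]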
+ if (!end_range && + direction < 0) { + + cancel_pushed_idx_cond(); + DEBUG_SYNC(ha_thd(), "tokudb_icp_desc_scan_invalidate"); + } + error = TOKUDB_CURSOR_CONTINUE; goto cleanup; } } -#endif // at this point, if ICP is on, we have verified that the key is one // we are interested in, so we proceed with placing the data @@ -5023,57 +5274,63 @@ int ha_tokudb::fill_range_query_buf( if (need_val) { if (unpack_entire_row) { size_needed = 2*sizeof(uint32_t) + key->size + row->size; - } - else { + } else { // this is an upper bound - size_needed = sizeof(uint32_t) + // size of key length - key->size + row->size + //key and row - num_var_cols_for_query*(sizeof(uint32_t)) + //lengths of varchars stored - sizeof(uint32_t); //length of blobs + size_needed = + // size of key length + sizeof(uint32_t) + + // key and row + key->size + row->size + + // lengths of varchars stored + num_var_cols_for_query * (sizeof(uint32_t)) + + // length of blobs + sizeof(uint32_t); } - } - else { + } else { size_needed = sizeof(uint32_t) + key->size; } if (size_remaining < size_needed) { - range_query_buff = (uchar *)tokudb_my_realloc( - (void *)range_query_buff, - bytes_used_in_range_query_buff+size_needed, - MYF(MY_WME) - ); + range_query_buff = + static_cast<uchar*>(tokudb::memory::realloc( + static_cast<void*>(range_query_buff), + bytes_used_in_range_query_buff + size_needed, + MYF(MY_WME))); if (range_query_buff == NULL) { error = ENOMEM; invalidate_bulk_fetch(); goto cleanup; } - size_range_query_buff = bytes_used_in_range_query_buff+size_needed; + size_range_query_buff = bytes_used_in_range_query_buff + size_needed; } // // now we know we have the size, let's fill the buffer, starting with the key // curr_pos = range_query_buff + bytes_used_in_range_query_buff; - *(uint32_t *)curr_pos = key->size; + *reinterpret_cast<uint32_t*>(curr_pos) = key->size; curr_pos += sizeof(uint32_t); memcpy(curr_pos, key->data, key->size); curr_pos += key->size; if (need_val) { if (unpack_entire_row) { - *(uint32_t *)curr_pos = row->size; + *reinterpret_cast<uint32_t*>(curr_pos) = row->size; curr_pos += sizeof(uint32_t); memcpy(curr_pos, row->data, row->size); curr_pos += row->size; - } - else { + } else { // need to unpack just the data we care about - const uchar* fixed_field_ptr = (const uchar *) row->data; + const uchar* fixed_field_ptr = static_cast<const uchar*>(row->data); fixed_field_ptr += table_share->null_bytes; const uchar* var_field_offset_ptr = NULL; const uchar* var_field_data_ptr = NULL; - var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[tokudb_active_index].fixed_field_size; - var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[tokudb_active_index].len_of_offsets; + var_field_offset_ptr = + fixed_field_ptr + + share->kc_info.mcp_info[tokudb_active_index].fixed_field_size; + var_field_data_ptr = + var_field_offset_ptr + + share->kc_info.mcp_info[tokudb_active_index].len_of_offsets; // first the null bytes memcpy(curr_pos, row->data, table_share->null_bytes); @@ -5087,8 +5344,7 @@ int ha_tokudb::fill_range_query_buf( memcpy( curr_pos, fixed_field_ptr + share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val, - share->kc_info.field_lengths[field_index] - ); + share->kc_info.field_lengths[field_index]); curr_pos += share->kc_info.field_lengths[field_index]; } @@ -5097,7 +5353,8 @@ int ha_tokudb::fill_range_query_buf( // for (uint32_t i = 0; i < num_var_cols_for_query; i++) { uint field_index = var_cols_for_query[i]; - uint32_t var_field_index = 
share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val; + uint32_t var_field_index = + share->kc_info.cp_info[tokudb_active_index][field_index].col_pack_val; uint32_t data_start_offset; uint32_t field_len; @@ -5106,11 +5363,13 @@ int ha_tokudb::fill_range_query_buf( &data_start_offset, var_field_index, var_field_offset_ptr, - share->kc_info.num_offset_bytes - ); + share->kc_info.num_offset_bytes); memcpy(curr_pos, &field_len, sizeof(field_len)); curr_pos += sizeof(field_len); - memcpy(curr_pos, var_field_data_ptr + data_start_offset, field_len); + memcpy( + curr_pos, + var_field_data_ptr + data_start_offset, + field_len); curr_pos += field_len; } @@ -5124,9 +5383,12 @@ int ha_tokudb::fill_range_query_buf( &blob_offset, share->kc_info.mcp_info[tokudb_active_index].len_of_offsets, var_field_data_ptr, - share->kc_info.num_offset_bytes - ); - data_size = row->size - blob_offset - (uint32_t)(var_field_data_ptr - (const uchar *)row->data); + share->kc_info.num_offset_bytes); + data_size = + row->size - + blob_offset - + static_cast<uint32_t>((var_field_data_ptr - + static_cast<const uchar*>(row->data))); memcpy(curr_pos, &data_size, sizeof(data_size)); curr_pos += sizeof(data_size); memcpy(curr_pos, var_field_data_ptr + blob_offset, data_size); @@ -5136,7 +5398,7 @@ int ha_tokudb::fill_range_query_buf( } bytes_used_in_range_query_buff = curr_pos - range_query_buff; - assert(bytes_used_in_range_query_buff <= size_range_query_buff); + assert_always(bytes_used_in_range_query_buff <= size_range_query_buff); // // now determine if we should continue with the bulk fetch @@ -5153,14 +5415,16 @@ int ha_tokudb::fill_range_query_buf( // row fetch upper bound. if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) { uint64_t row_fetch_upper_bound = 1LLU << bulk_fetch_iteration; - assert(row_fetch_upper_bound > 0); + assert_always(row_fetch_upper_bound > 0); if (rows_fetched_using_bulk_fetch >= row_fetch_upper_bound) { error = 0; goto cleanup; } } - if (bytes_used_in_range_query_buff + table_share->rec_buff_length > user_defined_size) { + if (bytes_used_in_range_query_buff + + table_share->rec_buff_length > + user_defined_size) { error = 0; goto cleanup; } @@ -5178,11 +5442,9 @@ int ha_tokudb::fill_range_query_buf( int cmp = tokudb_cmp_dbt_key( share->key_file[tokudb_active_index], key, - &right_range - ); + &right_range); error = (cmp > 0) ? 0 : TOKUDB_CURSOR_CONTINUE; - } - else { + } else { // compare what we got to the left endpoint of prelocked range // because we are searching keys in descending order if (prelocked_left_range_size == 0) { @@ -5196,15 +5458,19 @@ int ha_tokudb::fill_range_query_buf( int cmp = tokudb_cmp_dbt_key( share->key_file[tokudb_active_index], key, - &left_range - ); + &left_range); error = (cmp < 0) ? 
0 : TOKUDB_CURSOR_CONTINUE; } cleanup: return error; } -int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_key_read) { +int ha_tokudb::get_next( + uchar* buf, + int direction, + DBT* key_to_compare, + bool do_key_read) { + int error = 0; HANDLE_INVALID_CURSOR(); @@ -5221,17 +5487,18 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_ // we need to read the val of what we retrieve if // we do NOT have a covering index AND we are using a clustering secondary // key - bool need_val = (do_key_read == 0) && - (tokudb_active_index == primary_key || key_is_clustering(&table->key_info[tokudb_active_index])); + bool need_val = + (do_key_read == 0) && + (tokudb_active_index == primary_key || + key_is_clustering(&table->key_info[tokudb_active_index])); - if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) { + if ((bytes_used_in_range_query_buff - + curr_range_query_buff_offset) > 0) { error = read_data_from_range_query_buff(buf, need_val, do_key_read); - } - else if (icp_went_out_of_range) { + } else if (icp_went_out_of_range) { icp_went_out_of_range = false; error = HA_ERR_END_OF_FILE; - } - else { + } else { invalidate_bulk_fetch(); if (doing_bulk_fetch) { struct smart_dbt_bf_info bf_info; @@ -5252,16 +5519,28 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_ // this while loop. icp_out_of_range will be set if we hit a row that // the index condition states is out of our range. When that hits, // we know all the data in the buffer is the last data we will retrieve - while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) { + while (bytes_used_in_range_query_buff == 0 && + !icp_went_out_of_range && error == 0) { if (direction > 0) { - error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info); + error = + cursor->c_getf_next( + cursor, + flags, + smart_dbt_bf_callback, + &bf_info); } else { - error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info); + error = + cursor->c_getf_prev( + cursor, + flags, + smart_dbt_bf_callback, + &bf_info); } } // if there is no data set and we went out of range, // then there is nothing to return - if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) { + if (bytes_used_in_range_query_buff == 0 && + icp_went_out_of_range) { icp_went_out_of_range = false; error = HA_ERR_END_OF_FILE; } @@ -5269,26 +5548,46 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_ bulk_fetch_iteration++; } - error = handle_cursor_error(error, HA_ERR_END_OF_FILE,tokudb_active_index); - if (error) { goto cleanup; } + error = + handle_cursor_error( + error, + HA_ERR_END_OF_FILE, + tokudb_active_index); + if (error) { + goto cleanup; + } // // now that range_query_buff is filled, read an element // - error = read_data_from_range_query_buff(buf, need_val, do_key_read); - } - else { + error = + read_data_from_range_query_buff(buf, need_val, do_key_read); + } else { struct smart_dbt_info info; info.ha = this; info.buf = buf; info.keynr = tokudb_active_index; if (direction > 0) { - error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info); + error = + cursor->c_getf_next( + cursor, + flags, + SMART_DBT_CALLBACK(do_key_read), + &info); } else { - error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK(do_key_read), &info); + error = + cursor->c_getf_prev( + cursor, + flags, + SMART_DBT_CALLBACK(do_key_read), + &info); } - error = 
handle_cursor_error(error, HA_ERR_END_OF_FILE, tokudb_active_index); + error = + handle_cursor_error( + error, + HA_ERR_END_OF_FILE, + tokudb_active_index); } } } @@ -5301,13 +5600,17 @@ int ha_tokudb::get_next(uchar* buf, int direction, DBT* key_to_compare, bool do_ // read the full row by doing a point query into the // main table. // - if (!error && !do_key_read && (tokudb_active_index != primary_key) && !key_is_clustering(&table->key_info[tokudb_active_index])) { + if (!error && + !do_key_read && + (tokudb_active_index != primary_key) && + !key_is_clustering(&table->key_info[tokudb_active_index])) { error = read_full_row(buf); } if (!error) { THD *thd = ha_thd(); - tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton); + tokudb_trx_data* trx = + static_cast<tokudb_trx_data*>(thd_get_ha_data(thd, tokudb_hton)); trx->stmt_progress.queried++; track_progress(thd); if (thd_killed(thd)) @@ -5509,40 +5812,69 @@ int ha_tokudb::rnd_next(uchar * buf) { void ha_tokudb::track_progress(THD* thd) { tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton); if (trx) { - ulonglong num_written = trx->stmt_progress.inserted + trx->stmt_progress.updated + trx->stmt_progress.deleted; + ulonglong num_written = trx->stmt_progress.inserted + + trx->stmt_progress.updated + + trx->stmt_progress.deleted; bool update_status = - (trx->stmt_progress.queried && tokudb_read_status_frequency && (trx->stmt_progress.queried % tokudb_read_status_frequency) == 0) || - (num_written && tokudb_write_status_frequency && (num_written % tokudb_write_status_frequency) == 0); + (trx->stmt_progress.queried && + tokudb::sysvars::read_status_frequency && + (trx->stmt_progress.queried % + tokudb::sysvars::read_status_frequency) == 0) || + (num_written && tokudb::sysvars::write_status_frequency && + (num_written % tokudb::sysvars::write_status_frequency) == 0); if (update_status) { char *next_status = write_status_msg; bool first = true; int r; if (trx->stmt_progress.queried) { - r = sprintf(next_status, "Queried about %llu row%s", trx->stmt_progress.queried, trx->stmt_progress.queried == 1 ? "" : "s"); - assert(r >= 0); + r = sprintf( + next_status, + "Queried about %llu row%s", + trx->stmt_progress.queried, + trx->stmt_progress.queried == 1 ? "" : "s"); + assert_always(r >= 0); next_status += r; first = false; } if (trx->stmt_progress.inserted) { if (trx->stmt_progress.using_loader) { - r = sprintf(next_status, "%sFetched about %llu row%s, loading data still remains", first ? "" : ", ", trx->stmt_progress.inserted, trx->stmt_progress.inserted == 1 ? "" : "s"); - } - else { - r = sprintf(next_status, "%sInserted about %llu row%s", first ? "" : ", ", trx->stmt_progress.inserted, trx->stmt_progress.inserted == 1 ? "" : "s"); + r = sprintf( + next_status, + "%sFetched about %llu row%s, loading data still remains", + first ? "" : ", ", + trx->stmt_progress.inserted, + trx->stmt_progress.inserted == 1 ? "" : "s"); + } else { + r = sprintf( + next_status, + "%sInserted about %llu row%s", + first ? "" : ", ", + trx->stmt_progress.inserted, + trx->stmt_progress.inserted == 1 ? "" : "s"); } - assert(r >= 0); + assert_always(r >= 0); next_status += r; first = false; } if (trx->stmt_progress.updated) { - r = sprintf(next_status, "%sUpdated about %llu row%s", first ? "" : ", ", trx->stmt_progress.updated, trx->stmt_progress.updated == 1 ? "" : "s"); - assert(r >= 0); + r = sprintf( + next_status, + "%sUpdated about %llu row%s", + first ? 
"" : ", ", + trx->stmt_progress.updated, + trx->stmt_progress.updated == 1 ? "" : "s"); + assert_always(r >= 0); next_status += r; first = false; } if (trx->stmt_progress.deleted) { - r = sprintf(next_status, "%sDeleted about %llu row%s", first ? "" : ", ", trx->stmt_progress.deleted, trx->stmt_progress.deleted == 1 ? "" : "s"); - assert(r >= 0); + r = sprintf( + next_status, + "%sDeleted about %llu row%s", + first ? "" : ", ", + trx->stmt_progress.deleted, + trx->stmt_progress.deleted == 1 ? "" : "s"); + assert_always(r >= 0); next_status += r; first = false; } @@ -5583,7 +5915,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { // test rpl slave by inducing a delay before the point query THD *thd = ha_thd(); if (thd->slave_thread && (in_rpl_delete_rows || in_rpl_update_rows)) { - uint64_t delay_ms = THDVAR(thd, rpl_lookup_rows_delay); + uint64_t delay_ms = tokudb::sysvars::rpl_lookup_rows_delay(thd); if (delay_ms) usleep(delay_ms * 1000); } @@ -5664,7 +5996,7 @@ int ha_tokudb::prelock_range(const key_range *start_key, const key_range *end_ke // if (cursor) { int r = cursor->c_close(cursor); - assert(r==0); + assert_always(r==0); cursor = NULL; remove_from_trx_handler_list(); } @@ -5783,8 +6115,7 @@ int ha_tokudb::info(uint flag) { #endif DB_TXN* txn = NULL; if (flag & HA_STATUS_VARIABLE) { - // Just to get optimizations right - stats.records = share->rows + share->rows_from_locked_table; + stats.records = share->row_count() + share->rows_from_locked_table; stats.deleted = 0; if (!(flag & HA_STATUS_NO_LOCK)) { uint64_t num_rows = 0; @@ -5792,27 +6123,32 @@ int ha_tokudb::info(uint flag) { memset(&frag_info, 0, sizeof frag_info); error = txn_begin(db_env, NULL, &txn, DB_READ_UNCOMMITTED, ha_thd()); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } // we should always have a primary key - assert(share->file != NULL); + assert_always(share->file != NULL); error = estimate_num_rows(share->file,&num_rows, txn); if (error == 0) { - share->rows = num_rows; + share->set_row_count(num_rows, false); stats.records = num_rows; - } - else { + } else { goto cleanup; } error = share->file->get_fragmentation(share->file, &frag_info); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } stats.delete_length = frag_info.unused_bytes; DB_BTREE_STAT64 dict_stats; error = share->file->stat64(share->file, txn, &dict_stats); - if (error) { goto cleanup; } - + if (error) { + goto cleanup; + } + stats.create_time = dict_stats.bt_create_time_sec; stats.update_time = dict_stats.bt_modify_time_sec; stats.check_time = dict_stats.bt_verify_time_sec; @@ -5822,18 +6158,24 @@ int ha_tokudb::info(uint flag) { // in this case, we have a hidden primary key, do not // want to report space taken up by the hidden primary key to the user // - uint64_t hpk_space = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH*dict_stats.bt_ndata; - stats.data_file_length = (hpk_space > stats.data_file_length) ? 0 : stats.data_file_length - hpk_space; - } - else { + uint64_t hpk_space = + TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH * dict_stats.bt_ndata; + stats.data_file_length = + (hpk_space > stats.data_file_length) ? + 0 : stats.data_file_length - hpk_space; + } else { // // one infinity byte per key needs to be subtracted // uint64_t inf_byte_space = dict_stats.bt_ndata; - stats.data_file_length = (inf_byte_space > stats.data_file_length) ? 0 : stats.data_file_length - inf_byte_space; + stats.data_file_length = + (inf_byte_space > stats.data_file_length) ? 
+ 0 : stats.data_file_length - inf_byte_space; } - stats.mean_rec_length = stats.records ? (ulong)(stats.data_file_length/stats.records) : 0; + stats.mean_rec_length = + stats.records ? + (ulong)(stats.data_file_length/stats.records) : 0; stats.index_file_length = 0; // curr_num_DBs is the number of keys we have, according // to the mysql layer. if drop index is running concurrently @@ -5852,37 +6194,58 @@ int ha_tokudb::info(uint flag) { if (i == primary_key || share->key_file[i] == NULL) { continue; } - error = share->key_file[i]->stat64( - share->key_file[i], - txn, - &dict_stats - ); - if (error) { goto cleanup; } + error = + share->key_file[i]->stat64( + share->key_file[i], + txn, + &dict_stats); + if (error) { + goto cleanup; + } stats.index_file_length += dict_stats.bt_dsize; - error = share->file->get_fragmentation( - share->file, - &frag_info - ); - if (error) { goto cleanup; } + error = + share->file->get_fragmentation( + share->file, + &frag_info); + if (error) { + goto cleanup; + } stats.delete_length += frag_info.unused_bytes; } } + + /* + The following comment and logic has been taken from InnoDB and + an old hack was removed that forced to always set stats.records > 0 + --- + The MySQL optimizer seems to assume in a left join that n_rows + is an accurate estimate if it is zero. Of course, it is not, + since we do not have any locks on the rows yet at this phase. + Since SHOW TABLE STATUS seems to call this function with the + HA_STATUS_TIME flag set, while the left join optimizer does not + set that flag, we add one to a zero value if the flag is not + set. That way SHOW TABLE STATUS will show the best estimate, + while the optimizer never sees the table empty. */ + if (stats.records == 0 && !(flag & HA_STATUS_TIME)) { + stats.records++; + } } if ((flag & HA_STATUS_CONST)) { - stats.max_data_file_length= 9223372036854775807ULL; - tokudb::set_card_in_key_info(table, share->n_rec_per_key, share->rec_per_key); + stats.max_data_file_length = 9223372036854775807ULL; + share->set_cardinality_counts_in_table(table); } /* Don't return key if we got an error for the internal primary key */ if (flag & HA_STATUS_ERRKEY && last_dup_key < table_share->keys) { errkey = last_dup_key; - } + } - if (flag & HA_STATUS_AUTO && table->found_next_number_field) { - THD *thd= table->in_use; - struct system_variables *variables= &thd->variables; - stats.auto_increment_value = share->last_auto_increment + variables->auto_increment_increment; + if (flag & HA_STATUS_AUTO && table->found_next_number_field) { + THD* thd = table->in_use; + struct system_variables* variables = &thd->variables; + stats.auto_increment_value = + share->last_auto_increment + variables->auto_increment_increment; } error = 0; cleanup: @@ -5929,7 +6292,7 @@ int ha_tokudb::extra(enum ha_extra_function operation) { TOKUDB_HANDLER_DBUG_RETURN(0); } -int ha_tokudb::reset(void) { +int ha_tokudb::reset() { TOKUDB_HANDLER_DBUG_ENTER(""); key_read = false; using_ignore = false; @@ -5953,14 +6316,13 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) { TOKUDB_HANDLER_DBUG_ENTER("%p %s", trans, lt == lock_read ? 
"r" : "w"); int error = ENOSYS; if (!num_DBs_locked_in_bulk) { - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); } uint curr_num_DBs = share->num_DBs; if (lt == lock_read) { error = 0; goto cleanup; - } - else if (lt == lock_write) { + } else if (lt == lock_write) { for (uint i = 0; i < curr_num_DBs; i++) { DB* db = share->key_file[i]; error = db->pre_acquire_table_lock(db, trans); @@ -5968,11 +6330,9 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) { TOKUDB_HANDLER_TRACE("%d db=%p trans=%p", i, db, trans); if (error) break; } - if (tokudb_debug & TOKUDB_DEBUG_LOCK) - TOKUDB_HANDLER_TRACE("error=%d", error); + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_LOCK, "error=%d", error); if (error) goto cleanup; - } - else { + } else { error = ENOSYS; goto cleanup; } @@ -5980,7 +6340,7 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) { error = 0; cleanup: if (!num_DBs_locked_in_bulk) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -6011,17 +6371,19 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) { if ((error = txn_begin(db_env, NULL, &trx->all, txn_begin_flags, thd))) { goto cleanup; } - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("created master %p", trx->all); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "created master %p", + trx->all); trx->sp_level = trx->all; trans_register_ha(thd, true, tokudb_hton); } DBUG_PRINT("trans", ("starting transaction stmt")); if (trx->stmt) { - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("warning:stmt=%p", trx->stmt); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "warning:stmt=%p", + trx->stmt); } uint32_t txn_begin_flags; if (trx->all == NULL) { @@ -6036,21 +6398,25 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) { if (txn_begin_flags == 0 && is_autocommit && thd_sql_command(thd) == SQLCOM_SELECT) { txn_begin_flags = DB_TXN_SNAPSHOT; } - if (is_autocommit && thd_sql_command(thd) == SQLCOM_SELECT && !thd->in_sub_stmt && lock.type <= TL_READ_NO_INSERT && !thd->lex->uses_stored_routines()) { + if (is_autocommit && thd_sql_command(thd) == SQLCOM_SELECT && + !thd->in_sub_stmt && lock.type <= TL_READ_NO_INSERT && + !thd->lex->uses_stored_routines()) { txn_begin_flags |= DB_TXN_READ_ONLY; } - } - else { + } else { txn_begin_flags = DB_INHERIT_ISOLATION; } - if ((error = txn_begin(db_env, trx->sp_level, &trx->stmt, txn_begin_flags, thd))) { + error = txn_begin(db_env, trx->sp_level, &trx->stmt, txn_begin_flags, thd); + if (error) { /* We leave the possible master transaction open */ goto cleanup; } trx->sub_sp_level = trx->stmt; - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("created stmt %p sp_level %p", trx->sp_level, trx->stmt); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "created stmt %p sp_level %p", + trx->sp_level, + trx->stmt); reset_stmt_progress(&trx->stmt_progress); trans_register_ha(thd, false, tokudb_hton); cleanup: @@ -6081,26 +6447,40 @@ static const char *lock_type_str(int lock_type) { // error otherwise // int ha_tokudb::external_lock(THD * thd, int lock_type) { - TOKUDB_HANDLER_DBUG_ENTER("cmd %d lock %d %s %s", thd_sql_command(thd), lock_type, lock_type_str(lock_type), share->table_name); - if (!(tokudb_debug & TOKUDB_DEBUG_ENTER) && (tokudb_debug & TOKUDB_DEBUG_LOCK)) { - TOKUDB_HANDLER_TRACE("cmd %d lock %d %s %s", thd_sql_command(thd), lock_type, lock_type_str(lock_type), share->table_name); - } - 
if (tokudb_debug & TOKUDB_DEBUG_LOCK) { - TOKUDB_HANDLER_TRACE("q %s", thd->query()); - } + TOKUDB_HANDLER_DBUG_ENTER( + "cmd %d lock %d %s %s", + thd_sql_command(thd), + lock_type, + lock_type_str(lock_type), + share->full_table_name()); + if (TOKUDB_UNLIKELY(!TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_ENTER) && + TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_LOCK))) { + TOKUDB_HANDLER_TRACE( + "cmd %d lock %d %s %s", + thd_sql_command(thd), + lock_type, + lock_type_str(lock_type), + share->full_table_name()); + } + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_LOCK, "q %s", thd->query()); int error = 0; - tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton); + tokudb_trx_data* trx = (tokudb_trx_data*)thd_get_ha_data(thd, tokudb_hton); if (!trx) { error = create_tokudb_trx_data_instance(&trx); if (error) { goto cleanup; } thd_set_ha_data(thd, tokudb_hton, trx); } - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("trx %p %p %p %p %u %u", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level, - trx->tokudb_lock_count, trx->create_lock_count); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "trx %p %p %p %p %u %u", + trx->all, + trx->stmt, + trx->sp_level, + trx->sub_sp_level, + trx->tokudb_lock_count, + trx->create_lock_count); if (trx->all == NULL) { trx->sp_level = NULL; @@ -6120,19 +6500,11 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { } transaction = trx->sub_sp_level; trx->tokudb_lock_count++; - } - else { - tokudb_pthread_mutex_lock(&share->mutex); - // hate dealing with comparison of signed vs unsigned, so doing this - if (deleted_rows > added_rows && share->rows < (deleted_rows - added_rows)) { - share->rows = 0; - } - else { - share->rows += (added_rows - deleted_rows); - } - tokudb_pthread_mutex_unlock(&share->mutex); + } else { + share->update_row_count(thd, added_rows, deleted_rows, updated_rows); added_rows = 0; deleted_rows = 0; + updated_rows = 0; share->rows_from_locked_table = 0; if (trx->tokudb_lock_count > 0) { if (--trx->tokudb_lock_count <= trx->create_lock_count) { @@ -6154,8 +6526,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { } } cleanup: - if (tokudb_debug & TOKUDB_DEBUG_LOCK) - TOKUDB_HANDLER_TRACE("error=%d", error); + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_LOCK, "error=%d", error); TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -6164,24 +6535,32 @@ cleanup: TABLE LOCK is done. Under LOCK TABLES, each used tables will force a call to start_stmt. 
*/ -int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { - TOKUDB_HANDLER_DBUG_ENTER("cmd %d lock %d %s", thd_sql_command(thd), lock_type, share->table_name); - if (tokudb_debug & TOKUDB_DEBUG_LOCK) { - TOKUDB_HANDLER_TRACE("q %s", thd->query()); - } +int ha_tokudb::start_stmt(THD* thd, thr_lock_type lock_type) { + TOKUDB_HANDLER_DBUG_ENTER( + "cmd %d lock %d %s", + thd_sql_command(thd), + lock_type, + share->full_table_name()); + + TOKUDB_HANDLER_TRACE_FOR_FLAGS(TOKUDB_DEBUG_LOCK, "q %s", thd->query()); int error = 0; - tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton); + tokudb_trx_data* trx = (tokudb_trx_data*)thd_get_ha_data(thd, tokudb_hton); if (!trx) { error = create_tokudb_trx_data_instance(&trx); if (error) { goto cleanup; } thd_set_ha_data(thd, tokudb_hton, trx); } - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("trx %p %p %p %p %u %u", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level, - trx->tokudb_lock_count, trx->create_lock_count); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "trx %p %p %p %p %u %u", + trx->all, + trx->stmt, + trx->sp_level, + trx->sub_sp_level, + trx->tokudb_lock_count, + trx->create_lock_count); /* note that trx->stmt may have been already initialized as start_stmt() @@ -6194,11 +6573,11 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { goto cleanup; } trx->create_lock_count = trx->tokudb_lock_count; - } - else { - if (tokudb_debug & TOKUDB_DEBUG_TXN) { - TOKUDB_HANDLER_TRACE("trx->stmt %p already existed", trx->stmt); - } + } else { + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_TXN, + "trx->stmt %p already existed", + trx->stmt); } if (added_rows > deleted_rows) { share->rows_from_locked_table = added_rows - deleted_rows; @@ -6272,27 +6651,40 @@ uint32_t ha_tokudb::get_cursor_isolation_flags(enum thr_lock_type lock_type, THD time). In the future we will probably try to remove this. 
*/ -THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type) { - TOKUDB_HANDLER_DBUG_ENTER("lock_type=%d cmd=%d", lock_type, thd_sql_command(thd)); - if (tokudb_debug & TOKUDB_DEBUG_LOCK) { - TOKUDB_HANDLER_TRACE("lock_type=%d cmd=%d", lock_type, thd_sql_command(thd)); - } +THR_LOCK_DATA* *ha_tokudb::store_lock( + THD* thd, + THR_LOCK_DATA** to, + enum thr_lock_type lock_type) { + + TOKUDB_HANDLER_DBUG_ENTER( + "lock_type=%d cmd=%d", + lock_type, + thd_sql_command(thd)); + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_LOCK, + "lock_type=%d cmd=%d", + lock_type, + thd_sql_command(thd)); if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) { enum_sql_command sql_command = (enum_sql_command) thd_sql_command(thd); if (!thd->in_lock_tables) { - if (sql_command == SQLCOM_CREATE_INDEX && get_create_index_online(thd)) { + if (sql_command == SQLCOM_CREATE_INDEX && + tokudb::sysvars::create_index_online(thd)) { // hot indexing - rw_rdlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_read(); if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) { lock_type = TL_WRITE_ALLOW_WRITE; } - rw_unlock(&share->num_DBs_lock); - } else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) && - sql_command != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) { + share->_num_DBs_lock.unlock(); + } else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && + sql_command != SQLCOM_TRUNCATE && + !thd_tablespace_op(thd)) { // allow concurrent writes lock_type = TL_WRITE_ALLOW_WRITE; - } else if (sql_command == SQLCOM_OPTIMIZE && lock_type == TL_READ_NO_INSERT) { + } else if (sql_command == SQLCOM_OPTIMIZE && + lock_type == TL_READ_NO_INSERT) { // hot optimize table lock_type = TL_READ; } @@ -6300,88 +6692,130 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l lock.type = lock_type; } *to++ = &lock; - if (tokudb_debug & TOKUDB_DEBUG_LOCK) - TOKUDB_HANDLER_TRACE("lock_type=%d", lock_type); - DBUG_RETURN(to); + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_LOCK, + "lock_type=%d", + lock_type); + TOKUDB_HANDLER_DBUG_RETURN_PTR(to); } -static toku_compression_method get_compression_method(DB *file) { +static toku_compression_method get_compression_method(DB* file) { enum toku_compression_method method; int r = file->get_compression_method(file, &method); - assert(r == 0); + assert_always(r == 0); return method; } #if TOKU_INCLUDE_ROW_TYPE_COMPRESSION -enum row_type ha_tokudb::get_row_type(void) const { +enum row_type ha_tokudb::get_row_type() const { toku_compression_method compression_method = get_compression_method(share->file); return toku_compression_method_to_row_type(compression_method); } #endif static int create_sub_table( - const char *table_name, - DBT* row_descriptor, - DB_TXN* txn, - uint32_t block_size, + const char* table_name, + DBT* row_descriptor, + DB_TXN* txn, + uint32_t block_size, uint32_t read_block_size, toku_compression_method compression_method, bool is_hot_index, - uint32_t fanout - ) -{ + uint32_t fanout) { + TOKUDB_DBUG_ENTER(""); int error; DB *file = NULL; uint32_t create_flags; - - + + error = db_create(&file, db_env, 0); if (error) { DBUG_PRINT("error", ("Got error: %d when creating table", error)); my_errno = error; goto exit; } - + if (block_size != 0) { error = file->set_pagesize(file, block_size); if (error != 0) { - DBUG_PRINT("error", ("Got error: %d when setting block size %u for table '%s'", error, block_size, table_name)); + DBUG_PRINT( + 
"error", + ("Got error: %d when setting block size %u for table '%s'", + error, + block_size, + table_name)); goto exit; } } if (read_block_size != 0) { error = file->set_readpagesize(file, read_block_size); if (error != 0) { - DBUG_PRINT("error", ("Got error: %d when setting read block size %u for table '%s'", error, read_block_size, table_name)); + DBUG_PRINT( + "error", + ("Got error: %d when setting read block size %u for table '%s'", + error, + read_block_size, + table_name)); goto exit; } } if (fanout != 0) { error = file->set_fanout(file, fanout); if (error != 0) { - DBUG_PRINT("error", ("Got error: %d when setting fanout %u for table '%s'", - error, fanout, table_name)); + DBUG_PRINT( + "error", + ("Got error: %d when setting fanout %u for table '%s'", + error, + fanout, + table_name)); goto exit; } } error = file->set_compression_method(file, compression_method); if (error != 0) { - DBUG_PRINT("error", ("Got error: %d when setting compression type %u for table '%s'", error, compression_method, table_name)); + DBUG_PRINT( + "error", + ("Got error: %d when setting compression type %u for table '%s'", + error, + compression_method, + table_name)); goto exit; } - create_flags = DB_THREAD | DB_CREATE | DB_EXCL | (is_hot_index ? DB_IS_HOT_INDEX : 0); - error = file->open(file, txn, table_name, NULL, DB_BTREE, create_flags, my_umask); + create_flags = + DB_THREAD | DB_CREATE | DB_EXCL | (is_hot_index ? DB_IS_HOT_INDEX : 0); + error = + file->open( + file, + txn, + table_name, + NULL, + DB_BTREE, + create_flags, + my_umask); if (error) { - DBUG_PRINT("error", ("Got error: %d when opening table '%s'", error, table_name)); + DBUG_PRINT( + "error", + ("Got error: %d when opening table '%s'", error, table_name)); goto exit; } - error = file->change_descriptor(file, txn, row_descriptor, (is_hot_index ? DB_IS_HOT_INDEX | DB_UPDATE_CMP_DESCRIPTOR : DB_UPDATE_CMP_DESCRIPTOR)); + error = + file->change_descriptor( + file, + txn, + row_descriptor, + (is_hot_index ? 
DB_IS_HOT_INDEX | + DB_UPDATE_CMP_DESCRIPTOR : + DB_UPDATE_CMP_DESCRIPTOR)); if (error) { - DBUG_PRINT("error", ("Got error: %d when setting row descriptor for table '%s'", error, table_name)); + DBUG_PRINT( + "error", + ("Got error: %d when setting row descriptor for table '%s'", + error, + table_name)); goto exit; } @@ -6389,7 +6823,7 @@ static int create_sub_table( exit: if (file) { int r = file->close(file, 0); - assert(r==0); + assert_always(r==0); } TOKUDB_DBUG_RETURN(error); } @@ -6407,7 +6841,8 @@ void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) { // show create table asks us to update this create_info, this makes it // so we'll always show what compression type we're using create_info->row_type = get_row_type(); - if (create_info->row_type == ROW_TYPE_TOKU_ZLIB && THDVAR(ha_thd(), hide_default_row_format) != 0) { + if (create_info->row_type == ROW_TYPE_TOKU_ZLIB && + tokudb::sysvars::hide_default_row_format(ha_thd()) != 0) { create_info->row_type = ROW_TYPE_DEFAULT; } } @@ -6471,27 +6906,43 @@ int ha_tokudb::write_key_name_to_status(DB* status_block, char* key_name, DB_TXN } // -// some tracing moved out of ha_tokudb::create, because ::create was getting cluttered +// some tracing moved out of ha_tokudb::create, because ::create was +// getting cluttered // void ha_tokudb::trace_create_table_info(const char *name, TABLE * form) { uint i; // // tracing information about what type of table we are creating // - if (tokudb_debug & TOKUDB_DEBUG_OPEN) { + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_OPEN))) { for (i = 0; i < form->s->fields; i++) { Field *field = form->s->field[i]; - TOKUDB_HANDLER_TRACE("field:%d:%s:type=%d:flags=%x", i, field->field_name, field->type(), field->flags); + TOKUDB_HANDLER_TRACE( + "field:%d:%s:type=%d:flags=%x", + i, + field->field_name, + field->type(), + field->flags); } for (i = 0; i < form->s->keys; i++) { KEY *key = &form->s->key_info[i]; - TOKUDB_HANDLER_TRACE("key:%d:%s:%d", i, key->name, get_key_parts(key)); + TOKUDB_HANDLER_TRACE( + "key:%d:%s:%d", + i, + key->name, + key->user_defined_key_parts); uint p; - for (p = 0; p < get_key_parts(key); p++) { - KEY_PART_INFO *key_part = &key->key_part[p]; - Field *field = key_part->field; - TOKUDB_HANDLER_TRACE("key:%d:%d:length=%d:%s:type=%d:flags=%x", - i, p, key_part->length, field->field_name, field->type(), field->flags); + for (p = 0; p < key->user_defined_key_parts; p++) { + KEY_PART_INFO* key_part = &key->key_part[p]; + Field* field = key_part->field; + TOKUDB_HANDLER_TRACE( + "key:%d:%d:length=%d:%s:type=%d:flags=%x", + i, + p, + key_part->length, + field->field_name, + field->type(), + field->flags); } } } @@ -6499,9 +6950,12 @@ void ha_tokudb::trace_create_table_info(const char *name, TABLE * form) { static uint32_t get_max_desc_size(KEY_AND_COL_INFO* kc_info, TABLE* form) { uint32_t max_row_desc_buff_size; - max_row_desc_buff_size = 2*(form->s->fields * 6)+10; // upper bound of key comparison descriptor - max_row_desc_buff_size += get_max_secondary_key_pack_desc_size(kc_info); // upper bound for sec. key part - max_row_desc_buff_size += get_max_clustering_val_pack_desc_size(form->s); // upper bound for clustering val part + // upper bound of key comparison descriptor + max_row_desc_buff_size = 2*(form->s->fields * 6)+10; + // upper bound for sec. 
key part + max_row_desc_buff_size += get_max_secondary_key_pack_desc_size(kc_info); + // upper bound for clustering val part + max_row_desc_buff_size += get_max_clustering_val_pack_desc_size(form->s); return max_row_desc_buff_size; } @@ -6513,9 +6967,8 @@ static uint32_t create_secondary_key_descriptor( TABLE* form, uint primary_key, uint32_t keynr, - KEY_AND_COL_INFO* kc_info - ) -{ + KEY_AND_COL_INFO* kc_info) { + uchar* ptr = NULL; ptr = buf; @@ -6554,23 +7007,25 @@ static uint32_t create_secondary_key_descriptor( // creates dictionary for secondary index, with key description key_info, all using txn // int ha_tokudb::create_secondary_dictionary( - const char* name, TABLE* form, - KEY* key_info, - DB_TXN* txn, - KEY_AND_COL_INFO* kc_info, + const char* name, + TABLE* form, + KEY* key_info, + DB_TXN* txn, + KEY_AND_COL_INFO* kc_info, uint32_t keynr, bool is_hot_index, - toku_compression_method compression_method - ) -{ + toku_compression_method compression_method) { + int error; DBT row_descriptor; uchar* row_desc_buff = NULL; char* newname = NULL; + size_t newname_len = 0; KEY* prim_key = NULL; char dict_name[MAX_DICT_NAME_LEN]; uint32_t max_row_desc_buff_size; - uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0; + uint hpk= (form->s->primary_key >= MAX_KEY) ? + TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0; uint32_t block_size; uint32_t read_block_size; uint32_t fanout; @@ -6580,14 +7035,23 @@ int ha_tokudb::create_secondary_dictionary( max_row_desc_buff_size = get_max_desc_size(kc_info,form); - row_desc_buff = (uchar *)tokudb_my_malloc(max_row_desc_buff_size, MYF(MY_WME)); - if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;} + row_desc_buff = (uchar*)tokudb::memory::malloc( + max_row_desc_buff_size, + MYF(MY_WME)); + if (row_desc_buff == NULL) { + error = ENOMEM; + goto cleanup; + } - newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME)); - if (newname == NULL){ error = ENOMEM; goto cleanup;} + newname_len = get_max_dict_name_path_length(name); + newname = (char*)tokudb::memory::malloc(newname_len, MYF(MY_WME)); + if (newname == NULL) { + error = ENOMEM; + goto cleanup; + } sprintf(dict_name, "key-%s", key_info->name); - make_name(newname, name, dict_name); + make_name(newname, newname_len, name, dict_name); prim_key = (hpk) ? 
NULL : &form->s->key_info[primary_key]; @@ -6606,20 +7070,25 @@ int ha_tokudb::create_secondary_dictionary( form, primary_key, keynr, - kc_info - ); - assert(row_descriptor.size <= max_row_desc_buff_size); + kc_info); + assert_always(row_descriptor.size <= max_row_desc_buff_size); - block_size = get_tokudb_block_size(thd); - read_block_size = get_tokudb_read_block_size(thd); - fanout = get_tokudb_fanout(thd); + block_size = tokudb::sysvars::block_size(thd); + read_block_size = tokudb::sysvars::read_block_size(thd); + fanout = tokudb::sysvars::fanout(thd); - error = create_sub_table(newname, &row_descriptor, txn, block_size, - read_block_size, compression_method, is_hot_index, - fanout); + error = create_sub_table( + newname, + &row_descriptor, + txn, + block_size, + read_block_size, + compression_method, + is_hot_index, + fanout); cleanup: - tokudb_my_free(newname); - tokudb_my_free(row_desc_buff); + tokudb::memory::free(newname); + tokudb::memory::free(row_desc_buff); return error; } @@ -6630,21 +7099,17 @@ static uint32_t create_main_key_descriptor( uint hpk, uint primary_key, TABLE* form, - KEY_AND_COL_INFO* kc_info - ) -{ + KEY_AND_COL_INFO* kc_info) { + uchar* ptr = buf; ptr += create_toku_key_descriptor( ptr, hpk, prim_key, false, - NULL - ); + NULL); - ptr += create_toku_main_key_pack_descriptor( - ptr - ); + ptr += create_toku_main_key_pack_descriptor(ptr); ptr += create_toku_clustering_val_pack_descriptor( ptr, @@ -6652,8 +7117,7 @@ static uint32_t create_main_key_descriptor( form->s, kc_info, primary_key, - false - ); + false); return ptr - buf; } @@ -6661,14 +7125,21 @@ static uint32_t create_main_key_descriptor( // create and close the main dictionarr with name of "name" using table form, all within // transaction txn. // -int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, toku_compression_method compression_method) { +int ha_tokudb::create_main_dictionary( + const char* name, + TABLE* form, + DB_TXN* txn, + KEY_AND_COL_INFO* kc_info, + toku_compression_method compression_method) { + int error; DBT row_descriptor; uchar* row_desc_buff = NULL; char* newname = NULL; + size_t newname_len = 0; KEY* prim_key = NULL; uint32_t max_row_desc_buff_size; - uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0; + uint hpk = (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0; uint32_t block_size; uint32_t read_block_size; uint32_t fanout; @@ -6677,13 +7148,22 @@ int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn memset(&row_descriptor, 0, sizeof(row_descriptor)); max_row_desc_buff_size = get_max_desc_size(kc_info, form); - row_desc_buff = (uchar *)tokudb_my_malloc(max_row_desc_buff_size, MYF(MY_WME)); - if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;} + row_desc_buff = (uchar*)tokudb::memory::malloc( + max_row_desc_buff_size, + MYF(MY_WME)); + if (row_desc_buff == NULL) { + error = ENOMEM; + goto cleanup; + } - newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME)); - if (newname == NULL){ error = ENOMEM; goto cleanup;} + newname_len = get_max_dict_name_path_length(name); + newname = (char*)tokudb::memory::malloc(newname_len, MYF(MY_WME)); + if (newname == NULL) { + error = ENOMEM; + goto cleanup; + } - make_name(newname, name, "main"); + make_name(newname, newname_len, name, "main"); prim_key = (hpk) ? 
NULL : &form->s->key_info[primary_key]; @@ -6700,21 +7180,26 @@ int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn hpk, primary_key, form, - kc_info - ); - assert(row_descriptor.size <= max_row_desc_buff_size); + kc_info); + assert_always(row_descriptor.size <= max_row_desc_buff_size); - block_size = get_tokudb_block_size(thd); - read_block_size = get_tokudb_read_block_size(thd); - fanout = get_tokudb_fanout(thd); + block_size = tokudb::sysvars::block_size(thd); + read_block_size = tokudb::sysvars::read_block_size(thd); + fanout = tokudb::sysvars::fanout(thd); /* Create the main table that will hold the real rows */ - error = create_sub_table(newname, &row_descriptor, txn, block_size, - read_block_size, compression_method, false, - fanout); + error = create_sub_table( + newname, + &row_descriptor, + txn, + block_size, + read_block_size, + compression_method, + false, + fanout); cleanup: - tokudb_my_free(newname); - tokudb_my_free(row_desc_buff); + tokudb::memory::free(newname); + tokudb::memory::free(row_desc_buff); return error; } @@ -6728,7 +7213,11 @@ cleanup: // 0 on success // error otherwise // -int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) { +int ha_tokudb::create( + const char* name, + TABLE* form, + HA_CREATE_INFO* create_info) { + TOKUDB_HANDLER_DBUG_ENTER("%s", name); int error; @@ -6738,6 +7227,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in DB_TXN* txn = NULL; bool do_commit = false; char* newname = NULL; + size_t newname_len = 0; KEY_AND_COL_INFO kc_info; tokudb_trx_data *trx = NULL; THD* thd = ha_thd(); @@ -6753,15 +7243,18 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in #endif #if TOKU_INCLUDE_OPTION_STRUCTS - const srv_row_format_t row_format = (srv_row_format_t) form->s->option_struct->row_format; + const tokudb::sysvars::row_format_t row_format = + (tokudb::sysvars::row_format_t)form->s->option_struct->row_format; #else - const srv_row_format_t row_format = (create_info->used_fields & HA_CREATE_USED_ROW_FORMAT) + const tokudb::sysvars::row_format_t row_format = + (create_info->used_fields & HA_CREATE_USED_ROW_FORMAT) ? 
row_type_to_row_format(create_info->row_type) - : get_row_format(thd); + : tokudb::sysvars::row_format(thd); #endif - const toku_compression_method compression_method = row_format_to_toku_compression_method(row_format); + const toku_compression_method compression_method = + row_format_to_toku_compression_method(row_format); - bool create_from_engine= (create_info->table_options & HA_OPTION_CREATE_FROM_ENGINE); + bool create_from_engine = (create_info->table_options & HA_OPTION_CREATE_FROM_ENGINE); if (create_from_engine) { // table already exists, nothing to do error = 0; @@ -6786,17 +7279,23 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in } } - newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME)); - if (newname == NULL){ error = ENOMEM; goto cleanup;} + newname_len = get_max_dict_name_path_length(name); + newname = (char*)tokudb::memory::malloc(newname_len, MYF(MY_WME)); + if (newname == NULL) { + error = ENOMEM; + goto cleanup; + } trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton); - if (trx && trx->sub_sp_level && thd_sql_command(thd) == SQLCOM_CREATE_TABLE) { + if (trx && trx->sub_sp_level && + thd_sql_command(thd) == SQLCOM_CREATE_TABLE) { txn = trx->sub_sp_level; - } - else { + } else { do_commit = true; error = txn_begin(db_env, 0, &txn, 0, thd); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } } primary_key = form->s->primary_key; @@ -6809,45 +7308,76 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in trace_create_table_info(name,form); /* Create status.tokudb and save relevant metadata */ - make_name(newname, name, "status"); + make_name(newname, newname_len, name, "status"); - error = tokudb::create_status(db_env, &status_block, newname, txn); + error = tokudb::metadata::create(db_env, &status_block, newname, txn); if (error) { goto cleanup; } version = HA_TOKU_VERSION; - error = write_to_status(status_block, hatoku_new_version,&version,sizeof(version), txn); - if (error) { goto cleanup; } + error = write_to_status( + status_block, + hatoku_new_version, + &version, + sizeof(version), + txn); + if (error) { + goto cleanup; + } capabilities = HA_TOKU_CAP; - error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities), txn); - if (error) { goto cleanup; } + error = write_to_status( + status_block, + hatoku_capabilities, + &capabilities, + sizeof(capabilities), + txn); + if (error) { + goto cleanup; + } - error = write_auto_inc_create(status_block, create_info->auto_increment_value, txn); - if (error) { goto cleanup; } + error = write_auto_inc_create( + status_block, + create_info->auto_increment_value, + txn); + if (error) { + goto cleanup; + } #if WITH_PARTITION_STORAGE_ENGINE if (TOKU_PARTITION_WRITE_FRM_DATA || form->part_info == NULL) { error = write_frm_data(status_block, txn, form->s->path.str); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } } #else error = write_frm_data(status_block, txn, form->s->path.str); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } #endif error = allocate_key_and_col_info(form->s, &kc_info); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } error = initialize_key_and_col_info( - form->s, + form->s, form, &kc_info, hidden_primary_key, - primary_key - ); - if (error) { goto cleanup; } + primary_key); + if (error) { + goto cleanup; + } - error = create_main_dictionary(name, form, txn, &kc_info, compression_method); + error = 
create_main_dictionary( + name, + form, + txn, + &kc_info, + compression_method); if (error) { goto cleanup; } @@ -6855,32 +7385,44 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in for (uint i = 0; i < form->s->keys; i++) { if (i != primary_key) { - error = create_secondary_dictionary(name, form, &form->key_info[i], txn, &kc_info, i, false, compression_method); + error = create_secondary_dictionary( + name, + form, + &form->key_info[i], + txn, + &kc_info, + i, + false, + compression_method); if (error) { goto cleanup; } - error = write_key_name_to_status(status_block, form->s->key_info[i].name, txn); - if (error) { goto cleanup; } + error = write_key_name_to_status( + status_block, + form->s->key_info[i].name, + txn); + if (error) { + goto cleanup; + } } } error = 0; cleanup: if (status_block != NULL) { - int r = tokudb::close_status(&status_block); - assert(r==0); + int r = tokudb::metadata::close(&status_block); + assert_always(r==0); } free_key_and_col_info(&kc_info); if (do_commit && txn) { if (error) { abort_txn(txn); - } - else { + } else { commit_txn(txn,0); } } - tokudb_my_free(newname); + tokudb::memory::free(newname); TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -6903,27 +7445,36 @@ int ha_tokudb::discard_or_import_tablespace(my_bool discard) { // is_key specifies if it is a secondary index (and hence a "key-" needs to be prepended) or // if it is not a secondary index // -int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* secondary_name, bool is_key, DB_TXN* txn, bool is_delete) { +int ha_tokudb::delete_or_rename_dictionary( + const char* from_name, + const char* to_name, + const char* secondary_name, + bool is_key, + DB_TXN* txn, + bool is_delete) { + int error; char dict_name[MAX_DICT_NAME_LEN]; char* new_from_name = NULL; + size_t new_from_name_len = 0; char* new_to_name = NULL; - assert(txn); + size_t new_to_name_len = 0; + assert_always(txn); - new_from_name = (char *)tokudb_my_malloc( - get_max_dict_name_path_length(from_name), - MYF(MY_WME) - ); + new_from_name_len = get_max_dict_name_path_length(from_name); + new_from_name = (char*)tokudb::memory::malloc( + new_from_name_len, + MYF(MY_WME)); if (new_from_name == NULL) { error = ENOMEM; goto cleanup; } if (!is_delete) { - assert(to_name); - new_to_name = (char *)tokudb_my_malloc( - get_max_dict_name_path_length(to_name), - MYF(MY_WME) - ); + assert_always(to_name); + new_to_name_len = get_max_dict_name_path_length(to_name); + new_to_name = (char*)tokudb::memory::malloc( + new_to_name_len, + MYF(MY_WME)); if (new_to_name == NULL) { error = ENOMEM; goto cleanup; @@ -6932,32 +7483,37 @@ int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* t if (is_key) { sprintf(dict_name, "key-%s", secondary_name); - make_name(new_from_name, from_name, dict_name); - } - else { - make_name(new_from_name, from_name, secondary_name); + make_name(new_from_name, new_from_name_len, from_name, dict_name); + } else { + make_name(new_from_name, new_from_name_len, from_name, secondary_name); } if (!is_delete) { if (is_key) { sprintf(dict_name, "key-%s", secondary_name); - make_name(new_to_name, to_name, dict_name); - } - else { - make_name(new_to_name, to_name, secondary_name); + make_name(new_to_name, new_to_name_len, to_name, dict_name); + } else { + make_name(new_to_name, new_to_name_len, to_name, secondary_name); } } if (is_delete) { error = db_env->dbremove(db_env, txn, new_from_name, NULL, 0); + } else { + error = db_env->dbrename( + db_env, 
+ txn, + new_from_name, + NULL, + new_to_name, + 0); } - else { - error = db_env->dbrename(db_env, txn, new_from_name, NULL, new_to_name, 0); + if (error) { + goto cleanup; } - if (error) { goto cleanup; } cleanup: - tokudb_my_free(new_from_name); - tokudb_my_free(new_to_name); + tokudb::memory::free(new_from_name); + tokudb::memory::free(new_to_name); return error; } @@ -7023,12 +7579,12 @@ int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_nam if (error) { goto cleanup; } error = status_cursor->c_close(status_cursor); - assert(error==0); + assert_always(error==0); status_cursor = NULL; if (error) { goto cleanup; } error = status_db->close(status_db, 0); - assert(error == 0); + assert_always(error == 0); status_db = NULL; // @@ -7041,11 +7597,11 @@ int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_nam cleanup: if (status_cursor) { int r = status_cursor->c_close(status_cursor); - assert(r==0); + assert_always(r==0); } if (status_db) { int r = status_db->close(status_db, 0); - assert(r==0); + assert_always(r==0); } if (txn) { if (error) { @@ -7069,12 +7625,25 @@ cleanup: // int ha_tokudb::delete_table(const char *name) { TOKUDB_HANDLER_DBUG_ENTER("%s", name); + TOKUDB_SHARE* share = TOKUDB_SHARE::get_share(name, NULL, NULL, false); + if (share) { + share->unlock(); + share->release(); + // this should be enough to handle locking as the higher level MDL + // on this table should prevent any new analyze tasks. + share->cancel_background_jobs(); + TOKUDB_SHARE::drop_share(share); + } + int error; error = delete_or_rename_table(name, NULL, true); - if (error == DB_LOCK_NOTGRANTED && ((tokudb_debug & TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0)) { - sql_print_error("Could not delete table %s because \ -another transaction has accessed the table. \ -To drop the table, make sure no transactions touch the table.", name); + if (TOKUDB_LIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0) && + error == DB_LOCK_NOTGRANTED) { + sql_print_error( + "Could not delete table %s because another transaction has " + "accessed the table. To drop the table, make sure no " + "transactions touch the table.", + name); } TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -7091,12 +7660,25 @@ To drop the table, make sure no transactions touch the table.", name); // int ha_tokudb::rename_table(const char *from, const char *to) { TOKUDB_HANDLER_DBUG_ENTER("%s %s", from, to); + TOKUDB_SHARE* share = TOKUDB_SHARE::get_share(from, NULL, NULL, false); + if (share) { + share->unlock(); + share->release(); + // this should be enough to handle locking as the higher level MDL + // on this table should prevent any new analyze tasks. + share->cancel_background_jobs(); + TOKUDB_SHARE::drop_share(share); + } int error; error = delete_or_rename_table(from, to, false); - if (error == DB_LOCK_NOTGRANTED && ((tokudb_debug & TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0)) { - sql_print_error("Could not rename table from %s to %s because \ -another transaction has accessed the table. \ -To rename the table, make sure no transactions touch the table.", from, to); + if (TOKUDB_LIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0) && + error == DB_LOCK_NOTGRANTED) { + sql_print_error( + "Could not rename table from %s to %s because another transaction " + "has accessed the table. 
To rename the table, make sure no " + "transactions touch the table.", + from, + to); } TOKUDB_HANDLER_DBUG_RETURN(error); } @@ -7111,9 +7693,11 @@ To rename the table, make sure no transactions touch the table.", from, to); double ha_tokudb::scan_time() { TOKUDB_HANDLER_DBUG_ENTER(""); double ret_val = (double)stats.records / 3; - if (tokudb_debug & TOKUDB_DEBUG_RETURN) { - TOKUDB_HANDLER_TRACE("return %" PRIu64 " %f", (uint64_t) stats.records, ret_val); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_RETURN, + "return %" PRIu64 " %f", + (uint64_t)stats.records, + ret_val); DBUG_RETURN(ret_val); } @@ -7137,10 +7721,7 @@ double ha_tokudb::keyread_time(uint index, uint ranges, ha_rows rows) (table->key_info[index].key_length + ref_length) + 1); ret_val = (rows + keys_per_block - 1)/ keys_per_block; - if (tokudb_debug & TOKUDB_DEBUG_RETURN) { - TOKUDB_HANDLER_TRACE("return %f", ret_val); - } - DBUG_RETURN(ret_val); + TOKUDB_HANDLER_DBUG_RETURN_DOUBLE(ret_val); } // @@ -7202,19 +7783,13 @@ double ha_tokudb::read_time( ret_val = is_clustering ? ret_val + 0.00001 : ret_val; cleanup: - if (tokudb_debug & TOKUDB_DEBUG_RETURN) { - TOKUDB_HANDLER_TRACE("return %f", ret_val); - } - DBUG_RETURN(ret_val); + TOKUDB_HANDLER_DBUG_RETURN_DOUBLE(ret_val); } double ha_tokudb::index_only_read_time(uint keynr, double records) { TOKUDB_HANDLER_DBUG_ENTER("%u %f", keynr, records); double ret_val = keyread_time(keynr, 1, (ha_rows)records); - if (tokudb_debug & TOKUDB_DEBUG_RETURN) { - TOKUDB_HANDLER_TRACE("return %f", ret_val); - } - DBUG_RETURN(ret_val); + TOKUDB_HANDLER_DBUG_RETURN_DOUBLE(ret_val); } // @@ -7287,9 +7862,11 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ret_val = (ha_rows) (rows <= 1 ? 1 : rows); cleanup: - if (tokudb_debug & TOKUDB_DEBUG_RETURN) { - TOKUDB_HANDLER_TRACE("return %" PRIu64 " %" PRIu64, (uint64_t) ret_val, rows); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_RETURN, + "return %" PRIu64 " %" PRIu64, + (uint64_t)ret_val, + rows); DBUG_RETURN(ret_val); } @@ -7345,12 +7922,19 @@ void ha_tokudb::init_auto_increment() { commit_txn(txn, 0); } - if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) { - TOKUDB_HANDLER_TRACE("init auto increment:%lld", share->last_auto_increment); - } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_AUTO_INCREMENT, + "init auto increment:%lld", + share->last_auto_increment); } -void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values) { +void ha_tokudb::get_auto_increment( + ulonglong offset, + ulonglong increment, + ulonglong nb_desired_values, + ulonglong* first_value, + ulonglong* nb_reserved_values) { + TOKUDB_HANDLER_DBUG_ENTER(""); ulonglong nr; bool over; @@ -7361,14 +7945,13 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl DBUG_VOID_RETURN; } - tokudb_pthread_mutex_lock(&share->mutex); + share->lock(); if (share->auto_inc_create_value > share->last_auto_increment) { nr = share->auto_inc_create_value; over = false; share->last_auto_increment = share->auto_inc_create_value; - } - else { + } else { nr = share->last_auto_increment + increment; over = nr < share->last_auto_increment; if (over) @@ -7378,19 +7961,23 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl share->last_auto_increment = nr + (nb_desired_values - 1)*increment; if (delay_updating_ai_metadata) { ai_metadata_update_required = true; - } - else { - 
update_max_auto_inc(share->status_block, share->last_auto_increment); - } - } - - if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) { - TOKUDB_HANDLER_TRACE("get_auto_increment(%lld,%lld,%lld):got:%lld:%lld", - offset, increment, nb_desired_values, nr, nb_desired_values); - } + } else { + update_max_auto_inc( + share->status_block, + share->last_auto_increment); + } + } + TOKUDB_HANDLER_TRACE_FOR_FLAGS( + TOKUDB_DEBUG_AUTO_INCREMENT, + "get_auto_increment(%lld,%lld,%lld): got:%lld:%lld", + offset, + increment, + nb_desired_values, + nr, + nb_desired_values); *first_value = nr; *nb_reserved_values = nb_desired_values; - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); TOKUDB_HANDLER_DBUG_VOID_RETURN; } @@ -7419,16 +8006,15 @@ bool ha_tokudb::is_auto_inc_singleton(){ // 0 on success, error otherwise // int ha_tokudb::tokudb_add_index( - TABLE *table_arg, - KEY *key_info, - uint num_of_keys, - DB_TXN* txn, + TABLE* table_arg, + KEY* key_info, + uint num_of_keys, + DB_TXN* txn, bool* inc_num_DBs, - bool* modified_DBs - ) -{ + bool* modified_DBs) { + TOKUDB_HANDLER_DBUG_ENTER(""); - assert(txn); + assert_always(txn); int error; uint curr_index = 0; @@ -7438,7 +8024,7 @@ int ha_tokudb::tokudb_add_index( THD* thd = ha_thd(); DB_LOADER* loader = NULL; DB_INDEXER* indexer = NULL; - bool loader_save_space = get_load_save_space(thd); + bool loader_save_space = tokudb::sysvars::load_save_space(thd); bool use_hot_index = (lock.type == TL_WRITE_ALLOW_WRITE); uint32_t loader_flags = loader_save_space ? LOADER_COMPRESS_INTERMEDIATES : 0; uint32_t indexer_flags = 0; @@ -7468,14 +8054,17 @@ int ha_tokudb::tokudb_add_index( // // get the row type to use for the indexes we're adding // - toku_compression_method compression_method = get_compression_method(share->file); + toku_compression_method compression_method = + get_compression_method(share->file); // // status message to be shown in "show process list" // const char *orig_proc_info = tokudb_thd_get_proc_info(thd); - char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound. - ulonglong num_processed = 0; //variable that stores number of elements inserted thus far + // buffer of 200 should be a good upper bound. 
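//
// Editor's note: a minimal sketch (not the handler's actual code) of the
// reservation scheme get_auto_increment() above implements: hand out the
// first value, advance last_auto_increment past the whole requested block,
// and detect unsigned wraparound. AutoIncState and reserve_auto_inc are
// hypothetical names; the real code operates on TOKUDB_SHARE and the
// caller holds share->lock() for the duration, as shown above.
//
struct AutoIncState {
    unsigned long long last_auto_increment;   // last value handed out
    unsigned long long auto_inc_create_value; // AUTO_INCREMENT= from CREATE
};

// Returns the first value of the reserved block, or ~0ULL on wraparound.
// Caller must hold the share lock.
static unsigned long long reserve_auto_inc(
    AutoIncState* s,
    unsigned long long increment,
    unsigned long long nb_desired_values) {

    unsigned long long nr;
    bool over;
    if (s->auto_inc_create_value > s->last_auto_increment) {
        // honor the starting value requested at CREATE TABLE time
        nr = s->auto_inc_create_value;
        over = false;
        s->last_auto_increment = s->auto_inc_create_value;
    } else {
        nr = s->last_auto_increment + increment;
        over = nr < s->last_auto_increment;   // unsigned wrap check
    }
    if (over)
        return ~0ULL;
    // reserve the whole block so concurrent callers skip past it
    s->last_auto_increment = nr + (nb_desired_values - 1) * increment;
    return nr;
}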
+ char status_msg[MAX_ALIAS_NAME + 200]; + // variable that stores number of elements inserted thus far + ulonglong num_processed = 0; thd_proc_info(thd, "Adding indexes"); // @@ -7500,13 +8089,15 @@ int ha_tokudb::tokudb_add_index( } } - rw_wrlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_write(); rw_lock_taken = true; // // open all the DB files and set the appropriate variables in share // they go to the end of share->key_file // - creating_hot_index = use_hot_index && num_of_keys == 1 && (key_info[0].flags & HA_NOSAME) == 0; + creating_hot_index = + use_hot_index && num_of_keys == 1 && + (key_info[0].flags & HA_NOSAME) == 0; if (use_hot_index && (share->num_DBs > curr_num_DBs)) { // // already have hot index in progress, get out @@ -7522,35 +8113,47 @@ int ha_tokudb::tokudb_add_index( &share->kc_info.key_filters[curr_index], &key_info[i], table_arg, - false - ); + false); if (!hidden_primary_key) { set_key_filter( &share->kc_info.key_filters[curr_index], &table_arg->key_info[primary_key], table_arg, - false - ); + false); } - error = initialize_col_pack_info(&share->kc_info,table_arg->s,curr_index); + error = initialize_col_pack_info( + &share->kc_info, + table_arg->s, + curr_index); if (error) { goto cleanup; } } - error = create_secondary_dictionary(share->table_name, table_arg, &key_info[i], txn, &share->kc_info, curr_index, creating_hot_index, compression_method); - if (error) { goto cleanup; } + error = create_secondary_dictionary( + share->full_table_name(), + table_arg, + &key_info[i], + txn, + &share->kc_info, + curr_index, + creating_hot_index, + compression_method); + if (error) { + goto cleanup; + } error = open_secondary_dictionary( - &share->key_file[curr_index], + &share->key_file[curr_index], &key_info[i], - share->table_name, + share->full_table_name(), false, - txn - ); - if (error) { goto cleanup; } + txn); + if (error) { + goto cleanup; + } } if (creating_hot_index) { @@ -7564,17 +8167,22 @@ int ha_tokudb::tokudb_add_index( num_of_keys, &share->key_file[curr_num_DBs], mult_db_flags, - indexer_flags - ); - if (error) { goto cleanup; } + indexer_flags); + if (error) { + goto cleanup; + } error = indexer->set_poll_function(indexer, ai_poll_fun, &lc); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } error = indexer->set_error_callback(indexer, loader_ai_err_fun, &lc); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); rw_lock_taken = false; #ifdef HA_TOKUDB_HAS_THD_PROGRESS @@ -7585,17 +8193,20 @@ int ha_tokudb::tokudb_add_index( error = indexer->build(indexer); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } - rw_wrlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_write(); error = indexer->close(indexer); - rw_unlock(&share->num_DBs_lock); - if (error) { goto cleanup; } + share->_num_DBs_lock.unlock(); + if (error) { + goto cleanup; + } indexer = NULL; - } - else { + } else { DBUG_ASSERT(table->mdl_ticket->get_type() >= MDL_SHARED_NO_WRITE); - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); rw_lock_taken = false; prelocked_right_range_size = 0; prelocked_left_range_size = 0; @@ -7608,27 +8219,37 @@ int ha_tokudb::tokudb_add_index( bf_info.key_to_compare = NULL; error = db_env->create_loader( - db_env, - txn, - &loader, + db_env, + txn, + &loader, NULL, // no src_db needed - num_of_keys, - &share->key_file[curr_num_DBs], + num_of_keys, + &share->key_file[curr_num_DBs], mult_put_flags, mult_dbt_flags, - loader_flags - ); - 
if (error) { goto cleanup; } + loader_flags); + if (error) { + goto cleanup; + } error = loader->set_poll_function(loader, loader_poll_fun, &lc); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } error = loader->set_error_callback(loader, loader_ai_err_fun, &lc); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } // // scan primary table, create each secondary key, add to each DB // - if ((error = share->file->cursor(share->file, txn, &tmp_cursor, DB_SERIALIZABLE))) { + error = share->file->cursor( + share->file, + txn, + &tmp_cursor, + DB_SERIALIZABLE); + if (error) { tmp_cursor = NULL; // Safety goto cleanup; } @@ -7643,16 +8264,21 @@ int ha_tokudb::tokudb_add_index( share->file->dbt_neg_infty(), share->file->dbt_pos_infty(), true, - 0 - ); - if (error) { goto cleanup; } + 0); + if (error) { + goto cleanup; + } // set the bulk fetch iteration to its max so that adding an // index fills the bulk fetch buffer every time. we do not // want it to grow exponentially fast. rows_fetched_using_bulk_fetch = 0; bulk_fetch_iteration = HA_TOKU_BULK_FETCH_ITERATION_MAX; - cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED,smart_dbt_bf_callback, &bf_info); + cursor_ret_val = tmp_cursor->c_getf_next( + tmp_cursor, + DB_PRELOCKED, + smart_dbt_bf_callback, + &bf_info); #ifdef HA_TOKUDB_HAS_THD_PROGRESS // initialize a two phase progress report. @@ -7660,21 +8286,30 @@ int ha_tokudb::tokudb_add_index( thd_progress_init(thd, 2); #endif - while (cursor_ret_val != DB_NOTFOUND || ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0)) { - if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) == 0) { + while (cursor_ret_val != DB_NOTFOUND || + ((bytes_used_in_range_query_buff - + curr_range_query_buff_offset) > 0)) { + if ((bytes_used_in_range_query_buff - + curr_range_query_buff_offset) == 0) { invalidate_bulk_fetch(); // reset the buffers - cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_bf_callback, &bf_info); + cursor_ret_val = tmp_cursor->c_getf_next( + tmp_cursor, + DB_PRELOCKED, + smart_dbt_bf_callback, + &bf_info); if (cursor_ret_val != DB_NOTFOUND && cursor_ret_val != 0) { error = cursor_ret_val; goto cleanup; } } - // do this check in case the the c_getf_next did not put anything into the buffer because - // there was no more data - if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) == 0) { + // do this check in case the the c_getf_next did not put anything + // into the buffer because there was no more data + if ((bytes_used_in_range_query_buff - + curr_range_query_buff_offset) == 0) { break; } - // at this point, we know the range query buffer has at least one key/val pair + // at this point, we know the range query buffer has at least one + // key/val pair uchar* curr_pos = range_query_buff+curr_range_query_buff_offset; uint32_t key_size = *(uint32_t *)curr_pos; @@ -7694,17 +8329,26 @@ int ha_tokudb::tokudb_add_index( curr_range_query_buff_offset = curr_pos - range_query_buff; error = loader->put(loader, &curr_pk_key, &curr_pk_val); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } num_processed++; if ((num_processed % 1000) == 0) { - sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", - num_processed, (long long unsigned) share->rows); + sprintf( + status_msg, + "Adding indexes: Fetched %llu of about %llu rows, loading " + "of data still remains.", + num_processed, + (long long unsigned)share->row_count()); 
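//
// Editor's note: a self-contained sketch of the buffer-drain step in the
// loop above. The bulk-fetch callback packs rows into the range-query
// buffer as [u32 key_size][key bytes][u32 val_size][val bytes]; each
// iteration peels off one key/val pair and the cursor is re-driven only
// once the offset catches up with the bytes used. next_packed_pair is a
// hypothetical helper; the real loop works on range_query_buff and sets
// curr_pk_key/curr_pk_val inline. Uses this file's uchar and DBT types.
//
#include <string.h>  // memcpy

// Returns false once the buffer is fully consumed.
static bool next_packed_pair(
    const uchar* buf,
    uint32_t bytes_used,
    uint32_t* offset,
    DBT* key,
    DBT* val) {

    if (bytes_used - *offset == 0)
        return false;
    const uchar* pos = buf + *offset;

    uint32_t key_size;
    memcpy(&key_size, pos, sizeof(key_size));
    pos += sizeof(key_size);
    key->data = (void*)pos;
    key->size = key_size;
    pos += key_size;

    uint32_t val_size;
    memcpy(&val_size, pos, sizeof(val_size));
    pos += sizeof(val_size);
    val->data = (void*)pos;
    val->size = val_size;
    pos += val_size;

    *offset = (uint32_t)(pos - buf);
    return true;
}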
thd_proc_info(thd, status_msg); #ifdef HA_TOKUDB_HAS_THD_PROGRESS - thd_progress_report(thd, num_processed, (long long unsigned) share->rows); + thd_progress_report( + thd, + num_processed, + (long long unsigned)share->row_count()); #endif if (thd_killed(thd)) { @@ -7714,7 +8358,7 @@ int ha_tokudb::tokudb_add_index( } } error = tmp_cursor->c_close(tmp_cursor); - assert(error==0); + assert_always(error==0); tmp_cursor = NULL; #ifdef HA_TOKUDB_HAS_THD_PROGRESS @@ -7732,9 +8376,14 @@ int ha_tokudb::tokudb_add_index( for (uint i = 0; i < num_of_keys; i++, curr_index++) { if (key_info[i].flags & HA_NOSAME) { bool is_unique; - error = is_index_unique(&is_unique, txn, share->key_file[curr_index], &key_info[i], - creating_hot_index ? 0 : DB_PRELOCKED_WRITE); - if (error) goto cleanup; + error = is_index_unique( + &is_unique, + txn, + share->key_file[curr_index], + &key_info[i], + creating_hot_index ? 0 : DB_PRELOCKED_WRITE); + if (error) + goto cleanup; if (!is_unique) { error = HA_ERR_FOUND_DUPP_KEY; last_dup_key = i; @@ -7743,22 +8392,20 @@ int ha_tokudb::tokudb_add_index( } } + share->lock(); // // We have an accurate row count, might as well update share->rows // if(!creating_hot_index) { - tokudb_pthread_mutex_lock(&share->mutex); - share->rows = num_processed; - tokudb_pthread_mutex_unlock(&share->mutex); + share->set_row_count(num_processed, true); } // // now write stuff to status.tokudb // - tokudb_pthread_mutex_lock(&share->mutex); for (uint i = 0; i < num_of_keys; i++) { write_key_name_to_status(share->status_block, key_info[i].name, txn); } - tokudb_pthread_mutex_unlock(&share->mutex); + share->unlock(); error = 0; cleanup: @@ -7766,12 +8413,12 @@ cleanup: thd_progress_end(thd); #endif if (rw_lock_taken) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); rw_lock_taken = false; } if (tmp_cursor) { int r = tmp_cursor->c_close(tmp_cursor); - assert(r==0); + assert_always(r==0); tmp_cursor = NULL; } if (loader != NULL) { @@ -7782,14 +8429,17 @@ cleanup: if (indexer != NULL) { sprintf(status_msg, "aborting creation of indexes."); thd_proc_info(thd, status_msg); - rw_wrlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_write(); indexer->abort(indexer); - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } - if (error == DB_LOCK_NOTGRANTED && ((tokudb_debug & TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0)) { - sql_print_error("Could not add indexes to table %s because \ -another transaction has accessed the table. \ -To add indexes, make sure no transactions touch the table.", share->table_name); + if (TOKUDB_LIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0) && + error == DB_LOCK_NOTGRANTED) { + sql_print_error( + "Could not add indexes to table %s because another transaction has " + "accessed the table. To add indexes, make sure no transactions " + "touch the table.", + share->full_table_name()); } thd_proc_info(thd, orig_proc_info); TOKUDB_HANDLER_DBUG_RETURN(error ? 
error : loader_error); @@ -7799,7 +8449,12 @@ To add indexes, make sure no transactions touch the table.", share->table_name); // Internal function called by ha_tokudb::add_index and ha_tokudb::alter_table_phase2 // Closes added indexes in case of error in error path of add_index and alter_table_phase2 // -void ha_tokudb::restore_add_index(TABLE* table_arg, uint num_of_keys, bool incremented_numDBs, bool modified_DBs) { +void ha_tokudb::restore_add_index( + TABLE* table_arg, + uint num_of_keys, + bool incremented_numDBs, + bool modified_DBs) { + uint curr_num_DBs = table_arg->s->keys + tokudb_test(hidden_primary_key); uint curr_index = 0; @@ -7808,7 +8463,7 @@ void ha_tokudb::restore_add_index(TABLE* table_arg, uint num_of_keys, bool incre // so that there is not a window // if (incremented_numDBs) { - rw_wrlock(&share->num_DBs_lock); + share->_num_DBs_lock.lock_write(); share->num_DBs--; } if (modified_DBs) { @@ -7821,15 +8476,14 @@ void ha_tokudb::restore_add_index(TABLE* table_arg, uint num_of_keys, bool incre if (share->key_file[curr_index]) { int r = share->key_file[curr_index]->close( share->key_file[curr_index], - 0 - ); - assert(r==0); + 0); + assert_always(r==0); share->key_file[curr_index] = NULL; } } } if (incremented_numDBs) { - rw_unlock(&share->num_DBs_lock); + share->_num_DBs_lock.unlock(); } } @@ -7837,14 +8491,22 @@ void ha_tokudb::restore_add_index(TABLE* table_arg, uint num_of_keys, bool incre // Internal function called by ha_tokudb::prepare_drop_index and ha_tokudb::alter_table_phase2 // With a transaction, drops dictionaries associated with indexes in key_num // -int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, KEY *key_info, DB_TXN* txn) { +int ha_tokudb::drop_indexes( + TABLE* table_arg, + uint* key_num, + uint num_of_keys, + KEY* key_info, + DB_TXN* txn) { + TOKUDB_HANDLER_DBUG_ENTER(""); - assert(txn); + assert_always(txn); int error = 0; for (uint i = 0; i < num_of_keys; i++) { uint curr_index = key_num[i]; - error = share->key_file[curr_index]->pre_acquire_fileops_lock(share->key_file[curr_index],txn); + error = share->key_file[curr_index]->pre_acquire_fileops_lock( + share->key_file[curr_index], + txn); if (error != 0) { goto cleanup; } @@ -7852,30 +8514,51 @@ int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, K for (uint i = 0; i < num_of_keys; i++) { uint curr_index = key_num[i]; int r = share->key_file[curr_index]->close(share->key_file[curr_index],0); - assert(r==0); + assert_always(r==0); share->key_file[curr_index] = NULL; - error = remove_key_name_from_status(share->status_block, key_info[curr_index].name, txn); - if (error) { goto cleanup; } + error = remove_key_name_from_status( + share->status_block, + key_info[curr_index].name, + txn); + if (error) { + goto cleanup; + } - error = delete_or_rename_dictionary(share->table_name, NULL, key_info[curr_index].name, true, txn, true); - if (error) { goto cleanup; } + error = delete_or_rename_dictionary( + share->full_table_name(), + NULL, + key_info[curr_index].name, + true, + txn, + true); + if (error) { + goto cleanup; + } } cleanup: - if (error == DB_LOCK_NOTGRANTED && ((tokudb_debug & TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0)) { - sql_print_error("Could not drop indexes from table %s because \ -another transaction has accessed the table. 
\ -To drop indexes, make sure no transactions touch the table.", share->table_name); + if (TOKUDB_LIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0) && + error == DB_LOCK_NOTGRANTED) { + sql_print_error( + "Could not drop indexes from table %s because another transaction " + "has accessed the table. To drop indexes, make sure no " + "transactions touch the table.", + share->full_table_name()); } TOKUDB_HANDLER_DBUG_RETURN(error); } // -// Internal function called by ha_tokudb::prepare_drop_index and ha_tokudb::alter_table_phase2 -// Restores dropped indexes in case of error in error path of prepare_drop_index and alter_table_phase2 +// Internal function called by ha_tokudb::prepare_drop_index and +// ha_tokudb::alter_table_phase2 +// Restores dropped indexes in case of error in error path of +// prepare_drop_index and alter_table_phase2 // -void ha_tokudb::restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys) { +void ha_tokudb::restore_drop_indexes( + TABLE* table_arg, + uint* key_num, + uint num_of_keys) { // // reopen closed dictionaries @@ -7885,13 +8568,12 @@ void ha_tokudb::restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_o uint curr_index = key_num[i]; if (share->key_file[curr_index] == NULL) { r = open_secondary_dictionary( - &share->key_file[curr_index], + &share->key_file[curr_index], &table_share->key_info[curr_index], - share->table_name, - false, // - NULL - ); - assert(!r); + share->full_table_name(), + false, + NULL); + assert_always(!r); } } } @@ -7937,56 +8619,65 @@ void ha_tokudb::print_error(int error, myf errflag) { // does so by deleting and then recreating the dictionary in the context // of a transaction // -int ha_tokudb::truncate_dictionary( uint keynr, DB_TXN* txn ) { +int ha_tokudb::truncate_dictionary(uint keynr, DB_TXN* txn) { int error; bool is_pk = (keynr == primary_key); - toku_compression_method compression_method = get_compression_method(share->key_file[keynr]); + toku_compression_method compression_method = + get_compression_method(share->key_file[keynr]); error = share->key_file[keynr]->close(share->key_file[keynr], 0); - assert(error == 0); + assert_always(error == 0); share->key_file[keynr] = NULL; - if (is_pk) { share->file = NULL; } + if (is_pk) { + share->file = NULL; + } if (is_pk) { error = delete_or_rename_dictionary( - share->table_name, + share->full_table_name(), NULL, - "main", + "main", false, //is_key txn, - true // is a delete - ); - if (error) { goto cleanup; } - } - else { + true); // is a delete + if (error) { + goto cleanup; + } + } else { error = delete_or_rename_dictionary( - share->table_name, + share->full_table_name(), NULL, - table_share->key_info[keynr].name, + table_share->key_info[keynr].name, true, //is_key txn, - true // is a delete - ); - if (error) { goto cleanup; } + true); // is a delete + if (error) { + goto cleanup; + } } if (is_pk) { - error = create_main_dictionary(share->table_name, table, txn, &share->kc_info, compression_method); - } - else { + error = create_main_dictionary( + share->full_table_name(), + table, + txn, + &share->kc_info, + compression_method); + } else { error = create_secondary_dictionary( - share->table_name, - table, - &table_share->key_info[keynr], + share->full_table_name(), + table, + &table_share->key_info[keynr], txn, &share->kc_info, keynr, false, - compression_method - ); + compression_method); + } + if (error) { + goto cleanup; } - if (error) { goto cleanup; } cleanup: return error; @@ -8025,40 +8716,51 @@ int 
ha_tokudb::delete_all_rows_internal() { uint curr_num_DBs = 0; DB_TXN* txn = NULL; + // this should be enough to handle locking as the higher level MDL + // on this table should prevent any new analyze tasks. + share->cancel_background_jobs(); + error = txn_begin(db_env, 0, &txn, 0, ha_thd()); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key); for (uint i = 0; i < curr_num_DBs; i++) { error = share->key_file[i]->pre_acquire_fileops_lock( - share->key_file[i], - txn - ); - if (error) { goto cleanup; } + share->key_file[i], + txn); + if (error) { + goto cleanup; + } error = share->key_file[i]->pre_acquire_table_lock( - share->key_file[i], - txn - ); - if (error) { goto cleanup; } + share->key_file[i], + txn); + if (error) { + goto cleanup; + } } for (uint i = 0; i < curr_num_DBs; i++) { error = truncate_dictionary(i, txn); - if (error) { goto cleanup; } + if (error) { + goto cleanup; + } } + DEBUG_SYNC(ha_thd(), "tokudb_after_truncate_all_dictionarys"); + // zap the row count if (error == 0) { - share->rows = 0; - // update auto increment - share->last_auto_increment = 0; - // calling write_to_status directly because we need to use txn - write_to_status( - share->status_block, + share->set_row_count(0, false); + // update auto increment + share->last_auto_increment = 0; + // calling write_to_status directly because we need to use txn + write_to_status( + share->status_block, hatoku_max_ai, - &share->last_auto_increment, - sizeof(share->last_auto_increment), - txn - ); + &share->last_auto_increment, + sizeof(share->last_auto_increment), + txn); } share->try_table_lock = true; @@ -8066,16 +8768,19 @@ cleanup: if (txn) { if (error) { abort_txn(txn); - } - else { + } else { commit_txn(txn,0); } } - if (error == DB_LOCK_NOTGRANTED && ((tokudb_debug & TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0)) { - sql_print_error("Could not truncate table %s because another transaction has accessed the \ - table. To truncate the table, make sure no transactions touch the table.", - share->table_name); + if (TOKUDB_LIKELY(TOKUDB_DEBUG_FLAGS( + TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS) == 0) && + error == DB_LOCK_NOTGRANTED) { + sql_print_error( + "Could not truncate table %s because another transaction has " + "accessed the table. 
To truncate the table, make sure no " + "transactions touch the table.", + share->full_table_name()); } // // regardless of errors, need to reopen the DB's @@ -8085,21 +8790,18 @@ cleanup: if (share->key_file[i] == NULL) { if (i != primary_key) { r = open_secondary_dictionary( - &share->key_file[i], - &table_share->key_info[i], - share->table_name, - false, // - NULL - ); - assert(!r); - } - else { + &share->key_file[i], + &table_share->key_info[i], + share->full_table_name(), + false, + NULL); + assert_always(!r); + } else { r = open_main_dictionary( - share->table_name, - false, - NULL - ); - assert(!r); + share->full_table_name(), + false, + NULL); + assert_always(!r); } } } @@ -8111,7 +8813,7 @@ void ha_tokudb::set_loader_error(int err) { } void ha_tokudb::set_dup_value_for_pk(DBT* key) { - assert(!hidden_primary_key); + assert_always(!hidden_primary_key); unpack_key(table->record[0],key,primary_key); last_dup_key = primary_key; } @@ -8144,21 +8846,28 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) { return idx_cond_arg; } +void ha_tokudb::cancel_pushed_idx_cond() { + invalidate_icp(); + handler::cancel_pushed_idx_cond(); +} + void ha_tokudb::cleanup_txn(DB_TXN *txn) { if (transaction == txn && cursor) { int r = cursor->c_close(cursor); - assert(r == 0); + assert_always(r == 0); cursor = NULL; } } void ha_tokudb::add_to_trx_handler_list() { - tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton); + tokudb_trx_data* trx = + (tokudb_trx_data*)thd_get_ha_data(ha_thd(), tokudb_hton); trx->handlers = list_add(trx->handlers, &trx_handler_list); } void ha_tokudb::remove_from_trx_handler_list() { - tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton); + tokudb_trx_data* trx = + (tokudb_trx_data*)thd_get_ha_data(ha_thd(), tokudb_hton); trx->handlers = list_delete(trx->handlers, &trx_handler_list); } @@ -8190,7 +8899,7 @@ bool ha_tokudb::rpl_lookup_rows() { if (!in_rpl_delete_rows && !in_rpl_update_rows) return true; else - return THDVAR(ha_thd(), rpl_lookup_rows); + return tokudb::sysvars::rpl_lookup_rows(ha_thd()); } // table admin |
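The hunks above repeat one DDL pattern for truncate, drop, rename, and index changes: perform every dictionary operation inside a single DB_TXN, take the fileops/table locks on each dictionary up front, commit or roll back as a unit, and translate DB_LOCK_NOTGRANTED into a user-facing message. A condensed sketch, assuming this file's own helpers (txn_begin, commit_txn, abort_txn, sql_print_error); ddl_under_txn and the mutate callback are illustrative names, not part of the patch:

    static int ddl_under_txn(
        DB_ENV* env,
        THD* thd,
        DB** dbs,
        uint num_dbs,
        int (*mutate)(DB_TXN* txn, void* arg),
        void* arg) {

        DB_TXN* txn = NULL;
        int error = txn_begin(env, 0, &txn, 0, thd);
        if (error)
            return error;

        // lock every dictionary before touching any of them
        for (uint i = 0; i < num_dbs && error == 0; i++) {
            error = dbs[i]->pre_acquire_fileops_lock(dbs[i], txn);
            if (error == 0)
                error = dbs[i]->pre_acquire_table_lock(dbs[i], txn);
        }
        if (error == 0)
            error = mutate(txn, arg);  // e.g. truncate_dictionary() per index

        if (error)
            abort_txn(txn);
        else
            commit_txn(txn, 0);

        if (error == DB_LOCK_NOTGRANTED) {
            sql_print_error(
                "DDL failed: another transaction has accessed the table; "
                "retry once no transactions touch it.");
        }
        return error;
    }

The lock-everything-first step matters: acquiring fileops locks on all dictionaries before mutating any of them means a concurrent reader either blocks the whole DDL or sees none of it, which is why the error paths above can simply abort the transaction and reopen the DB handles.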