diff options
Diffstat (limited to 'storage/rocksdb/ha_rocksdb.cc')
-rw-r--r-- | storage/rocksdb/ha_rocksdb.cc | 855 |
1 files changed, 522 insertions, 333 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 59c6e1ab33b..d3157b0b800 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -59,6 +59,7 @@ #include "./rdb_i_s.h" #include "./rdb_index_merge.h" #include "./rdb_mutex_wrapper.h" +#include "./rdb_psi.h" #include "./rdb_threads.h" // Internal MySQL APIs not exposed in any header. @@ -302,7 +303,7 @@ static void rocksdb_set_pause_background_work( my_core::THD *const thd MY_ATTRIBUTE((__unused__)), struct st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)), void *const var_ptr MY_ATTRIBUTE((__unused__)), const void *const save) { - mysql_mutex_lock(&rdb_sysvars_mutex); + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); const bool pause_requested = *static_cast<const bool *>(save); if (rocksdb_pause_background_work != pause_requested) { if (pause_requested) { @@ -312,7 +313,7 @@ static void rocksdb_set_pause_background_work( } rocksdb_pause_background_work = pause_requested; } - mysql_mutex_unlock(&rdb_sysvars_mutex); + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } static void rocksdb_set_compaction_options(THD *thd, @@ -329,6 +330,10 @@ static void rocksdb_set_rate_limiter_bytes_per_sec(THD *thd, void *var_ptr, const void *save); +static void rocksdb_set_delayed_write_rate(THD *thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); + static void rdb_set_collation_exception_list(const char *exception_list); static void rocksdb_set_collation_exception_list(THD *thd, struct st_mysql_sys_var *var, @@ -350,14 +355,16 @@ static long long rocksdb_block_cache_size; /* Use unsigned long long instead of uint64_t because of MySQL compatibility */ static unsigned long long // NOLINT(runtime/int) rocksdb_rate_limiter_bytes_per_sec; +static unsigned long long rocksdb_delayed_write_rate; static unsigned long // NOLINT(runtime/int) - rocksdb_persistent_cache_size; + rocksdb_persistent_cache_size_mb; static uint64_t rocksdb_info_log_level; static char *rocksdb_wal_dir; static char *rocksdb_persistent_cache_path; static uint64_t rocksdb_index_type; static char rocksdb_background_sync; static uint32_t rocksdb_debug_optimizer_n_rows; +static my_bool rocksdb_force_compute_memtable_stats; static my_bool rocksdb_debug_optimizer_no_zero_cardinality; static uint32_t rocksdb_wal_recovery_mode; static uint32_t rocksdb_access_hint_on_compaction_start; @@ -413,11 +420,11 @@ static void rocksdb_set_rocksdb_info_log_level( const void *const save) { DBUG_ASSERT(save != nullptr); - mysql_mutex_lock(&rdb_sysvars_mutex); + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); rocksdb_info_log_level = *static_cast<const uint64_t *>(save); rocksdb_db_options.info_log->SetInfoLogLevel( static_cast<const rocksdb::InfoLogLevel>(rocksdb_info_log_level)); - mysql_mutex_unlock(&rdb_sysvars_mutex); + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } static const char *index_type_names[] = {"kBinarySearch", "kHashSearch", NullS}; @@ -478,6 +485,12 @@ static MYSQL_THDVAR_BOOL( "update and delete", nullptr, nullptr, FALSE); +static MYSQL_THDVAR_BOOL( + blind_delete_primary_key, PLUGIN_VAR_RQCMDARG, + "Deleting rows by primary key lookup, without reading rows (Blind Deletes)." + " Blind delete is disabled if the table has secondary key", + nullptr, nullptr, FALSE); + static MYSQL_THDVAR_STR( read_free_rpl_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, "List of tables that will use read-free replication on the slave " @@ -561,6 +574,13 @@ static MYSQL_SYSVAR_ULONGLONG( nullptr, rocksdb_set_rate_limiter_bytes_per_sec, /* default */ 0L, /* min */ 0L, /* max */ MAX_RATE_LIMITER_BYTES_PER_SEC, 0); +static MYSQL_SYSVAR_ULONGLONG(delayed_write_rate, rocksdb_delayed_write_rate, + PLUGIN_VAR_RQCMDARG, + "DBOptions::delayed_write_rate", nullptr, + rocksdb_set_delayed_write_rate, + rocksdb_db_options.delayed_write_rate, 0, + UINT64_MAX, 0); + static MYSQL_SYSVAR_ENUM( info_log_level, rocksdb_info_log_level, PLUGIN_VAR_RQCMDARG, "Filter level for info logs to be written mysqld error log. " @@ -579,8 +599,9 @@ static MYSQL_THDVAR_INT( static MYSQL_SYSVAR_UINT( wal_recovery_mode, rocksdb_wal_recovery_mode, PLUGIN_VAR_RQCMDARG, - "DBOptions::wal_recovery_mode for RocksDB", nullptr, nullptr, - /* default */ (uint)rocksdb::WALRecoveryMode::kPointInTimeRecovery, + "DBOptions::wal_recovery_mode for RocksDB. Default is kAbsoluteConsistency", + nullptr, nullptr, + /* default */ (uint)rocksdb::WALRecoveryMode::kAbsoluteConsistency, /* min */ (uint)rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords, /* max */ (uint)rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords, 0); @@ -638,13 +659,6 @@ static MYSQL_SYSVAR_ULONG(max_total_wal_size, /* min */ 0L, /* max */ LONG_MAX, 0); static MYSQL_SYSVAR_BOOL( - disabledatasync, - *reinterpret_cast<my_bool *>(&rocksdb_db_options.disableDataSync), - PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, - "DBOptions::disableDataSync for RocksDB", nullptr, nullptr, - rocksdb_db_options.disableDataSync); - -static MYSQL_SYSVAR_BOOL( use_fsync, *reinterpret_cast<my_bool *>(&rocksdb_db_options.use_fsync), PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "DBOptions::use_fsync for RocksDB", nullptr, nullptr, @@ -662,10 +676,10 @@ static MYSQL_SYSVAR_STR( nullptr, ""); static MYSQL_SYSVAR_ULONG( - persistent_cache_size, rocksdb_persistent_cache_size, + persistent_cache_size_mb, rocksdb_persistent_cache_size_mb, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, - "Size of cache for BlockBasedTableOptions::persistent_cache for RocksDB", - nullptr, nullptr, rocksdb_persistent_cache_size, + "Size of cache in MB for BlockBasedTableOptions::persistent_cache " + "for RocksDB", nullptr, nullptr, rocksdb_persistent_cache_size_mb, /* min */ 0L, /* max */ ULONG_MAX, 0); static MYSQL_SYSVAR_ULONG( @@ -946,9 +960,11 @@ static MYSQL_SYSVAR_BOOL(background_sync, rocksdb_background_sync, "turns on background syncs for RocksDB", nullptr, nullptr, FALSE); -static MYSQL_THDVAR_BOOL(write_sync, PLUGIN_VAR_RQCMDARG, - "WriteOptions::sync for RocksDB", nullptr, nullptr, - rocksdb::WriteOptions().sync); +static MYSQL_THDVAR_UINT(flush_log_at_trx_commit, PLUGIN_VAR_RQCMDARG, + "Sync on transaction commit. Similar to " + "innodb_flush_log_at_trx_commit. 1: sync on commit, " + "0,2: not sync on commit", + nullptr, nullptr, 1, 0, 2, 0); static MYSQL_THDVAR_BOOL(write_disable_wal, PLUGIN_VAR_RQCMDARG, "WriteOptions::disableWAL for RocksDB", nullptr, @@ -986,6 +1002,12 @@ static MYSQL_SYSVAR_UINT( "Test only to override rocksdb estimates of table size in a memtable", nullptr, nullptr, 0, /* min */ 0, /* max */ INT_MAX, 0); +static MYSQL_SYSVAR_BOOL(force_compute_memtable_stats, + rocksdb_force_compute_memtable_stats, + PLUGIN_VAR_RQCMDARG, + "Force to always compute memtable stats", + nullptr, nullptr, TRUE); + static MYSQL_SYSVAR_BOOL( debug_optimizer_no_zero_cardinality, rocksdb_debug_optimizer_no_zero_cardinality, PLUGIN_VAR_RQCMDARG, @@ -1085,6 +1107,7 @@ static MYSQL_SYSVAR_BOOL( "Counting SingleDelete as rocksdb_compaction_sequential_deletes", nullptr, nullptr, rocksdb_compaction_sequential_deletes_count_sd); + static MYSQL_SYSVAR_BOOL( print_snapshot_conflict_queries, rocksdb_print_snapshot_conflict_queries, PLUGIN_VAR_RQCMDARG, @@ -1104,6 +1127,11 @@ static MYSQL_THDVAR_BOOL(verify_row_debug_checksums, PLUGIN_VAR_RQCMDARG, "Verify checksums when reading index/table records", nullptr, nullptr, false /* default value */); +static MYSQL_THDVAR_BOOL(master_skip_tx_api, PLUGIN_VAR_RQCMDARG, + "Skipping holding any lock on row access. " + "Not effective on slave.", + nullptr, nullptr, false); + static MYSQL_SYSVAR_UINT( validate_tables, rocksdb_validate_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, @@ -1154,6 +1182,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(skip_unique_check_tables), MYSQL_SYSVAR(trace_sst_api), MYSQL_SYSVAR(commit_in_the_middle), + MYSQL_SYSVAR(blind_delete_primary_key), MYSQL_SYSVAR(read_free_rpl_tables), MYSQL_SYSVAR(bulk_load_size), MYSQL_SYSVAR(merge_buf_size), @@ -1167,14 +1196,14 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(error_if_exists), MYSQL_SYSVAR(paranoid_checks), MYSQL_SYSVAR(rate_limiter_bytes_per_sec), + MYSQL_SYSVAR(delayed_write_rate), MYSQL_SYSVAR(info_log_level), MYSQL_SYSVAR(max_open_files), MYSQL_SYSVAR(max_total_wal_size), - MYSQL_SYSVAR(disabledatasync), MYSQL_SYSVAR(use_fsync), MYSQL_SYSVAR(wal_dir), MYSQL_SYSVAR(persistent_cache_path), - MYSQL_SYSVAR(persistent_cache_size), + MYSQL_SYSVAR(persistent_cache_size_mb), MYSQL_SYSVAR(delete_obsolete_files_period_micros), MYSQL_SYSVAR(base_background_compactions), MYSQL_SYSVAR(max_background_compactions), @@ -1224,7 +1253,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(background_sync), - MYSQL_SYSVAR(write_sync), + MYSQL_SYSVAR(flush_log_at_trx_commit), MYSQL_SYSVAR(write_disable_wal), MYSQL_SYSVAR(write_ignore_missing_column_families), @@ -1234,6 +1263,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(records_in_range), MYSQL_SYSVAR(force_index_records_in_range), MYSQL_SYSVAR(debug_optimizer_n_rows), + MYSQL_SYSVAR(force_compute_memtable_stats), MYSQL_SYSVAR(debug_optimizer_no_zero_cardinality), MYSQL_SYSVAR(compact_cf), @@ -1259,6 +1289,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(checksums_pct), MYSQL_SYSVAR(store_row_debug_checksums), MYSQL_SYSVAR(verify_row_debug_checksums), + MYSQL_SYSVAR(master_skip_tx_api), MYSQL_SYSVAR(validate_tables), MYSQL_SYSVAR(table_stats_sampling_pct), @@ -1268,7 +1299,7 @@ static rocksdb::WriteOptions rdb_get_rocksdb_write_options(my_core::THD *const thd) { rocksdb::WriteOptions opt; - opt.sync = THDVAR(thd, write_sync); + opt.sync = THDVAR(thd, flush_log_at_trx_commit) == 1; opt.disableWAL = THDVAR(thd, write_disable_wal); opt.ignore_missing_column_families = THDVAR(thd, write_ignore_missing_column_families); @@ -1292,85 +1323,6 @@ Rdb_open_tables_map::get_hash_key(Rdb_table_handler *const table_handler, } /* - The following is needed as an argument for mysql_stage_register, - irrespectively of whether we're compiling with P_S or not. -*/ -PSI_stage_info stage_waiting_on_row_lock = {0, "Waiting for row lock", 0}; - -#ifdef HAVE_PSI_INTERFACE -static PSI_thread_key rdb_background_psi_thread_key; -static PSI_thread_key rdb_drop_idx_psi_thread_key; - -static PSI_stage_info *all_rocksdb_stages[] = {&stage_waiting_on_row_lock}; - -static my_core::PSI_mutex_key rdb_psi_open_tbls_mutex_key, - rdb_signal_bg_psi_mutex_key, rdb_signal_drop_idx_psi_mutex_key, - rdb_collation_data_mutex_key, rdb_mem_cmp_space_mutex_key, - key_mutex_tx_list, rdb_sysvars_psi_mutex_key; - -static PSI_mutex_info all_rocksdb_mutexes[] = { - {&rdb_psi_open_tbls_mutex_key, "open tables", PSI_FLAG_GLOBAL}, - {&rdb_signal_bg_psi_mutex_key, "stop background", PSI_FLAG_GLOBAL}, - {&rdb_signal_drop_idx_psi_mutex_key, "signal drop index", PSI_FLAG_GLOBAL}, - {&rdb_collation_data_mutex_key, "collation data init", PSI_FLAG_GLOBAL}, - {&rdb_mem_cmp_space_mutex_key, "collation space char data init", - PSI_FLAG_GLOBAL}, - {&key_mutex_tx_list, "tx_list", PSI_FLAG_GLOBAL}, - {&rdb_sysvars_psi_mutex_key, "setting sysvar", PSI_FLAG_GLOBAL}, -}; - -static PSI_rwlock_key key_rwlock_collation_exception_list; -static PSI_rwlock_key key_rwlock_read_free_rpl_tables; -static PSI_rwlock_key key_rwlock_skip_unique_check_tables; - -static PSI_rwlock_info all_rocksdb_rwlocks[] = { - {&key_rwlock_collation_exception_list, "collation_exception_list", - PSI_FLAG_GLOBAL}, - {&key_rwlock_read_free_rpl_tables, "read_free_rpl_tables", PSI_FLAG_GLOBAL}, - {&key_rwlock_skip_unique_check_tables, "skip_unique_check_tables", - PSI_FLAG_GLOBAL}, -}; - -PSI_cond_key rdb_signal_bg_psi_cond_key, rdb_signal_drop_idx_psi_cond_key; - -static PSI_cond_info all_rocksdb_conds[] = { - {&rdb_signal_bg_psi_cond_key, "cond signal background", PSI_FLAG_GLOBAL}, - {&rdb_signal_drop_idx_psi_cond_key, "cond signal drop index", - PSI_FLAG_GLOBAL}, -}; - -static PSI_thread_info all_rocksdb_threads[] = { - {&rdb_background_psi_thread_key, "background", PSI_FLAG_GLOBAL}, - {&rdb_drop_idx_psi_thread_key, "drop index", PSI_FLAG_GLOBAL}, -}; - -static void init_rocksdb_psi_keys() { - const char *const category = "rocksdb"; - int count; - - if (PSI_server == nullptr) - return; - - count = array_elements(all_rocksdb_mutexes); - PSI_server->register_mutex(category, all_rocksdb_mutexes, count); - - count = array_elements(all_rocksdb_rwlocks); - PSI_server->register_rwlock(category, all_rocksdb_rwlocks, count); - - count = array_elements(all_rocksdb_conds); - // TODO Disabling PFS for conditions due to the bug - // https://github.com/MySQLOnRocksDB/mysql-5.6/issues/92 - // PSI_server->register_cond(category, all_rocksdb_conds, count); - - count = array_elements(all_rocksdb_stages); - mysql_stage_register(category, all_rocksdb_stages, count); - - count = array_elements(all_rocksdb_threads); - mysql_thread_register(category, all_rocksdb_threads, count); -} -#endif - -/* Drop index thread's control */ @@ -1503,10 +1455,12 @@ public: static void walk_tx_list(Rdb_tx_list_walker *walker) { DBUG_ASSERT(walker != nullptr); - mysql_mutex_lock(&s_tx_list_mutex); + RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex); + for (auto it : s_tx_list) walker->process_tran(it); - mysql_mutex_unlock(&s_tx_list_mutex); + + RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex); } int set_status_error(THD *const thd, const rocksdb::Status &s, @@ -1850,15 +1804,15 @@ public: explicit Rdb_transaction(THD *const thd) : m_thd(thd), m_tbl_io_perf(nullptr) { - mysql_mutex_lock(&s_tx_list_mutex); + RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex); s_tx_list.insert(this); - mysql_mutex_unlock(&s_tx_list_mutex); + RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex); } virtual ~Rdb_transaction() { - mysql_mutex_lock(&s_tx_list_mutex); + RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex); s_tx_list.erase(this); - mysql_mutex_unlock(&s_tx_list_mutex); + RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex); } }; @@ -2075,7 +2029,7 @@ public: tx_opts.lock_timeout = rdb_convert_sec_to_ms(m_timeout_sec); tx_opts.deadlock_detect = THDVAR(m_thd, deadlock_detect); - write_opts.sync = THDVAR(m_thd, write_sync); + write_opts.sync = THDVAR(m_thd, flush_log_at_trx_commit) == 1; write_opts.disableWAL = THDVAR(m_thd, write_disable_wal); write_opts.ignore_missing_column_families = THDVAR(m_thd, write_ignore_missing_column_families); @@ -2288,7 +2242,7 @@ public: void start_tx() override { reset(); - write_opts.sync = THDVAR(m_thd, write_sync); + write_opts.sync = THDVAR(m_thd, flush_log_at_trx_commit) == 1; write_opts.disableWAL = THDVAR(m_thd, write_disable_wal); write_opts.ignore_missing_column_families = THDVAR(m_thd, write_ignore_missing_column_families); @@ -2367,10 +2321,15 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) { Rdb_transaction *&tx = get_tx_from_thd(thd); // TODO: this is called too many times.. O(#rows) if (tx == nullptr) { - if (rpl_skip_tx_api && thd->rli_slave) + if ((rpl_skip_tx_api && thd->rli_slave) || + (THDVAR(thd, master_skip_tx_api) && !thd->rli_slave)) + { tx = new Rdb_writebatch_impl(thd); + } else + { tx = new Rdb_transaction_impl(thd); + } tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks)); tx->start_tx(); } else { @@ -2474,7 +2433,7 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd, return HA_EXIT_FAILURE; } if (thd->durability_property == HA_IGNORE_DURABILITY && - THDVAR(thd, write_sync)) { + (THDVAR(thd, flush_log_at_trx_commit) == 1)) { /** we set the log sequence as '1' just to trigger hton->flush_logs */ @@ -3232,9 +3191,7 @@ static int rocksdb_init_func(void *const p) { // Validate the assumption about the size of ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN. static_assert(sizeof(longlong) == 8, "Assuming that longlong is 8 bytes."); -#ifdef HAVE_PSI_INTERFACE init_rocksdb_psi_keys(); -#endif rocksdb_hton = (handlerton *)p; mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &rdb_open_tables.m_mutex, @@ -3298,6 +3255,8 @@ static int rocksdb_init_func(void *const p) { rocksdb_db_options.rate_limiter = rocksdb_rate_limiter; } + rocksdb_db_options.delayed_write_rate = rocksdb_delayed_write_rate; + std::shared_ptr<Rdb_logger> myrocks_logger = std::make_shared<Rdb_logger>(); rocksdb::Status s = rocksdb::CreateLoggerFromOptions( rocksdb_datadir, rocksdb_db_options, &rocksdb_db_options.info_log); @@ -3383,24 +3342,25 @@ static int rocksdb_init_func(void *const p) { rocksdb_set_compaction_options(nullptr, nullptr, nullptr, nullptr); - mysql_mutex_lock(&rdb_sysvars_mutex); + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); DBUG_ASSERT(rocksdb_table_stats_sampling_pct <= RDB_TBL_STATS_SAMPLE_PCT_MAX); properties_collector_factory->SetTableStatsSamplingPct( rocksdb_table_stats_sampling_pct); - mysql_mutex_unlock(&rdb_sysvars_mutex); + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } - if (rocksdb_persistent_cache_size > 0) { + if (rocksdb_persistent_cache_size_mb > 0) { std::shared_ptr<rocksdb::PersistentCache> pcache; + uint64_t cache_size_bytes= rocksdb_persistent_cache_size_mb * 1024 * 1024; rocksdb::NewPersistentCache( rocksdb::Env::Default(), std::string(rocksdb_persistent_cache_path), - rocksdb_persistent_cache_size, myrocks_logger, true, &pcache); + cache_size_bytes, myrocks_logger, true, &pcache); rocksdb_tbl_options.persistent_cache = pcache; } else if (strlen(rocksdb_persistent_cache_path)) { - sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size"); + sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size_mb"); DBUG_RETURN(1); } @@ -3581,7 +3541,7 @@ static int rocksdb_done_func(void *const p) { // signal the drop index thread to stop rdb_drop_idx_thread.signal(true); - // Flush all memtables for not lose data, even if WAL is disabled. + // Flush all memtables for not losing data, even if WAL is disabled. rocksdb_flush_all_memtables(); // Stop all rocksdb background work @@ -3652,6 +3612,16 @@ static int rocksdb_done_func(void *const p) { DBUG_RETURN(error); } +static inline void rocksdb_smart_seek(bool seek_backward, + rocksdb::Iterator *const iter, + const rocksdb::Slice &key_slice) { + if (seek_backward) { + iter->SeekForPrev(key_slice); + } else { + iter->Seek(key_slice); + } +} + /** @brief Example of simple lock controls. The "table_handler" it creates is a @@ -3670,7 +3640,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) { length = (uint)strlen(table_name); // First, look up the table in the hash map. - mysql_mutex_lock(&m_mutex); + RDB_MUTEX_LOCK_CHECK(m_mutex); if (!(table_handler = reinterpret_cast<Rdb_table_handler *>(my_hash_search( &m_hash, reinterpret_cast<const uchar *>(table_name), length)))) { // Since we did not find it in the hash map, attempt to create and add it @@ -3679,7 +3649,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) { MYF(MY_WME | MY_ZEROFILL), &table_handler, sizeof(*table_handler), &tmp_name, length + 1, NullS)))) { // Allocating a new Rdb_table_handler and a new table name failed. - mysql_mutex_unlock(&m_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_mutex); return nullptr; } @@ -3690,7 +3660,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) { if (my_hash_insert(&m_hash, reinterpret_cast<uchar *>(table_handler))) { // Inserting into the hash map failed. - mysql_mutex_unlock(&m_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_mutex); my_free(table_handler); return nullptr; } @@ -3701,7 +3671,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) { DBUG_ASSERT(table_handler->m_ref_count >= 0); table_handler->m_ref_count++; - mysql_mutex_unlock(&m_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_mutex); return table_handler; } @@ -3715,7 +3685,7 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const { const Rdb_table_handler *table_handler; std::vector<std::string> names; - mysql_mutex_lock(&m_mutex); + RDB_MUTEX_LOCK_CHECK(m_mutex); for (i = 0; (table_handler = reinterpret_cast<const Rdb_table_handler *>( my_hash_const_element(&m_hash, i))); i++) { @@ -3723,7 +3693,7 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const { names.push_back(table_handler->m_table_name); } DBUG_ASSERT(i == m_hash.records); - mysql_mutex_unlock(&m_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_mutex); return names; } @@ -3872,7 +3842,7 @@ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) { void Rdb_open_tables_map::release_table_handler( Rdb_table_handler *const table_handler) { - mysql_mutex_lock(&m_mutex); + RDB_MUTEX_LOCK_CHECK(m_mutex); DBUG_ASSERT(table_handler != nullptr); DBUG_ASSERT(table_handler->m_ref_count > 0); @@ -3885,7 +3855,7 @@ void Rdb_open_tables_map::release_table_handler( my_free(table_handler); } - mysql_mutex_unlock(&m_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_mutex); } static handler *rocksdb_create_handler(my_core::handlerton *const hton, @@ -4137,6 +4107,92 @@ int ha_rocksdb::convert_record_from_storage_format( return convert_record_from_storage_format(key, &retrieved_rec_slice, buf); } +int ha_rocksdb::convert_blob_from_storage_format( + my_core::Field_blob *const blob, + Rdb_string_reader *const reader, + bool decode) +{ + /* Get the number of bytes needed to store length*/ + const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; + + const char *data_len_str; + if (!(data_len_str = reader->read(length_bytes))) { + return HA_ERR_INTERNAL_ERROR; + } + + memcpy(blob->ptr, data_len_str, length_bytes); + + const uint32 data_len = blob->get_length( + reinterpret_cast<const uchar*>(data_len_str), length_bytes, + table->s->db_low_byte_first); + const char *blob_ptr; + if (!(blob_ptr = reader->read(data_len))) { + return HA_ERR_INTERNAL_ERROR; + } + + if (decode) { + // set 8-byte pointer to 0, like innodb does (relevant for 32-bit + // platforms) + memset(blob->ptr + length_bytes, 0, 8); + memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); + } + + return HA_EXIT_SUCCESS; +} + +int ha_rocksdb::convert_varchar_from_storage_format( + my_core::Field_varstring *const field_var, + Rdb_string_reader *const reader, + bool decode) +{ + const char *data_len_str; + if (!(data_len_str = reader->read(field_var->length_bytes))) + return HA_ERR_INTERNAL_ERROR; + + uint data_len; + /* field_var->length_bytes is 1 or 2 */ + if (field_var->length_bytes == 1) { + data_len = (uchar)data_len_str[0]; + } else { + DBUG_ASSERT(field_var->length_bytes == 2); + data_len = uint2korr(data_len_str); + } + + if (data_len > field_var->field_length) { + /* The data on disk is longer than table DDL allows? */ + return HA_ERR_INTERNAL_ERROR; + } + + if (!reader->read(data_len)) { + return HA_ERR_INTERNAL_ERROR; + } + + if (decode) { + memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len); + } + + return HA_EXIT_SUCCESS; +} + +int ha_rocksdb::convert_field_from_storage_format( + my_core::Field *const field, + Rdb_string_reader *const reader, + bool decode, + uint len) +{ + const char *data_bytes; + if (len > 0) { + if ((data_bytes = reader->read(len)) == nullptr) { + return HA_ERR_INTERNAL_ERROR; + } + + if (decode) + memcpy(field->ptr, data_bytes, len); + } + + return HA_EXIT_SUCCESS; +} + /* @brief Unpack the record in this->m_retrieved_record and this->m_last_rowkey from @@ -4168,7 +4224,6 @@ int ha_rocksdb::convert_record_from_storage_format( DBUG_ASSERT(buf != nullptr); Rdb_string_reader reader(value); - const my_ptrdiff_t ptr_diff = buf - table->record[0]; /* Decode PK fields from the key @@ -4208,6 +4263,7 @@ int ha_rocksdb::convert_record_from_storage_format( return HA_ERR_INTERNAL_ERROR; } + int err = HA_EXIT_SUCCESS; for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) { const Rdb_field_encoder *const field_dec = it->m_field_enc; const bool decode = it->m_decode; @@ -4221,89 +4277,49 @@ int ha_rocksdb::convert_record_from_storage_format( if (it->m_skip && !reader.read(it->m_skip)) return HA_ERR_INTERNAL_ERROR; + uint field_offset = field->ptr - table->record[0]; + uint null_offset = field->null_offset(); + bool maybe_null = field->real_maybe_null(); + field->move_field(buf + field_offset, + maybe_null ? buf + null_offset : nullptr, + field->null_bit); + // WARNING! - Don't return before restoring field->ptr and field->null_ptr! + if (isNull) { if (decode) { /* This sets the NULL-bit of this record */ - field->set_null(ptr_diff); + field->set_null(); /* Besides that, set the field value to default value. CHECKSUM TABLE depends on this. */ - uint field_offset = field->ptr - table->record[0]; - memcpy(buf + field_offset, table->s->default_values + field_offset, + memcpy(field->ptr, table->s->default_values + field_offset, field->pack_length()); } - continue; } else { - if (decode) - field->set_notnull(ptr_diff); - } - - if (field_dec->m_field_type == MYSQL_TYPE_BLOB) { - my_core::Field_blob *const blob = (my_core::Field_blob *)field; - /* Get the number of bytes needed to store length*/ - const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; - - blob->move_field_offset(ptr_diff); - - const char *data_len_str; - if (!(data_len_str = reader.read(length_bytes))) { - blob->move_field_offset(-ptr_diff); - return HA_ERR_INTERNAL_ERROR; - } - - memcpy(blob->ptr, data_len_str, length_bytes); - - const uint32 data_len = blob->get_length( - (uchar *)data_len_str, length_bytes, table->s->db_low_byte_first); - const char *blob_ptr; - if (!(blob_ptr = reader.read(data_len))) { - blob->move_field_offset(-ptr_diff); - return HA_ERR_INTERNAL_ERROR; - } - if (decode) { - // set 8-byte pointer to 0, like innodb does (relevant for 32-bit - // platforms) - memset(blob->ptr + length_bytes, 0, 8); - memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); - blob->move_field_offset(-ptr_diff); + field->set_notnull(); } - } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { - Field_varstring *const field_var = (Field_varstring *)field; - const char *data_len_str; - if (!(data_len_str = reader.read(field_var->length_bytes))) - return HA_ERR_INTERNAL_ERROR; - uint data_len; - /* field_var->length_bytes is 1 or 2 */ - if (field_var->length_bytes == 1) { - data_len = (uchar)data_len_str[0]; + if (field_dec->m_field_type == MYSQL_TYPE_BLOB) { + err = convert_blob_from_storage_format( + (my_core::Field_blob *) field, &reader, decode); + } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { + err = convert_varchar_from_storage_format( + (my_core::Field_varstring *) field, &reader, decode); } else { - DBUG_ASSERT(field_var->length_bytes == 2); - data_len = uint2korr(data_len_str); - } - if (data_len > field->field_length) { - /* The data on disk is longer than table DDL allows? */ - return HA_ERR_INTERNAL_ERROR; + err = convert_field_from_storage_format( + field, &reader, decode, field_dec->m_pack_length_in_rec); } - if (!reader.read(data_len)) - return HA_ERR_INTERNAL_ERROR; + } - if (decode) { - memcpy(field_var->ptr + ptr_diff, data_len_str, - field_var->length_bytes + data_len); - } - } else { - const char *data_bytes; - const uint len = field_dec->m_pack_length_in_rec; - if (len > 0) { - if ((data_bytes = reader.read(len)) == nullptr) { - return HA_ERR_INTERNAL_ERROR; - } - if (decode) - memcpy(field->ptr + ptr_diff, data_bytes, len); - } + // Restore field->ptr and field->null_ptr + field->move_field(table->record[0] + field_offset, + maybe_null ? table->record[0] + null_offset : nullptr, + field->null_bit); + + if (err != HA_EXIT_SUCCESS) { + return err; } } @@ -4723,6 +4739,14 @@ bool ha_rocksdb::get_error_message(const int error, String *const buf) { buf->append(tx->m_detailed_error); temp_error = true; break; + case HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED: + buf->append("Specifying DATA DIRECTORY for an individual table is not " + "supported."); + break; + case HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED: + buf->append("Specifying INDEX DIRECTORY for an individual table is not " + "supported."); + break; default: // We can be called with the values which are < HA_ERR_FIRST because most // MySQL internal functions will just return HA_EXIT_FAILURE in case of @@ -4791,7 +4815,7 @@ int ha_rocksdb::create_key_defs( */ if (create_cfs(table_arg, tbl_def_arg, &cfs)) { DBUG_RETURN(HA_EXIT_FAILURE); - }; + } if (!old_tbl_def_arg) { /* @@ -4845,6 +4869,7 @@ int ha_rocksdb::create_cfs( DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(table_arg->s != nullptr); + DBUG_ASSERT(tbl_def_arg != nullptr); char tablename_sys[NAME_LEN + 1]; @@ -4884,35 +4909,53 @@ int ha_rocksdb::create_cfs( } } - /* - index comment has Column Family name. If there was no comment, we get - NULL, and it means use the default column family. - */ - const char *const comment = get_key_comment(i, table_arg, tbl_def_arg); + // Internal consistency check to make sure that data in TABLE and + // Rdb_tbl_def structures matches. Either both are missing or both are + // specified. Yes, this is critical enough to make it into SHIP_ASSERT. + SHIP_ASSERT(!table_arg->part_info == tbl_def_arg->base_partition().empty()); + + // Generate the name for the column family to use. + bool per_part_match_found = false; + std::string cf_name = generate_cf_name(i, table_arg, tbl_def_arg, + &per_part_match_found); + const char *const key_name = get_key_name(i, table_arg, tbl_def_arg); - if (looks_like_per_index_cf_typo(comment)) { + if (looks_like_per_index_cf_typo(cf_name.c_str())) { my_error(ER_NOT_SUPPORTED_YET, MYF(0), - "column family name looks like a typo of $per_index_cf"); + "column family name looks like a typo of $per_index_cf."); DBUG_RETURN(HA_EXIT_FAILURE); } - /* Prevent create from using the system column family */ - if (comment && strcmp(DEFAULT_SYSTEM_CF_NAME, comment) == 0) { + + // Prevent create from using the system column family. + if (!cf_name.empty() && strcmp(DEFAULT_SYSTEM_CF_NAME, + cf_name.c_str()) == 0) { my_error(ER_WRONG_ARGUMENTS, MYF(0), - "column family not valid for storing index data"); + "column family not valid for storing index data."); DBUG_RETURN(HA_EXIT_FAILURE); } + bool is_auto_cf_flag; + + // Here's how `get_or_create_cf` will use the input parameters: + // + // `cf_name` - will be used as a CF name. + // `key_name` - will be only used in case of "$per_index_cf". cf_handle = - cf_manager.get_or_create_cf(rdb, comment, tbl_def_arg->full_tablename(), - key_name, &is_auto_cf_flag); - if (!cf_handle) + cf_manager.get_or_create_cf(rdb, cf_name.c_str(), + tbl_def_arg->full_tablename(), key_name, + &is_auto_cf_flag); + + if (!cf_handle) { DBUG_RETURN(HA_EXIT_FAILURE); + } auto &cf = (*cfs)[i]; + cf.cf_handle = cf_handle; - cf.is_reverse_cf = Rdb_cf_manager::is_cf_name_reverse(comment); + cf.is_reverse_cf = Rdb_cf_manager::is_cf_name_reverse(cf_name.c_str()); cf.is_auto_cf = is_auto_cf_flag; + cf.is_per_partition_cf = per_part_match_found; } DBUG_RETURN(HA_EXIT_SUCCESS); @@ -4982,7 +5025,8 @@ int ha_rocksdb::create_inplace_key_defs( new_key_descr[i] = std::make_shared<Rdb_key_def>( okd.get_index_number(), i, okd.get_cf(), index_dict_version, index_type, kv_version, okd.m_is_reverse_cf, okd.m_is_auto_cf, - okd.m_name.c_str(), dict_manager.get_stats(gl_index_id)); + okd.m_is_per_partition_cf, okd.m_name.c_str(), + dict_manager.get_stats(gl_index_id)); } else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i], cfs[i])) { DBUG_RETURN(HA_EXIT_FAILURE); @@ -5118,7 +5162,8 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i, const char *const key_name = get_key_name(i, table_arg, m_tbl_def); *new_key_def = std::make_shared<Rdb_key_def>( index_id, i, cf_info.cf_handle, index_dict_version, index_type, - kv_version, cf_info.is_reverse_cf, cf_info.is_auto_cf, key_name); + kv_version, cf_info.is_reverse_cf, cf_info.is_auto_cf, + cf_info.is_per_partition_cf, key_name); DBUG_RETURN(HA_EXIT_SUCCESS); } @@ -5271,6 +5316,22 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg, HA_CREATE_INFO *const create_info) { DBUG_ENTER_FUNC(); + DBUG_ASSERT(table_arg != nullptr); + DBUG_ASSERT(create_info != nullptr); + + if (create_info->data_file_name) { + // DATA DIRECTORY is used to create tables under a specific location + // outside the MySQL data directory. We don't support this for MyRocks. + // The `rocksdb_datadir` setting should be used to configure RocksDB data + // directory. + DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED); + } + + if (create_info->index_file_name) { + // Similar check for INDEX DIRECTORY as well. + DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED); + } + int res; std::string str; @@ -5441,15 +5502,7 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, index_tuple= lookup_tuple. lookup_tuple may be a prefix of the index. */ - if (kd.m_is_reverse_cf) { - if (!full_key_match) { - if (!iter->Valid()) - iter->SeekToLast(); - else - iter->Prev(); - } - } - + rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice); if (!iter->Valid() || !kd.value_matches_prefix(iter->key(), key_slice)) { /* Got a record that is not equal to the lookup value, or even a record @@ -5467,24 +5520,20 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd, We are looking for record with the biggest t.key such that t.key < lookup_tuple. */ - if (kd.m_is_reverse_cf) { - if (m_scan_it->Valid() && full_key_match && - kd.value_matches_prefix(m_scan_it->key(), key_slice)) { - /* We are using full key and we've hit an exact match */ + rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice); + if (m_scan_it->Valid() && full_key_match && + kd.value_matches_prefix(m_scan_it->key(), key_slice)) { + /* We are using full key and we've hit an exact match */ + if (kd.m_is_reverse_cf) { m_scan_it->Next(); - } - } else { - if (m_scan_it->Valid()) + } else { m_scan_it->Prev(); - else - m_scan_it->SeekToLast(); + } } - return m_scan_it->Valid() ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND; } int ha_rocksdb::read_after_key(const Rdb_key_def &kd, - const bool &full_key_match, const rocksdb::Slice &key_slice) { /* We are looking for the first record such that @@ -5494,22 +5543,7 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd, with HA_READ_AFTER_KEY, $GT = '>', with HA_READ_KEY_OR_NEXT, $GT = '>=' */ - if (kd.m_is_reverse_cf) { - if (!m_scan_it->Valid()) { - m_scan_it->SeekToLast(); - } else { - /* - We should step back - - when not using full extended key - - when using full extended key and when we've got an exact match - */ - if (!full_key_match || - !kd.value_matches_prefix(m_scan_it->key(), key_slice)) { - m_scan_it->Prev(); - } - } - } - + rocksdb_smart_seek(kd.m_is_reverse_cf, m_scan_it, key_slice); return m_scan_it->Valid() ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND; } @@ -5538,7 +5572,7 @@ int ha_rocksdb::position_to_correct_key(const Rdb_key_def &kd, break; case HA_READ_AFTER_KEY: case HA_READ_KEY_OR_NEXT: - rc = read_after_key(kd, full_key_match, key_slice); + rc = read_after_key(kd, key_slice); if (rc == 0 && !kd.covers_key(m_scan_it->key())) { /* The record we've got is not from this index */ rc = HA_ERR_KEY_NOT_FOUND; @@ -5905,8 +5939,10 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, */ const uint size = kd.pack_index_tuple(table, m_pack_buffer, m_pk_packed_tuple, key, keypart_map); - rc = get_row_by_rowid(buf, m_pk_packed_tuple, size); - if (!rc) { + bool skip_lookup = is_blind_delete_enabled(); + rc = get_row_by_rowid(buf, m_pk_packed_tuple, size, + skip_lookup); + if (!rc && !skip_lookup) { stats.rows_read++; stats.rows_index_first++; update_row_stats(ROWS_READ); @@ -6339,6 +6375,17 @@ rocksdb::Status ha_rocksdb::get_for_update( return s; } +bool ha_rocksdb::is_blind_delete_enabled() +{ + THD *thd = ha_thd(); + return (THDVAR(thd, blind_delete_primary_key) && + thd->lex->sql_command == SQLCOM_DELETE && + thd->lex->table_count == 1 && + table->s->keys == 1 && + !has_hidden_pk(table) && + !thd->rli_slave); +} + /* Given a rowid (i.e. packed PK) as a parameter, get the record. @@ -6348,7 +6395,8 @@ rocksdb::Status ha_rocksdb::get_for_update( */ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, - const uint rowid_size) { + const uint rowid_size, + const bool skip_lookup) { DBUG_ENTER_FUNC(); DBUG_ASSERT(buf != nullptr); @@ -6374,6 +6422,16 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, bool found; rocksdb::Status s; + /* Pretend row found without looking up */ + if (skip_lookup) + { + stats.rows_deleted_blind++; + update_row_stats(ROWS_DELETED_BLIND); + m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin); + table->status = 0; + DBUG_RETURN(0); + } + if (m_lock_rows == RDB_LOCK_NONE) { tx->acquire_snapshot(true); s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record); @@ -6554,6 +6612,7 @@ int ha_rocksdb::index_first_intern(uchar *const buf) { for (;;) { setup_scan_iterator(kd, &index_key, false, !kd.m_is_reverse_cf, Rdb_key_def::INDEX_NUMBER_SIZE); + m_scan_it->Seek(index_key); m_skip_scan_it_next_call = true; rc = index_next_with_direction(buf, true); @@ -6649,21 +6708,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) { for (;;) { setup_scan_iterator(kd, &index_key, false, kd.m_is_reverse_cf, Rdb_key_def::INDEX_NUMBER_SIZE); - - if (!m_scan_it->Valid()) { - /* - RocksDB keyspace has no records that follow our index. It could be, our - index is the last in the keyspace. Navigate to last record in the - keyspace. - */ - m_scan_it->SeekToLast(); - } else { - /* - Iterator is pointing at some record right after the end of the index. - Move one record back. - */ - m_scan_it->Prev(); - } + m_scan_it->SeekForPrev(index_key); m_skip_scan_it_next_call = false; if (is_pk(active_index, table, m_tbl_def)) { @@ -6726,11 +6771,15 @@ bool ha_rocksdb::skip_unique_check() const { 2) this table is in the whitelist of tables to skip and the replication lag has reached a large enough value (see unique_check_lag_threshold and unique_check_lage_reset_threshold) - 3) the user set unique_checks option to 0 + 3) the user set unique_checks option to 0, and the table does not have + any indexes. If the table has secondary keys, then those might becomes + inconsisted/corrupted */ return THDVAR(table->in_use, bulk_load) || (m_force_skip_unique_check && m_skip_unique_check) || - my_core::thd_test_options(table->in_use, OPTION_RELAXED_UNIQUE_CHECKS); + (my_core::thd_test_options(table->in_use, + OPTION_RELAXED_UNIQUE_CHECKS) && + m_tbl_def->m_key_count == 1); } void ha_rocksdb::set_force_skip_unique_check(bool skip) { @@ -6805,15 +6854,31 @@ bool ha_rocksdb::is_pk(const uint index, const TABLE *const table_arg, is_hidden_pk(index, table_arg, tbl_def_arg); } +/* + Formats the string and returns the column family name assignment part for a + specific partition. +*/ +const std::string ha_rocksdb::gen_cf_name_qualifier_for_partition( + const std::string& prefix) { + DBUG_ASSERT(!prefix.empty()); + + return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER + + RDB_PER_PARTITION_QUALIFIER_VALUE_SEP; +} + const char *ha_rocksdb::get_key_name(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { DBUG_ASSERT(table_arg != nullptr); + DBUG_ASSERT(tbl_def_arg != nullptr); if (is_hidden_pk(index, table_arg, tbl_def_arg)) { return HIDDEN_PK_NAME; } + DBUG_ASSERT(table_arg->key_info != nullptr); + DBUG_ASSERT(table_arg->key_info[index].name != nullptr); + return table_arg->key_info[index].name; } @@ -6821,14 +6886,84 @@ const char *ha_rocksdb::get_key_comment(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { DBUG_ASSERT(table_arg != nullptr); + DBUG_ASSERT(tbl_def_arg != nullptr); if (is_hidden_pk(index, table_arg, tbl_def_arg)) { return nullptr; } + DBUG_ASSERT(table_arg->key_info != nullptr); + return table_arg->key_info[index].comment.str; } +const std::string ha_rocksdb::generate_cf_name(const uint index, + const TABLE *const table_arg, + const Rdb_tbl_def *const tbl_def_arg, + bool *per_part_match_found) { + DBUG_ASSERT(table_arg != nullptr); + DBUG_ASSERT(tbl_def_arg != nullptr); + DBUG_ASSERT(per_part_match_found != nullptr); + + // When creating CF-s the caller needs to know if there was a custom CF name + // specified for a given paritition. + *per_part_match_found = false; + + // Index comment is used to define the column family name specification(s). + // If there was no comment, we get an emptry string, and it means "use the + // default column family". + const char *const comment = get_key_comment(index, table_arg, tbl_def_arg); + + // `get_key_comment` can return `nullptr`, that's why this. + std::string key_comment = comment ? comment : ""; + + // If table has partitions then we need to check if user has requested to + // create a column family with a specific name on a per partition basis. + if (table_arg->part_info != nullptr) { + std::string partition_name = tbl_def_arg->base_partition(); + DBUG_ASSERT(!partition_name.empty()); + + // Let's fetch the comment for a index and check if there's a custom key + // name specified for a partition we are handling. + std::vector<std::string> v = myrocks::parse_into_tokens(key_comment, + RDB_QUALIFIER_SEP); + std::string part_to_search = gen_cf_name_qualifier_for_partition( + partition_name); + DBUG_ASSERT(!part_to_search.empty()); + + // Basic O(N) search for a matching assignment. At most we expect maybe + // ten or so elements here. + for (const auto &it : v) { + if (it.substr(0, part_to_search.length()) == part_to_search) { + // We found a prefix match. Try to parse it as an assignment. + std::vector<std::string> tokens = myrocks::parse_into_tokens(it, + RDB_PER_PARTITION_QUALIFIER_VALUE_SEP); + + // We found a custom name, it was in the form we expected it to be. + // Return that instead of whatever we initially wanted to return. In + // a case below the `foo` part will be returned to the caller. + // + // p3_cfname=foo + // + // If no value was specified then we'll return an empty string which + // later gets translated into using a default CF. + if (tokens.size() == 2) { + *per_part_match_found = true; + return tokens[1]; + } else { + return ""; + } + } + } + + // At this point we tried to search for a custom CF name for a partition, + // but none was specified. Therefore default one will be used. + return ""; + } + + return key_comment; +} + int ha_rocksdb::write_row(uchar *const buf) { DBUG_ENTER_FUNC(); @@ -7188,7 +7323,7 @@ int ha_rocksdb::finalize_bulk_load() { return rc; } - mysql_mutex_lock(&m_bulk_load_mutex); + RDB_MUTEX_LOCK_CHECK(m_bulk_load_mutex); /* We need this check because it's possible that m_sst_info has been @@ -7203,7 +7338,8 @@ int ha_rocksdb::finalize_bulk_load() { mysql prints via my_printf_error. */ sql_print_error("Failed to commit bulk loaded sst file to the " - "data store (%s)", m_sst_info->error_message().c_str()); + "data store (%s)", + m_sst_info->error_message().c_str()); my_printf_error(ER_UNKNOWN_ERROR, "Failed to commit bulk loaded sst file to the " @@ -7217,7 +7353,8 @@ int ha_rocksdb::finalize_bulk_load() { m_bulk_load_tx = nullptr; } - mysql_mutex_unlock(&m_bulk_load_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_bulk_load_mutex); + return rc; } @@ -7462,7 +7599,7 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, } /* - Open a cursor and position it at the passed record + Open a cursor */ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, @@ -7522,12 +7659,6 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, } m_scan_it_skips_bloom = skip_bloom; } - /* - Seek() will "Position at the first key in the source that at or past - target". - The operation cannot fail. - */ - m_scan_it->Seek(*slice); } void ha_rocksdb::release_scan_iterator() { @@ -7551,6 +7682,7 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() { rocksdb::Slice table_key((const char *)m_pk_packed_tuple, key_size); setup_scan_iterator(*m_pk_descr, &table_key); + m_scan_it->Seek(table_key); m_skip_scan_it_next_call = true; } @@ -7859,20 +7991,33 @@ int ha_rocksdb::info(uint flag) { update_stats(); } - if (stats.records == 0) { - // most likely, the table is in memtable - // try to deduce from GetApproximateSizes + // if number of records is hardcoded, we do not want to force computation + // of memtable cardinalities + if (stats.records == 0 || + (rocksdb_force_compute_memtable_stats && + rocksdb_debug_optimizer_n_rows == 0)) + { + // First, compute SST files stats uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; auto r = get_range(pk_index(table, m_tbl_def), buf); uint64_t sz = 0; -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz, true); -#pragma GCC diagnostic pop + uint8_t include_flags = rocksdb::DB::INCLUDE_FILES; + // recompute SST files stats only if records count is 0 + if (stats.records == 0) { + rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz, + include_flags); + stats.records+= sz/ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE; + stats.data_file_length+= sz; + } - stats.records = sz / ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE; - stats.data_file_length = sz; + // Second, compute memtable stats + uint64_t memtableCount; + uint64_t memtableSize; + rdb->GetApproximateMemTableStats(m_pk_descr->get_cf(), r, + &memtableCount, &memtableSize); + stats.records += memtableCount; + stats.data_file_length += memtableSize; if (rocksdb_debug_optimizer_n_rows > 0) stats.records = rocksdb_debug_optimizer_n_rows; @@ -8301,12 +8446,36 @@ ha_rocksdb::get_range(const int &i, return myrocks::get_range(*m_key_descr_arr[i], buf); } +static bool is_myrocks_index_empty( + rocksdb::ColumnFamilyHandle *cfh, const bool is_reverse_cf, + const rocksdb::ReadOptions &read_opts, + const uint index_id) +{ + bool index_removed = false; + uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0}; + rdb_netbuf_store_uint32(key_buf, index_id); + const rocksdb::Slice key = + rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)); + std::unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(read_opts, cfh)); + rocksdb_smart_seek(is_reverse_cf, it.get(), key); + if (!it->Valid()) { + index_removed = true; + } else { + if (memcmp(it->key().data(), key_buf, + Rdb_key_def::INDEX_NUMBER_SIZE)) { + // Key does not have same prefix + index_removed = true; + } + } + return index_removed; +} + /* Drop index thread's main logic */ void Rdb_drop_index_thread::run() { - mysql_mutex_lock(&m_signal_mutex); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); for (;;) { // The stop flag might be set by shutdown command @@ -8331,7 +8500,7 @@ void Rdb_drop_index_thread::run() { } // make sure, no program error is returned DBUG_ASSERT(ret == 0 || ret == ETIMEDOUT); - mysql_mutex_unlock(&m_signal_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex); std::unordered_set<GL_INDEX_ID> indices; dict_manager.get_ongoing_drop_indexes(&indices); @@ -8353,11 +8522,11 @@ void Rdb_drop_index_thread::run() { DBUG_ASSERT(cfh); const bool is_reverse_cf = cf_flags & Rdb_key_def::REVERSE_CF_FLAG; - bool index_removed = false; - uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0}; - rdb_netbuf_store_uint32(key_buf, d.index_id); - const rocksdb::Slice key = - rocksdb::Slice((char *)key_buf, sizeof(key_buf)); + if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) + { + finished.insert(d); + continue; + } uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; rocksdb::Range range = get_range(d.index_id, buf, is_reverse_cf ? 1 : 0, is_reverse_cf ? 0 : 1); @@ -8381,25 +8550,8 @@ void Rdb_drop_index_thread::run() { } rdb_handle_io_error(status, RDB_IO_ERROR_BG_THREAD); } - std::unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(read_opts, cfh)); - it->Seek(key); - if (is_reverse_cf) { - if (!it->Valid()) { - it->SeekToLast(); - } else { - it->Prev(); - } - } - if (!it->Valid()) { - index_removed = true; - } else { - if (memcmp(it->key().data(), key_buf, - Rdb_key_def::INDEX_NUMBER_SIZE)) { - // Key does not have same prefix - index_removed = true; - } - } - if (index_removed) { + if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) + { finished.insert(d); } } @@ -8408,10 +8560,10 @@ void Rdb_drop_index_thread::run() { dict_manager.finish_drop_indexes(finished); } } - mysql_mutex_lock(&m_signal_mutex); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); } - mysql_mutex_unlock(&m_signal_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex); } Rdb_tbl_def *ha_rocksdb::get_table_if_exists(const char *const tablename) { @@ -8683,10 +8835,12 @@ ha_rows ha_rocksdb::records_in_range(uint inx, key_range *const min_key, #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" // Getting statistics, including from Memtables - rdb->GetApproximateSizes(kd.get_cf(), &r, 1, &sz, true); -#pragma GCC diagnostic pop - + uint8_t include_flags = rocksdb::DB::INCLUDE_FILES; + rdb->GetApproximateSizes(kd.get_cf(), &r, 1, &sz, include_flags); ret = rows * sz / disk_size; + uint64_t memTableCount; + rdb->GetApproximateMemTableStats(kd.get_cf(), r, &memTableCount, &sz); + ret += memTableCount; /* GetApproximateSizes() gives estimates so ret might exceed stats.records. @@ -8764,6 +8918,7 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, std::unordered_map<rocksdb::ColumnFamilyHandle *, std::vector<rocksdb::Range>> ranges; std::unordered_set<GL_INDEX_ID> ids_to_check; + std::unordered_map<GL_INDEX_ID, uint> ids_to_keyparts; std::vector<uchar> buf(table_arg->s->keys * 2 * Rdb_key_def::INDEX_NUMBER_SIZE); for (uint i = 0; i < table_arg->s->keys; i++) { @@ -8771,6 +8926,7 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, const Rdb_key_def &kd = *m_key_descr_arr[i]; ranges[kd.get_cf()].push_back(get_range(i, bufp)); ids_to_check.insert(kd.get_gl_index_id()); + ids_to_keyparts[kd.get_gl_index_id()] = kd.get_key_parts(); } // for analyze statements, force flush on memtable to get accurate cardinality @@ -8800,6 +8956,8 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd, // Initialize the stats to 0. If there are no files that contain // this gl_index_id, then 0 should be stored for the cached stats. stats[it] = Rdb_index_stats(it); + DBUG_ASSERT(ids_to_keyparts.count(it) > 0); + stats[it].m_distinct_keys_per_prefix.resize(ids_to_keyparts[it]); } for (const auto &it : props) { std::vector<Rdb_index_stats> sst_stats; @@ -9271,6 +9429,15 @@ int ha_rocksdb::inplace_populate_sk( dict_manager.add_create_index(create_index_ids, batch); dict_manager.commit(batch); + /* + Add uncommitted key definitons to ddl_manager. We need to do this + so that the property collector can find this keydef when it needs to + update stats. The property collector looks for the keydef in the + data dictionary, but it won't be there yet since this key definition + is still in the creation process. + */ + ddl_manager.add_uncommitted_keydefs(indexes); + const bool hidden_pk_exists = has_hidden_pk(table); int res = 0; @@ -9489,6 +9656,8 @@ bool ha_rocksdb::commit_inplace_alter_table( Rdb_inplace_alter_ctx *const ctx0 = static_cast<Rdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx); + DEBUG_SYNC(ha_thd(), "rocksdb.commit_in_place_alter_table"); + /* IMPORTANT: When rollback is requested, mysql will abort with an assertion failure. That means every failed commit during inplace alter @@ -9523,6 +9692,9 @@ bool ha_rocksdb::commit_inplace_alter_table( delete ctx0->m_new_tdef; } + /* Remove uncommitted key definitons from ddl_manager */ + ddl_manager.remove_uncommitted_keydefs(ctx0->m_added_indexes); + /* Rollback any partially created indexes */ dict_manager.rollback_ongoing_index_creation(); @@ -9582,6 +9754,12 @@ bool ha_rocksdb::commit_inplace_alter_table( */ DBUG_ASSERT(0); } + + /* + Remove uncommitted key definitons from ddl_manager, as they are now + committed into the data dictionary. + */ + ddl_manager.remove_uncommitted_keydefs(ctx->m_added_indexes); } if (dict_manager.commit(batch)) { @@ -9750,6 +9928,7 @@ static void myrocks_update_status() { export_stats.rows_inserted = global_stats.rows[ROWS_INSERTED]; export_stats.rows_read = global_stats.rows[ROWS_READ]; export_stats.rows_updated = global_stats.rows[ROWS_UPDATED]; + export_stats.rows_deleted_blind = global_stats.rows[ROWS_DELETED_BLIND]; export_stats.system_rows_deleted = global_stats.system_rows[ROWS_DELETED]; export_stats.system_rows_inserted = global_stats.system_rows[ROWS_INSERTED]; @@ -9765,6 +9944,8 @@ static SHOW_VAR myrocks_status_variables[] = { DEF_STATUS_VAR_FUNC("rows_read", &export_stats.rows_read, SHOW_LONGLONG), DEF_STATUS_VAR_FUNC("rows_updated", &export_stats.rows_updated, SHOW_LONGLONG), + DEF_STATUS_VAR_FUNC("rows_deleted_blind", + &export_stats.rows_deleted_blind, SHOW_LONGLONG), DEF_STATUS_VAR_FUNC("system_rows_deleted", &export_stats.system_rows_deleted, SHOW_LONGLONG), DEF_STATUS_VAR_FUNC("system_rows_inserted", @@ -9870,7 +10051,7 @@ void Rdb_background_thread::run() { // Wait until the next timeout or until we receive a signal to stop the // thread. Request to stop the thread should only be triggered when the // storage engine is being unloaded. - mysql_mutex_lock(&m_signal_mutex); + RDB_MUTEX_LOCK_CHECK(m_signal_mutex); const auto ret MY_ATTRIBUTE((__unused__)) = mysql_cond_timedwait(&m_signal_cond, &m_signal_mutex, &ts_next_sync); @@ -9879,7 +10060,7 @@ void Rdb_background_thread::run() { const bool local_stop = m_stop; const bool local_save_stats = m_save_stats; reset(); - mysql_mutex_unlock(&m_signal_mutex); + RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex); if (local_stop) { // If we're here then that's because condition variable was signaled by @@ -9963,11 +10144,8 @@ bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, shorter require all parts of the key to be available for the short key match. */ - if (use_all_keys && prefix_extractor->InRange(eq_cond)) - can_use = true; - else if (!is_ascending) - can_use = false; - else if (prefix_extractor->SameResultWhenAppended(eq_cond)) + if ((use_all_keys && prefix_extractor->InRange(eq_cond)) + || prefix_extractor->SameResultWhenAppended(eq_cond)) can_use = true; else can_use = false; @@ -10138,7 +10316,7 @@ void rocksdb_set_table_stats_sampling_pct( my_core::THD *const thd MY_ATTRIBUTE((__unused__)), my_core::st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)), void *const var_ptr MY_ATTRIBUTE((__unused__)), const void *const save) { - mysql_mutex_lock(&rdb_sysvars_mutex); + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); const uint32_t new_val = *static_cast<const uint32_t *>(save); @@ -10151,7 +10329,7 @@ void rocksdb_set_table_stats_sampling_pct( } } - mysql_mutex_unlock(&rdb_sysvars_mutex); + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } /* @@ -10185,6 +10363,15 @@ void rocksdb_set_rate_limiter_bytes_per_sec( } } +void rocksdb_set_delayed_write_rate(THD *thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) { + const uint64_t new_val = *static_cast<const uint64_t *>(save); + if (rocksdb_delayed_write_rate != new_val) { + rocksdb_delayed_write_rate = new_val; + rocksdb_db_options.delayed_write_rate = new_val; + } +} + void rdb_set_collation_exception_list(const char *const exception_list) { DBUG_ASSERT(rdb_collation_exceptions != nullptr); @@ -10200,7 +10387,7 @@ void rocksdb_set_collation_exception_list(THD *const thd, const void *const save) { const char *const val = *static_cast<const char *const *>(save); - rdb_set_collation_exception_list(val); + rdb_set_collation_exception_list(val == nullptr ? "" : val); *static_cast<const char **>(var_ptr) = val; } @@ -10229,13 +10416,15 @@ static void rocksdb_set_max_background_compactions( const void *const save) { DBUG_ASSERT(save != nullptr); - mysql_mutex_lock(&rdb_sysvars_mutex); + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); + rocksdb_db_options.max_background_compactions = *static_cast<const int *>(save); rocksdb_db_options.env->SetBackgroundThreads( rocksdb_db_options.max_background_compactions, rocksdb::Env::Priority::LOW); - mysql_mutex_unlock(&rdb_sysvars_mutex); + + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } void rdb_queue_save_stats_request() { rdb_bg_thread.request_save_stats(); } |