diff options
author | Sergei Petrunia <psergey@askmonty.org> | 2019-06-16 00:28:33 +0300 |
---|---|---|
committer | Sergei Petrunia <psergey@askmonty.org> | 2019-06-16 00:28:33 +0300 |
commit | 9ab0d7b4e9c48e881157728a4688c0bea9256a32 (patch) | |
tree | 2376691c32692cbef0648e9d3deb31bac6e935c3 /storage/rocksdb/ha_rocksdb.cc | |
parent | 91be2212c69d727d0b104f70cb32c4639397338c (diff) | |
parent | 5173e396ffce706954f7302e9854126aadab4c54 (diff) | |
download | mariadb-git-9ab0d7b4e9c48e881157728a4688c0bea9256a32.tar.gz |
Merge from MyRocks upstream:
Copy of
commit dcd9379eb5707bc7514a2ff4d9127790356505cb
Author: Manuel Ung <mung@fb.com>
Date: Fri Jun 14 10:38:17 2019 -0700
Skip valgrind for rocksdb.force_shutdown
Summary:
This test does unclean shutdown, and leaks memory.
Squash with: D15749084
Reviewed By: hermanlee
Differential Revision: D15828957
fbshipit-source-id: 30541455d74
Diffstat (limited to 'storage/rocksdb/ha_rocksdb.cc')
-rw-r--r-- | storage/rocksdb/ha_rocksdb.cc | 3929 |
1 files changed, 2117 insertions, 1812 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index cb289fb0077..3f1db15a426 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -15,7 +15,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ #ifdef USE_PRAGMA_IMPLEMENTATION -#pragma implementation // gcc: Class implementation +#pragma implementation // gcc: Class implementation #endif #define MYSQL_SERVER 1 @@ -31,8 +31,8 @@ #include "./ha_rocksdb.h" /* C++ standard header files */ -#include <algorithm> #include <inttypes.h> +#include <algorithm> #include <limits> #include <map> #include <queue> @@ -41,6 +41,9 @@ #include <vector> /* MySQL includes */ +#include <mysql/psi/mysql_table.h> +// #include <thread_pool_priv.h> +#include <mysys_err.h> #include "./debug_sync.h" #include "./my_bit.h" #include "./my_stacktrace.h" @@ -48,20 +51,14 @@ #include "./sql_audit.h" #include "./sql_table.h" #include "./sql_hset.h" -#include <mysql/psi/mysql_table.h> #ifdef MARIAROCKS_NOT_YET -#include <mysql/thread_pool_priv.h> #endif -#include <mysys_err.h> - -// Both MySQL and RocksDB define the same constant. To avoid compilation errors -// till we make the fix in RocksDB, we'll temporary undefine it here. -#undef CACHE_LINE_SIZE /* RocksDB includes */ #include "monitoring/histogram.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" +#include "rocksdb/memory_allocator.h" #include "rocksdb/persistent_cache.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/slice_transform.h" @@ -70,6 +67,7 @@ #include "rocksdb/utilities/convenience.h" #include "rocksdb/utilities/memory_util.h" #include "rocksdb/utilities/sim_cache.h" +#include "rocksdb/utilities/write_batch_with_index.h" #include "util/stop_watch.h" #include "./rdb_source_revision.h" @@ -77,8 +75,10 @@ #include "./event_listener.h" #include "./ha_rocksdb_proto.h" #include "./logger.h" +#include "./nosql_access.h" #include "./rdb_cf_manager.h" #include "./rdb_cf_options.h" +#include "./rdb_converter.h" #include "./rdb_datadic.h" #include "./rdb_i_s.h" #include "./rdb_index_merge.h" @@ -100,18 +100,19 @@ void thd_mark_transaction_to_rollback(MYSQL_THD thd, bool all); * Get the user thread's binary logging format * @param thd user thread * @return Value to be used as index into the binlog_format_names array -*/ + */ int thd_binlog_format(const MYSQL_THD thd); /** * Check if binary logging is filtered for thread's current db. * @param thd Thread handle * @retval 1 the query is not filtered, 0 otherwise. -*/ + */ bool thd_binlog_filter_ok(const MYSQL_THD thd); } MYSQL_PLUGIN_IMPORT bool my_disable_leak_check; +extern my_bool opt_core_file; // Needed in rocksdb_init_func void ignore_db_dirs_append(const char *dirname_arg); @@ -128,22 +129,14 @@ const std::string DEFAULT_CF_NAME("default"); const std::string DEFAULT_SYSTEM_CF_NAME("__system__"); const std::string PER_INDEX_CF_NAME("$per_index_cf"); -class Rdb_explicit_snapshot; - -std::mutex explicit_snapshot_mutex; -ulonglong explicit_snapshot_counter = 0; -std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>> - explicit_snapshots; static std::vector<GL_INDEX_ID> rdb_indexes_to_recalc; #ifdef MARIADB_NOT_YET class Rdb_explicit_snapshot : public explicit_snapshot { - std::unique_ptr<rocksdb::ManagedSnapshot> snapshot; - public: - static std::shared_ptr<Rdb_explicit_snapshot> - create(snapshot_info_st *ss_info, rocksdb::DB *db, - const rocksdb::Snapshot *snapshot) { + static std::shared_ptr<Rdb_explicit_snapshot> create( + snapshot_info_st *ss_info, rocksdb::DB *db, + const rocksdb::Snapshot *snapshot) { std::lock_guard<std::mutex> lock(explicit_snapshot_mutex); auto s = std::unique_ptr<rocksdb::ManagedSnapshot>( new rocksdb::ManagedSnapshot(db, snapshot)); @@ -159,8 +152,24 @@ class Rdb_explicit_snapshot : public explicit_snapshot { return ret; } - static std::shared_ptr<Rdb_explicit_snapshot> - get(const ulonglong snapshot_id) { + static std::string dump_snapshots() { + std::string str; + std::lock_guard<std::mutex> lock(explicit_snapshot_mutex); + for (const auto &elem : explicit_snapshots) { + const auto &ss = elem.second.lock(); + DBUG_ASSERT(ss != nullptr); + const auto &info = ss->ss_info; + str += "\nSnapshot ID: " + std::to_string(info.snapshot_id) + + "\nBinlog File: " + info.binlog_file + + "\nBinlog Pos: " + std::to_string(info.binlog_pos) + + "\nGtid Executed: " + info.gtid_executed + "\n"; + } + + return str; + } + + static std::shared_ptr<Rdb_explicit_snapshot> get( + const ulonglong snapshot_id) { std::lock_guard<std::mutex> lock(explicit_snapshot_mutex); auto elem = explicit_snapshots.find(snapshot_id); if (elem == explicit_snapshots.end()) { @@ -172,14 +181,27 @@ class Rdb_explicit_snapshot : public explicit_snapshot { rocksdb::ManagedSnapshot *get_snapshot() { return snapshot.get(); } Rdb_explicit_snapshot(snapshot_info_st ss_info, - std::unique_ptr<rocksdb::ManagedSnapshot> snapshot) + std::unique_ptr<rocksdb::ManagedSnapshot> &&snapshot) : explicit_snapshot(ss_info), snapshot(std::move(snapshot)) {} virtual ~Rdb_explicit_snapshot() { std::lock_guard<std::mutex> lock(explicit_snapshot_mutex); explicit_snapshots.erase(ss_info.snapshot_id); } + + private: + std::unique_ptr<rocksdb::ManagedSnapshot> snapshot; + + static std::mutex explicit_snapshot_mutex; + static ulonglong explicit_snapshot_counter; + static std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>> + explicit_snapshots; }; + +std::mutex Rdb_explicit_snapshot::explicit_snapshot_mutex; +ulonglong Rdb_explicit_snapshot::explicit_snapshot_counter = 0; +std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>> + Rdb_explicit_snapshot::explicit_snapshots; #endif /** @@ -188,10 +210,11 @@ class Rdb_explicit_snapshot : public explicit_snapshot { void ha_rocksdb::update_row_stats(const operation_type &type) { DBUG_ASSERT(type < ROWS_MAX); // Find if we are modifying system databases. - if (table->s && m_tbl_def->m_is_mysql_system_table) + if (table->s && m_tbl_def->m_is_mysql_system_table) { global_stats.system_rows[type].inc(); - else + } else { global_stats.rows[type].inc(); + } } void dbug_dump_database(rocksdb::DB *db); @@ -199,8 +222,8 @@ static handler *rocksdb_create_handler(my_core::handlerton *hton, my_core::TABLE_SHARE *table_arg, my_core::MEM_ROOT *mem_root); -static rocksdb::CompactRangeOptions -getCompactRangeOptions(int concurrency = 0) { +static rocksdb::CompactRangeOptions getCompactRangeOptions( + int concurrency = 0) { rocksdb::CompactRangeOptions compact_range_options; compact_range_options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce; @@ -261,37 +284,77 @@ static void rocksdb_flush_all_memtables() { } } +static void rocksdb_delete_column_family_stub( + THD *const /* thd */, struct st_mysql_sys_var *const /* var */, + void *const /* var_ptr */, const void *const /* save */) {} + +static int rocksdb_delete_column_family( + THD *const /* thd */, struct st_mysql_sys_var *const /* var */, + void *const /* var_ptr */, struct st_mysql_value *const value) { + // Return failure for now until the race condition between creating + // CF and deleting CF is resolved + return HA_EXIT_FAILURE; + + char buff[STRING_BUFFER_USUAL_SIZE]; + int len = sizeof(buff); + + DBUG_ASSERT(value != nullptr); + + if (const char *const cf = value->val_str(value, buff, &len)) { + auto &cf_manager = rdb_get_cf_manager(); + auto ret = cf_manager.drop_cf(cf); + if (ret == HA_EXIT_SUCCESS) { + // NO_LINT_DEBUG + sql_print_information("RocksDB: Dropped column family: %s\n", cf); + } else { + // NO_LINT_DEBUG + sql_print_error("RocksDB: Failed to drop column family: %s, error: %d\n", + cf, ret); + } + + return ret; + } + + return HA_EXIT_SUCCESS; +} + /////////////////////////////////////////////////////////// // Hash map: table name => open table handler /////////////////////////////////////////////////////////// -namespace // anonymous namespace = not visible outside this source file +namespace // anonymous namespace = not visible outside this source file { const ulong TABLE_HASH_SIZE = 32; typedef Hash_set<Rdb_table_handler> Rdb_table_set; -struct Rdb_open_tables_map { +class Rdb_open_tables_map { + private: /* Hash table used to track the handlers of open tables */ - Rdb_table_set m_hash; + std::unordered_map<std::string, Rdb_table_handler *> m_table_map; + /* The mutex used to protect the hash table */ mutable mysql_mutex_t m_mutex; - static uchar *get_hash_key(const Rdb_table_handler *const table_handler, - size_t *const length, - my_bool not_used MY_ATTRIBUTE((__unused__))); + public: + void init() { + m_table_map.clear(); + mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &m_mutex, MY_MUTEX_INIT_FAST); + } + + void free() { + m_table_map.clear(); + mysql_mutex_destroy(&m_mutex); + } + size_t count() { return m_table_map.size(); } Rdb_table_handler *get_table_handler(const char *const table_name); void release_table_handler(Rdb_table_handler *const table_handler); - Rdb_open_tables_map() : m_hash(get_hash_key, system_charset_info) { } - - void free_hash(void) { m_hash.~Rdb_table_set(); } - std::vector<std::string> get_table_names(void) const; }; -} // anonymous namespace +} // anonymous namespace static Rdb_open_tables_map rdb_open_tables; @@ -326,6 +389,7 @@ static int rocksdb_create_checkpoint( status = checkpoint->CreateCheckpoint(checkpoint_dir.c_str()); delete checkpoint; if (status.ok()) { + // NO_LINT_DEBUG sql_print_information( "RocksDB: created checkpoint in directory : %s\n", checkpoint_dir.c_str()); @@ -355,6 +419,7 @@ static void rocksdb_force_flush_memtable_now_stub( static int rocksdb_force_flush_memtable_now( THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr, struct st_mysql_value *const value) { + // NO_LINT_DEBUG sql_print_information("RocksDB: Manual memtable flush."); rocksdb_flush_all_memtables(); return HA_EXIT_SUCCESS; @@ -367,6 +432,7 @@ static void rocksdb_force_flush_memtable_and_lzero_now_stub( static int rocksdb_force_flush_memtable_and_lzero_now( THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr, struct st_mysql_value *const value) { + // NO_LINT_DEBUG sql_print_information("RocksDB: Manual memtable and L0 flush."); rocksdb_flush_all_memtables(); @@ -375,29 +441,46 @@ static int rocksdb_force_flush_memtable_and_lzero_now( rocksdb::ColumnFamilyMetaData metadata; rocksdb::ColumnFamilyDescriptor cf_descr; + int i, max_attempts = 3, num_errors = 0; + for (const auto &cf_handle : cf_manager.get_all_cf()) { - rdb->GetColumnFamilyMetaData(cf_handle, &metadata); - cf_handle->GetDescriptor(&cf_descr); - c_options.output_file_size_limit = cf_descr.options.target_file_size_base; + for (i = 0; i < max_attempts; i++) { + rdb->GetColumnFamilyMetaData(cf_handle, &metadata); + cf_handle->GetDescriptor(&cf_descr); + c_options.output_file_size_limit = cf_descr.options.target_file_size_base; + + DBUG_ASSERT(metadata.levels[0].level == 0); + std::vector<std::string> file_names; + for (auto &file : metadata.levels[0].files) { + file_names.emplace_back(file.db_path + file.name); + } - DBUG_ASSERT(metadata.levels[0].level == 0); - std::vector<std::string> file_names; - for (auto &file : metadata.levels[0].files) { - file_names.emplace_back(file.db_path + file.name); - } + if (file_names.empty()) { + break; + } - if (!file_names.empty()) { rocksdb::Status s; s = rdb->CompactFiles(c_options, cf_handle, file_names, 1); + // Due to a race, it's possible for CompactFiles to collide + // with auto compaction, causing an error to return + // regarding file not found. In that case, retry. + if (s.IsInvalidArgument()) { + continue; + } + if (!s.ok() && !s.IsAborted()) { rdb_handle_io_error(s, RDB_IO_ERROR_GENERAL); return HA_EXIT_FAILURE; } + break; + } + if (i == max_attempts) { + num_errors++; } } - return HA_EXIT_SUCCESS; + return num_errors == 0 ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE; } static void rocksdb_drop_index_wakeup_thread( @@ -468,11 +551,9 @@ static void rocksdb_set_update_cf_options(THD *thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save); -static int rocksdb_check_bulk_load(THD *const thd, - struct st_mysql_sys_var *var - MY_ATTRIBUTE((__unused__)), - void *save, - struct st_mysql_value *value); +static int rocksdb_check_bulk_load( + THD *const thd, struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), + void *save, struct st_mysql_value *value); static int rocksdb_check_bulk_load_allow_unsorted( THD *const thd, struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), @@ -499,6 +580,8 @@ static int rocksdb_validate_set_block_cache_size( static long long rocksdb_block_cache_size; static long long rocksdb_sim_cache_size; static my_bool rocksdb_use_clock_cache; +static double rocksdb_cache_high_pri_pool_ratio; +static my_bool rocksdb_cache_dump; /* Use unsigned long long instead of uint64_t because of MySQL compatibility */ static unsigned long long // NOLINT(runtime/int) rocksdb_rate_limiter_bytes_per_sec; @@ -518,8 +601,10 @@ static my_bool rocksdb_force_compute_memtable_stats; static uint32_t rocksdb_force_compute_memtable_stats_cachetime; static my_bool rocksdb_debug_optimizer_no_zero_cardinality; static uint32_t rocksdb_wal_recovery_mode; +static uint32_t rocksdb_stats_level; static uint32_t rocksdb_access_hint_on_compaction_start; static char *rocksdb_compact_cf_name; +static char *rocksdb_delete_cf_name; static char *rocksdb_checkpoint_name; static my_bool rocksdb_signal_drop_index_thread; static my_bool rocksdb_signal_remove_mariabackup_checkpoint; @@ -555,10 +640,21 @@ char *compression_types_val= const_cast<char*>(get_rocksdb_supported_compression_types()); static unsigned long rocksdb_write_policy = rocksdb::TxnDBWritePolicy::WRITE_COMMITTED; +char *rocksdb_read_free_rpl_tables; +std::mutex rocksdb_read_free_rpl_tables_mutex; +#if defined(HAVE_PSI_INTERFACE) +Regex_list_handler rdb_read_free_regex_handler(key_rwlock_read_free_rpl_tables); +#else +Regex_list_handler rdb_read_free_regex_handler; +#endif +enum read_free_rpl_type { OFF = 0, PK_ONLY, PK_SK }; +static uint64_t rocksdb_read_free_rpl = read_free_rpl_type::OFF; static my_bool rocksdb_error_on_suboptimal_collation = 1; static uint32_t rocksdb_stats_recalc_rate = 0; static uint32_t rocksdb_debug_manual_compaction_delay = 0; static uint32_t rocksdb_max_manual_compactions = 0; +static my_bool rocksdb_rollback_on_timeout = FALSE; +static my_bool rocksdb_enable_insert_with_update_caching = TRUE; std::atomic<uint64_t> rocksdb_row_lock_deadlocks(0); std::atomic<uint64_t> rocksdb_row_lock_wait_timeouts(0); @@ -566,6 +662,9 @@ std::atomic<uint64_t> rocksdb_snapshot_conflict_errors(0); std::atomic<uint64_t> rocksdb_wal_group_syncs(0); std::atomic<uint64_t> rocksdb_manual_compactions_processed(0); std::atomic<uint64_t> rocksdb_manual_compactions_running(0); +#ifndef DBUG_OFF +std::atomic<uint64_t> rocksdb_num_get_for_update_calls(0); +#endif @@ -635,7 +734,7 @@ static std::unique_ptr<rocksdb::DBOptions> rdb_init_rocksdb_db_options(void) { o->listeners.push_back(std::make_shared<Rdb_event_listener>(&ddl_manager)); o->info_log_level = rocksdb::InfoLogLevel::INFO_LEVEL; o->max_subcompactions = DEFAULT_SUBCOMPACTIONS; - o->max_open_files = -2; // auto-tune to 50% open_files_limit + o->max_open_files = -2; // auto-tune to 50% open_files_limit o->two_write_queues = true; o->manual_wal_flush = true; @@ -659,6 +758,13 @@ static TYPELIB write_policy_typelib = {array_elements(write_policy_names) - 1, "write_policy_typelib", write_policy_names, nullptr}; +/* This array needs to be kept up to date with myrocks::read_free_rpl_type */ +static const char *read_free_rpl_names[] = {"OFF", "PK_ONLY", "PK_SK", NullS}; + +static TYPELIB read_free_rpl_typelib = {array_elements(read_free_rpl_names) - 1, + "read_free_rpl_typelib", + read_free_rpl_names, nullptr}; + /* This enum needs to be kept up to date with rocksdb::InfoLogLevel */ static const char *info_log_level_names[] = {"debug_level", "info_level", "warn_level", "error_level", @@ -680,6 +786,23 @@ static void rocksdb_set_rocksdb_info_log_level( RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); } +static void rocksdb_set_rocksdb_stats_level(THD *const thd, + struct st_mysql_sys_var *const var, + void *const var_ptr, + const void *const save) { + DBUG_ASSERT(save != nullptr); + + RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); + rocksdb_db_options->statistics->set_stats_level( + static_cast<const rocksdb::StatsLevel>( + *static_cast<const uint64_t *>(save))); + // Actual stats level is defined at rocksdb dbopt::statistics::stats_level_ + // so adjusting rocksdb_stats_level here to make sure it points to + // the correct stats level. + rocksdb_stats_level = rocksdb_db_options->statistics->get_stats_level(); + RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); +} + static void rocksdb_set_reset_stats( my_core::THD *const /* unused */, my_core::st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)), @@ -804,7 +927,7 @@ static MYSQL_THDVAR_ULONG(deadlock_detect_depth, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_BOOL( commit_time_batch_for_recovery, PLUGIN_VAR_RQCMDARG, "TransactionOptions::commit_time_batch_for_recovery for RocksDB", nullptr, - nullptr, FALSE); + nullptr, TRUE); static MYSQL_THDVAR_BOOL( trace_sst_api, PLUGIN_VAR_RQCMDARG, @@ -844,10 +967,11 @@ static MYSQL_THDVAR_STR(tmpdir, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, "Directory for temporary files during DDL operations.", nullptr, nullptr, ""); +#define DEFAULT_SKIP_UNIQUE_CHECK_TABLES ".*" static MYSQL_THDVAR_STR( skip_unique_check_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, "Skip unique constraint checking for the specified tables", nullptr, - nullptr, ".*"); + nullptr, DEFAULT_SKIP_UNIQUE_CHECK_TABLES); static MYSQL_THDVAR_BOOL( commit_in_the_middle, PLUGIN_VAR_RQCMDARG, @@ -861,11 +985,80 @@ static MYSQL_THDVAR_BOOL( " Blind delete is disabled if the table has secondary key", nullptr, nullptr, FALSE); -static MYSQL_THDVAR_STR( - read_free_rpl_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, +static const char *DEFAULT_READ_FREE_RPL_TABLES = ".*"; + +static int rocksdb_validate_read_free_rpl_tables( + THD *thd MY_ATTRIBUTE((__unused__)), + struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), void *save, + struct st_mysql_value *value) { + char buff[STRING_BUFFER_USUAL_SIZE]; + int length = sizeof(buff); + const char *wlist_buf = value->val_str(value, buff, &length); + const auto wlist = wlist_buf ? wlist_buf : DEFAULT_READ_FREE_RPL_TABLES; + +#if defined(HAVE_PSI_INTERFACE) + Regex_list_handler regex_handler(key_rwlock_read_free_rpl_tables); +#else + Regex_list_handler regex_handler; +#endif + + if (!regex_handler.set_patterns(wlist)) { + warn_about_bad_patterns(®ex_handler, "rocksdb_read_free_rpl_tables"); + return HA_EXIT_FAILURE; + } + + *static_cast<const char **>(save) = my_strdup(wlist, MYF(MY_WME)); + return HA_EXIT_SUCCESS; +} + +static void rocksdb_update_read_free_rpl_tables( + THD *thd MY_ATTRIBUTE((__unused__)), + struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), void *var_ptr, + const void *save) { + const auto wlist = *static_cast<const char *const *>(save); + DBUG_ASSERT(wlist != nullptr); + + // This is bound to succeed since we've already checked for bad patterns in + // rocksdb_validate_read_free_rpl_tables + rdb_read_free_regex_handler.set_patterns(wlist); + + // update all table defs + struct Rdb_read_free_rpl_updater : public Rdb_tables_scanner { + int add_table(Rdb_tbl_def *tdef) override { + tdef->check_and_set_read_free_rpl_table(); + return HA_EXIT_SUCCESS; + } + } updater; + ddl_manager.scan_for_tables(&updater); + + if (wlist == DEFAULT_READ_FREE_RPL_TABLES) { + // If running SET var = DEFAULT, then rocksdb_validate_read_free_rpl_tables + // isn't called, and memory is never allocated for the value. Allocate it + // here. + *static_cast<const char **>(var_ptr) = my_strdup(wlist, MYF(MY_WME)); + } else { + // Otherwise, we just reuse the value allocated from + // rocksdb_validate_read_free_rpl_tables. + *static_cast<const char **>(var_ptr) = wlist; + } +} + +static MYSQL_SYSVAR_STR( + read_free_rpl_tables, rocksdb_read_free_rpl_tables, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC /*| PLUGIN_VAR_ALLOCATED*/, "List of tables that will use read-free replication on the slave " "(i.e. not lookup a row during replication)", - nullptr, nullptr, ""); + rocksdb_validate_read_free_rpl_tables, rocksdb_update_read_free_rpl_tables, + DEFAULT_READ_FREE_RPL_TABLES); + +static MYSQL_SYSVAR_ENUM( + read_free_rpl, rocksdb_read_free_rpl, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Use read-free replication on the slave (i.e. no row lookup during " + "replication). Default is OFF, PK_SK will enable it on all tables with " + "primary key. PK_ONLY will enable it on tables where the only key is the " + "primary key (i.e. no secondary keys).", + nullptr, nullptr, read_free_rpl_type::OFF, &read_free_rpl_typelib); static MYSQL_THDVAR_BOOL(skip_bloom_filter_on_read, PLUGIN_VAR_RQCMDARG, "Skip using bloom filter for reads", nullptr, nullptr, @@ -1033,6 +1226,14 @@ static MYSQL_SYSVAR_UINT( /* min */ (uint)rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords, /* max */ (uint)rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords, 0); +static MYSQL_SYSVAR_UINT( + stats_level, rocksdb_stats_level, PLUGIN_VAR_RQCMDARG, + "Statistics Level for RocksDB. Default is 0 (kExceptHistogramOrTimers)", + nullptr, rocksdb_set_rocksdb_stats_level, + /* default */ (uint)rocksdb::StatsLevel::kExceptHistogramOrTimers, + /* min */ (uint)rocksdb::StatsLevel::kExceptHistogramOrTimers, + /* max */ (uint)rocksdb::StatsLevel::kAll, 0); + static MYSQL_SYSVAR_SIZE_T(compaction_readahead_size, rocksdb_db_options->compaction_readahead_size, PLUGIN_VAR_RQCMDARG, @@ -1107,7 +1308,8 @@ static MYSQL_SYSVAR_ULONG( persistent_cache_size_mb, rocksdb_persistent_cache_size_mb, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "Size of cache in MB for BlockBasedTableOptions::persistent_cache " - "for RocksDB", nullptr, nullptr, rocksdb_persistent_cache_size_mb, + "for RocksDB", + nullptr, nullptr, rocksdb_persistent_cache_size_mb, /* min */ 0L, /* max */ ULONG_MAX, 0); static MYSQL_SYSVAR_UINT64_T( @@ -1286,7 +1488,7 @@ static MYSQL_SYSVAR_LONGLONG(block_cache_size, rocksdb_block_cache_size, rocksdb_validate_set_block_cache_size, nullptr, /* default */ RDB_DEFAULT_BLOCK_CACHE_SIZE, /* min */ RDB_MIN_BLOCK_CACHE_SIZE, - /* max */ LONGLONG_MAX, + /* max */ LLONG_MAX, /* Block size */ RDB_MIN_BLOCK_CACHE_SIZE); static MYSQL_SYSVAR_LONGLONG(sim_cache_size, rocksdb_sim_cache_size, @@ -1295,15 +1497,26 @@ static MYSQL_SYSVAR_LONGLONG(sim_cache_size, rocksdb_sim_cache_size, nullptr, /* default */ 0, /* min */ 0, - /* max */ LONGLONG_MAX, + /* max */ LLONG_MAX, /* Block size */ 0); static MYSQL_SYSVAR_BOOL( - use_clock_cache, - rocksdb_use_clock_cache, + use_clock_cache, rocksdb_use_clock_cache, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, - "Use ClockCache instead of default LRUCache for RocksDB", - nullptr, nullptr, false); + "Use ClockCache instead of default LRUCache for RocksDB", nullptr, nullptr, + false); + +static MYSQL_SYSVAR_BOOL(cache_dump, rocksdb_cache_dump, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Include RocksDB block cache content in core dump.", + nullptr, nullptr, true); + +static MYSQL_SYSVAR_DOUBLE(cache_high_pri_pool_ratio, + rocksdb_cache_high_pri_pool_ratio, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "Specify the size of block cache high-pri pool", + nullptr, nullptr, /* default */ 0.0, /* min */ 0.0, + /* max */ 1.0, 0); static MYSQL_SYSVAR_BOOL( cache_index_and_filter_blocks, @@ -1313,6 +1526,14 @@ static MYSQL_SYSVAR_BOOL( "BlockBasedTableOptions::cache_index_and_filter_blocks for RocksDB", nullptr, nullptr, true); +static MYSQL_SYSVAR_BOOL( + cache_index_and_filter_with_high_priority, + *reinterpret_cast<my_bool *>( + &rocksdb_tbl_options->cache_index_and_filter_blocks_with_high_priority), + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, + "cache_index_and_filter_blocks_with_high_priority for RocksDB", nullptr, + nullptr, true); + // When pin_l0_filter_and_index_blocks_in_cache is true, RocksDB will use the // LRU cache, but will always keep the filter & idndex block's handle checked // out (=won't call ShardedLRUCache::Release), plus the parsed out objects @@ -1441,10 +1662,10 @@ static MYSQL_SYSVAR_UINT( nullptr, nullptr, 0, /* min */ 0, /* max */ INT_MAX, 0); static MYSQL_SYSVAR_BOOL(force_compute_memtable_stats, - rocksdb_force_compute_memtable_stats, - PLUGIN_VAR_RQCMDARG, - "Force to always compute memtable stats", - nullptr, nullptr, TRUE); + rocksdb_force_compute_memtable_stats, + PLUGIN_VAR_RQCMDARG, + "Force to always compute memtable stats", nullptr, + nullptr, TRUE); static MYSQL_SYSVAR_UINT(force_compute_memtable_stats_cachetime, rocksdb_force_compute_memtable_stats_cachetime, @@ -1464,6 +1685,10 @@ static MYSQL_SYSVAR_STR(compact_cf, rocksdb_compact_cf_name, rocksdb_compact_column_family, rocksdb_compact_column_family_stub, ""); +static MYSQL_SYSVAR_STR(delete_cf, rocksdb_delete_cf_name, PLUGIN_VAR_RQCMDARG, + "Delete column family", rocksdb_delete_column_family, + rocksdb_delete_column_family_stub, ""); + static MYSQL_SYSVAR_STR(create_checkpoint, rocksdb_checkpoint_name, PLUGIN_VAR_RQCMDARG, "Checkpoint directory", rocksdb_create_checkpoint, @@ -1535,6 +1760,12 @@ static MYSQL_SYSVAR_UINT( "Maximum number of pending + ongoing number of manual compactions.", nullptr, nullptr, /* default */ 10, /* min */ 0, /* max */ UINT_MAX, 0); +static MYSQL_SYSVAR_BOOL( + rollback_on_timeout, rocksdb_rollback_on_timeout, PLUGIN_VAR_OPCMDARG, + "Whether to roll back the complete transaction or a single statement on " + "lock wait timeout (a single statement by default)", + NULL, NULL, FALSE); + static MYSQL_SYSVAR_UINT( debug_manual_compaction_delay, rocksdb_debug_manual_compaction_delay, PLUGIN_VAR_RQCMDARG, @@ -1626,7 +1857,7 @@ static MYSQL_SYSVAR_LONGLONG( rocksdb_compaction_sequential_deletes_file_size, PLUGIN_VAR_RQCMDARG, "Minimum file size required for compaction_sequential_deletes", nullptr, rocksdb_set_compaction_options, 0L, - /* min */ -1L, /* max */ LONGLONG_MAX, 0); + /* min */ -1L, /* max */ LLONG_MAX, 0); static MYSQL_SYSVAR_BOOL( compaction_sequential_deletes_count_sd, @@ -1731,6 +1962,13 @@ static MYSQL_SYSVAR_BOOL(error_on_suboptimal_collation, "collation is used", nullptr, nullptr, TRUE); +static MYSQL_SYSVAR_BOOL( + enable_insert_with_update_caching, + rocksdb_enable_insert_with_update_caching, PLUGIN_VAR_OPCMDARG, + "Whether to enable optimization where we cache the read from a failed " + "insertion attempt in INSERT ON DUPLICATE KEY UPDATE", + nullptr, nullptr, TRUE); + static const int ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE = 100; static struct st_mysql_sys_var *rocksdb_system_variables[] = { @@ -1749,6 +1987,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(commit_in_the_middle), MYSQL_SYSVAR(blind_delete_primary_key), MYSQL_SYSVAR(read_free_rpl_tables), + MYSQL_SYSVAR(read_free_rpl), MYSQL_SYSVAR(bulk_load_size), MYSQL_SYSVAR(merge_buf_size), MYSQL_SYSVAR(enable_bulk_load_api), @@ -1800,6 +2039,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(enable_thread_tracking), MYSQL_SYSVAR(perf_context_level), MYSQL_SYSVAR(wal_recovery_mode), + MYSQL_SYSVAR(stats_level), MYSQL_SYSVAR(access_hint_on_compaction_start), MYSQL_SYSVAR(new_table_reader_for_compaction_inputs), MYSQL_SYSVAR(compaction_readahead_size), @@ -1809,7 +2049,10 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(block_cache_size), MYSQL_SYSVAR(sim_cache_size), MYSQL_SYSVAR(use_clock_cache), + MYSQL_SYSVAR(cache_high_pri_pool_ratio), + MYSQL_SYSVAR(cache_dump), MYSQL_SYSVAR(cache_index_and_filter_blocks), + MYSQL_SYSVAR(cache_index_and_filter_with_high_priority), MYSQL_SYSVAR(pin_l0_filter_and_index_blocks_in_cache), MYSQL_SYSVAR(index_type), MYSQL_SYSVAR(hash_index_allow_collision), @@ -1838,6 +2081,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(debug_optimizer_no_zero_cardinality), MYSQL_SYSVAR(compact_cf), + MYSQL_SYSVAR(delete_cf), MYSQL_SYSVAR(signal_drop_index_thread), MYSQL_SYSVAR(pause_background_work), MYSQL_SYSVAR(enable_2pc), @@ -1883,10 +2127,13 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = { MYSQL_SYSVAR(debug_manual_compaction_delay), MYSQL_SYSVAR(max_manual_compactions), MYSQL_SYSVAR(manual_compaction_threads), + MYSQL_SYSVAR(rollback_on_timeout), + + MYSQL_SYSVAR(enable_insert_with_update_caching), nullptr}; -static rocksdb::WriteOptions -rdb_get_rocksdb_write_options(my_core::THD *const thd) { +static rocksdb::WriteOptions rdb_get_rocksdb_write_options( + my_core::THD *const thd) { rocksdb::WriteOptions opt; opt.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC); @@ -1943,19 +2190,6 @@ static int rocksdb_compact_column_family(THD *const thd, /////////////////////////////////////////////////////////////////////////////////////////// -/** - @brief - Function we use in the creation of our hash to get key. -*/ - -uchar * -Rdb_open_tables_map::get_hash_key(const Rdb_table_handler *const table_handler, - size_t *const length, - my_bool not_used MY_ATTRIBUTE((__unused__))) { - *length = table_handler->m_table_name_length; - return reinterpret_cast<uchar *>(table_handler->m_table_name); -} - /* Drop index thread's control */ @@ -2012,7 +2246,7 @@ class Rdb_snapshot_notifier : public rocksdb::TransactionNotifier { void SnapshotCreated(const rocksdb::Snapshot *snapshot) override; -public: + public: Rdb_snapshot_notifier(const Rdb_snapshot_notifier &) = delete; Rdb_snapshot_notifier &operator=(const Rdb_snapshot_notifier &) = delete; @@ -2046,9 +2280,9 @@ String timeout_message(const char *command, const char *name1, /* This is the base class for transactions when interacting with rocksdb. -*/ + */ class Rdb_transaction { -protected: + protected: ulonglong m_write_count = 0; ulonglong m_insert_count = 0; ulonglong m_update_count = 0; @@ -2059,7 +2293,7 @@ protected: bool m_is_delayed_snapshot = false; bool m_is_two_phase = false; -private: + private: /* Number of write operations this transaction had when we took the last savepoint (the idea is not to take another savepoint if we haven't made @@ -2067,7 +2301,7 @@ private: */ ulonglong m_writes_at_last_savepoint; -protected: + protected: protected: THD *m_thd = nullptr; @@ -2092,9 +2326,9 @@ protected: // This should be used only when updating binlog information. virtual rocksdb::WriteBatchBase *get_write_batch() = 0; virtual bool commit_no_binlog() = 0; - virtual rocksdb::Iterator * - get_iterator(const rocksdb::ReadOptions &options, - rocksdb::ColumnFamilyHandle *column_family) = 0; + virtual rocksdb::Iterator *get_iterator( + const rocksdb::ReadOptions &options, + rocksdb::ColumnFamilyHandle *column_family) = 0; protected: /* @@ -2139,7 +2373,9 @@ protected: String m_detailed_error; int64_t m_snapshot_timestamp = 0; bool m_ddl_transaction; +#ifdef MARIAROCKS_NOT_YET std::shared_ptr<Rdb_explicit_snapshot> m_explicit_snapshot; +#endif /* Tracks the number of tables in use through external_lock. @@ -2173,8 +2409,9 @@ protected: RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex); - for (auto it : s_tx_list) + for (auto it : s_tx_list) { walker->process_tran(it); + } RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex); } @@ -2194,7 +2431,8 @@ protected: convert_error_code_to_mysql() does: force a statement rollback before returning HA_ERR_LOCK_WAIT_TIMEOUT: */ - my_core::thd_mark_transaction_to_rollback(thd, false /*just statement*/); + my_core::thd_mark_transaction_to_rollback( + thd, static_cast<bool>(rocksdb_rollback_on_timeout)); m_detailed_error.copy(timeout_message( "index", tbl_def->full_tablename().c_str(), kd.get_name().c_str())); table_handler->m_lock_wait_timeout_counter.inc(); @@ -2216,9 +2454,10 @@ protected: char user_host_buff[MAX_USER_HOST_SIZE + 1]; make_user_name(thd, user_host_buff); // NO_LINT_DEBUG - sql_print_warning("Got snapshot conflict errors: User: %s " - "Query: %s", - user_host_buff, thd->query()); + sql_print_warning( + "Got snapshot conflict errors: User: %s " + "Query: %s", + user_host_buff, thd->query()); } m_detailed_error = String(" (snapshot conflict)", system_charset_info); table_handler->m_deadlock_counter.inc(); @@ -2315,8 +2554,9 @@ protected: if (m_is_tx_failed) { rollback(); res = false; - } else + } else { res = commit(); + } return res; } @@ -2367,7 +2607,7 @@ protected: bool has_snapshot() const { return m_read_opts.snapshot != nullptr; } -private: + private: // The Rdb_sst_info structures we are currently loading. In a partitioned // table this can have more than one entry std::vector<std::shared_ptr<Rdb_sst_info>> m_curr_bulk_load; @@ -2376,7 +2616,7 @@ private: /* External merge sorts for bulk load: key ID -> merge sort instance */ std::unordered_map<GL_INDEX_ID, Rdb_index_merge> m_key_merge; -public: + public: int get_key_merge(GL_INDEX_ID kd_gl_id, rocksdb::ColumnFamilyHandle *cf, Rdb_index_merge **key_merge) { int res; @@ -2397,22 +2637,62 @@ public: return HA_EXIT_SUCCESS; } - int finish_bulk_load(int print_client_error = true) { - int rc = 0, rc2; + /* Finish bulk loading for all table handlers belongs to one connection */ + int finish_bulk_load(bool *is_critical_error = nullptr, + int print_client_error = true) { + Ensure_cleanup cleanup([&]() { + // Always clear everything regardless of success/failure + m_curr_bulk_load.clear(); + m_curr_bulk_load_tablename.clear(); + m_key_merge.clear(); + }); + + int rc = 0; + if (is_critical_error) { + *is_critical_error = true; + } + + // PREPARE phase: finish all on-going bulk loading Rdb_sst_info and + // collect all Rdb_sst_commit_info containing (SST files, cf) + int rc2 = 0; + std::vector<Rdb_sst_info::Rdb_sst_commit_info> sst_commit_list; + sst_commit_list.reserve(m_curr_bulk_load.size()); + + for (auto &sst_info : m_curr_bulk_load) { + Rdb_sst_info::Rdb_sst_commit_info commit_info; - std::vector<std::shared_ptr<Rdb_sst_info>>::iterator it; - for (it = m_curr_bulk_load.begin(); it != m_curr_bulk_load.end(); it++) { - rc2 = (*it)->commit(print_client_error); - if (rc2 != 0 && rc == 0) { + // Commit the list of SST files and move it to the end of + // sst_commit_list, effectively transfer the ownership over + rc2 = sst_info->finish(&commit_info, print_client_error); + if (rc2 && rc == 0) { + // Don't return yet - make sure we finish all the SST infos rc = rc2; } + + // Make sure we have work to do - we might be losing the race + if (rc2 == 0 && commit_info.has_work()) { + sst_commit_list.emplace_back(std::move(commit_info)); + DBUG_ASSERT(!commit_info.has_work()); + } + } + + if (rc) { + return rc; } - m_curr_bulk_load.clear(); - m_curr_bulk_load_tablename.clear(); - DBUG_ASSERT(m_curr_bulk_load.size() == 0); - // Flush the index_merge sort buffers + // MERGING Phase: Flush the index_merge sort buffers into SST files in + // Rdb_sst_info and collect all Rdb_sst_commit_info containing + // (SST files, cf) if (!m_key_merge.empty()) { + Ensure_cleanup malloc_cleanup([]() { + /* + Explicitly tell jemalloc to clean up any unused dirty pages at this + point. + See https://reviews.facebook.net/D63723 for more details. + */ + purge_all_jemalloc_arenas(); + }); + rocksdb::Slice merge_key; rocksdb::Slice merge_val; for (auto it = m_key_merge.begin(); it != m_key_merge.end(); it++) { @@ -2429,9 +2709,20 @@ public: // be missed by the compaction filter and not be marked for // removal. It is unclear how to lock the sql table from the storage // engine to prevent modifications to it while bulk load is occurring. - if (keydef == nullptr || table_name.empty()) { - rc2 = HA_ERR_ROCKSDB_BULK_LOAD; - break; + if (keydef == nullptr) { + if (is_critical_error) { + // We used to set the error but simply ignores it. This follows + // current behavior and we should revisit this later + *is_critical_error = false; + } + return HA_ERR_KEY_NOT_FOUND; + } else if (table_name.empty()) { + if (is_critical_error) { + // We used to set the error but simply ignores it. This follows + // current behavior and we should revisit this later + *is_critical_error = false; + } + return HA_ERR_NO_SUCH_TABLE; } const std::string &index_name = keydef->get_name(); Rdb_index_merge &rdb_merge = it->second; @@ -2440,38 +2731,112 @@ public: // "./database/table" std::replace(table_name.begin(), table_name.end(), '.', '/'); table_name = "./" + table_name; - Rdb_sst_info sst_info(rdb, table_name, index_name, rdb_merge.get_cf(), - *rocksdb_db_options, - THDVAR(get_thd(), trace_sst_api)); + auto sst_info = std::make_shared<Rdb_sst_info>( + rdb, table_name, index_name, rdb_merge.get_cf(), + *rocksdb_db_options, THDVAR(get_thd(), trace_sst_api)); while ((rc2 = rdb_merge.next(&merge_key, &merge_val)) == 0) { - if ((rc2 = sst_info.put(merge_key, merge_val)) != 0) { + if ((rc2 = sst_info->put(merge_key, merge_val)) != 0) { + rc = rc2; + + // Don't return yet - make sure we finish the sst_info break; } } - // rc2 == -1 => finished ok; rc2 > 0 => error - if (rc2 > 0 || (rc2 = sst_info.commit(print_client_error)) != 0) { - if (rc == 0) { - rc = rc2; - } - break; + // -1 => no more items + if (rc2 != -1 && rc != 0) { + rc = rc2; + } + + Rdb_sst_info::Rdb_sst_commit_info commit_info; + rc2 = sst_info->finish(&commit_info, print_client_error); + if (rc2 != 0 && rc == 0) { + // Only set the error from sst_info->finish if finish failed and we + // didn't fail before. In other words, we don't have finish's + // success mask earlier failures + rc = rc2; + } + + if (rc) { + return rc; + } + + if (commit_info.has_work()) { + sst_commit_list.emplace_back(std::move(commit_info)); + DBUG_ASSERT(!commit_info.has_work()); } } - m_key_merge.clear(); + } - /* - Explicitly tell jemalloc to clean up any unused dirty pages at this - point. - See https://reviews.facebook.net/D63723 for more details. - */ - purge_all_jemalloc_arenas(); + // Early return in case we lost the race completely and end up with no + // work at all + if (sst_commit_list.size() == 0) { + return rc; + } + + // INGEST phase: Group all Rdb_sst_commit_info by cf (as they might + // have the same cf across different indexes) and call out to RocksDB + // to ingest all SST files in one atomic operation + rocksdb::IngestExternalFileOptions options; + options.move_files = true; + options.snapshot_consistency = false; + options.allow_global_seqno = false; + options.allow_blocking_flush = false; + + std::map<rocksdb::ColumnFamilyHandle *, rocksdb::IngestExternalFileArg> + arg_map; + + // Group by column_family + for (auto &commit_info : sst_commit_list) { + if (arg_map.find(commit_info.get_cf()) == arg_map.end()) { + rocksdb::IngestExternalFileArg arg; + arg.column_family = commit_info.get_cf(), + arg.external_files = commit_info.get_committed_files(), + arg.options = options; + + arg_map.emplace(commit_info.get_cf(), arg); + } else { + auto &files = arg_map[commit_info.get_cf()].external_files; + files.insert(files.end(), commit_info.get_committed_files().begin(), + commit_info.get_committed_files().end()); + } + } + + std::vector<rocksdb::IngestExternalFileArg> args; + size_t file_count = 0; + for (auto &cf_files_pair : arg_map) { + args.push_back(cf_files_pair.second); + file_count += cf_files_pair.second.external_files.size(); + } + + const rocksdb::Status s = rdb->IngestExternalFiles(args); + if (THDVAR(m_thd, trace_sst_api)) { + // NO_LINT_DEBUG + sql_print_information( + "SST Tracing: IngestExternalFile '%zu' files returned %s", file_count, + s.ok() ? "ok" : "not ok"); + } + + if (!s.ok()) { + if (print_client_error) { + Rdb_sst_info::report_error_msg(s, nullptr); + } + return HA_ERR_ROCKSDB_BULK_LOAD; + } + + // COMMIT phase: mark everything as completed. This avoids SST file + // deletion kicking in. Otherwise SST files would get deleted if this + // entire operation is aborted + for (auto &commit_info : sst_commit_list) { + commit_info.commit(); } + return rc; } int start_bulk_load(ha_rocksdb *const bulk_load, - std::shared_ptr<Rdb_sst_info> sst_info) { + std::shared_ptr<Rdb_sst_info> sst_info) { /* If we already have an open bulk load of a table and the name doesn't match the current one, close out the currently running one. This allows @@ -2484,8 +2849,6 @@ public: bulk_load->get_table_basename() != m_curr_bulk_load_tablename) { const auto res = finish_bulk_load(); if (res != HA_EXIT_SUCCESS) { - m_curr_bulk_load.clear(); - m_curr_bulk_load_tablename.clear(); return res; } } @@ -2535,12 +2898,10 @@ public: inserts while inside a multi-statement transaction. */ bool flush_batch() { - if (get_write_count() == 0) - return false; + if (get_write_count() == 0) return false; /* Commit the current transaction */ - if (commit_no_binlog()) - return true; + if (commit_no_binlog()) return true; /* Start another one */ start_tx(); @@ -2552,7 +2913,7 @@ public: std::max(m_auto_incr_map[gl_index_id], curr_id); } -#ifndef NDEBUG +#ifndef DBUG_OFF ulonglong get_auto_incr(const GL_INDEX_ID &gl_index_id) { if (m_auto_incr_map.count(gl_index_id) > 0) { return m_auto_incr_map[gl_index_id]; @@ -2563,13 +2924,14 @@ public: virtual rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, - const rocksdb::Slice &value) = 0; - virtual rocksdb::Status - delete_key(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) = 0; - virtual rocksdb::Status - single_delete(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) = 0; + const rocksdb::Slice &value, + const bool assume_tracked) = 0; + virtual rocksdb::Status delete_key( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, const bool assume_tracked) = 0; + virtual rocksdb::Status single_delete( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, const bool assume_tracked) = 0; virtual bool has_modifications() const = 0; @@ -2585,25 +2947,23 @@ public: virtual rocksdb::Status get(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, rocksdb::PinnableSlice *const value) const = 0; - virtual rocksdb::Status - get_for_update(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, - bool exclusive) = 0; - - rocksdb::Iterator * - get_iterator(rocksdb::ColumnFamilyHandle *const column_family, - bool skip_bloom_filter, bool fill_cache, - const rocksdb::Slice &eq_cond_lower_bound, - const rocksdb::Slice &eq_cond_upper_bound, - bool read_current = false, bool create_snapshot = true) { + virtual rocksdb::Status get_for_update( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, + bool exclusive, const bool do_validate) = 0; + + rocksdb::Iterator *get_iterator( + rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter, + bool fill_cache, const rocksdb::Slice &eq_cond_lower_bound, + const rocksdb::Slice &eq_cond_upper_bound, bool read_current = false, + bool create_snapshot = true) { // Make sure we are not doing both read_current (which implies we don't // want a snapshot) and create_snapshot which makes sure we create // a snapshot DBUG_ASSERT(column_family != nullptr); DBUG_ASSERT(!read_current || !create_snapshot); - if (create_snapshot) - acquire_snapshot(true); + if (create_snapshot) acquire_snapshot(true); rocksdb::ReadOptions options = m_read_opts; @@ -2635,25 +2995,33 @@ public: entire transaction. */ do_set_savepoint(); - m_writes_at_last_savepoint= m_write_count; + m_writes_at_last_savepoint = m_write_count; } /* Called when a "top-level" statement inside a transaction completes successfully and its changes become part of the transaction's changes. */ - void make_stmt_savepoint_permanent() { - + int make_stmt_savepoint_permanent() { // Take another RocksDB savepoint only if we had changes since the last // one. This is very important for long transactions doing lots of // SELECTs. - if (m_writes_at_last_savepoint != m_write_count) - { + if (m_writes_at_last_savepoint != m_write_count) { + rocksdb::WriteBatchBase *batch = get_write_batch(); + rocksdb::Status status = rocksdb::Status::NotFound(); + while ((status = batch->PopSavePoint()) == rocksdb::Status::OK()) { + } + + if (status != rocksdb::Status::NotFound()) { + return HA_EXIT_FAILURE; + } + do_set_savepoint(); - m_writes_at_last_savepoint= m_write_count; + m_writes_at_last_savepoint = m_write_count; } - } + return HA_EXIT_SUCCESS; + } /* Rollback to the savepoint we've set before the last statement @@ -2669,7 +3037,7 @@ public: statement start) because setting a savepoint is cheap. */ do_set_savepoint(); - m_writes_at_last_savepoint= m_write_count; + m_writes_at_last_savepoint = m_write_count; } } @@ -2733,10 +3101,11 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Transaction *m_rocksdb_tx = nullptr; rocksdb::Transaction *m_rocksdb_reuse_tx = nullptr; -public: + public: void set_lock_timeout(int timeout_sec_arg) override { - if (m_rocksdb_tx) + if (m_rocksdb_tx) { m_rocksdb_tx->SetLockTimeout(rdb_convert_sec_to_ms(m_timeout_sec)); + } } void set_sync(bool sync) override { @@ -2753,7 +3122,7 @@ public: virtual bool is_writebatch_trx() const override { return false; } -private: + private: void release_tx(void) { // We are done with the current active transaction object. Preserve it // for later reuse. @@ -2803,7 +3172,7 @@ private: goto error; } -error: + error: /* Save the transaction object to be reused */ release_tx(); @@ -2817,7 +3186,7 @@ error: return res; } -public: + public: void rollback() override { m_write_count = 0; m_insert_count = 0; @@ -2884,39 +3253,42 @@ public: m_read_opts.snapshot = nullptr; } - if (need_clear && m_rocksdb_tx != nullptr) - m_rocksdb_tx->ClearSnapshot(); + if (need_clear && m_rocksdb_tx != nullptr) m_rocksdb_tx->ClearSnapshot(); } bool has_snapshot() { return m_read_opts.snapshot != nullptr; } rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key, - const rocksdb::Slice &value) override { + const rocksdb::Slice &key, const rocksdb::Slice &value, + const bool assume_tracked) override { ++m_write_count; ++m_lock_count; - if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) + if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) { return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit); - return m_rocksdb_tx->Put(column_family, key, value); + } + return m_rocksdb_tx->Put(column_family, key, value, assume_tracked); } rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) override { + const rocksdb::Slice &key, + const bool assume_tracked) override { ++m_write_count; ++m_lock_count; - if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) + if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) { return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit); - return m_rocksdb_tx->Delete(column_family, key); + } + return m_rocksdb_tx->Delete(column_family, key, assume_tracked); } - rocksdb::Status - single_delete(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) override { + rocksdb::Status single_delete( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, const bool assume_tracked) override { ++m_write_count; ++m_lock_count; - if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) + if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) { return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit); - return m_rocksdb_tx->SingleDelete(column_family, key); + } + return m_rocksdb_tx->SingleDelete(column_family, key, assume_tracked); } bool has_modifications() const override { @@ -2952,23 +3324,39 @@ public: return m_rocksdb_tx->Get(m_read_opts, column_family, key, value); } - rocksdb::Status - get_for_update(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, - bool exclusive) override { - if (++m_lock_count > m_max_row_locks) + rocksdb::Status get_for_update( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, + bool exclusive, const bool do_validate) override { + if (++m_lock_count > m_max_row_locks) { return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit); + } if (value != nullptr) { value->Reset(); } - return m_rocksdb_tx->GetForUpdate(m_read_opts, column_family, key, value, - exclusive); + rocksdb::Status s; + // If snapshot is null, pass it to GetForUpdate and snapshot is + // initialized there. Snapshot validation is skipped in that case. + if (m_read_opts.snapshot == nullptr || do_validate) { + s = m_rocksdb_tx->GetForUpdate( + m_read_opts, column_family, key, value, exclusive, + m_read_opts.snapshot ? do_validate : false); + } else { + // If snapshot is set, and if skipping validation, + // call GetForUpdate without validation and set back old snapshot + auto saved_snapshot = m_read_opts.snapshot; + m_read_opts.snapshot = nullptr; + s = m_rocksdb_tx->GetForUpdate(m_read_opts, column_family, key, value, + exclusive, false); + m_read_opts.snapshot = saved_snapshot; + } + return s; } - rocksdb::Iterator * - get_iterator(const rocksdb::ReadOptions &options, - rocksdb::ColumnFamilyHandle *const column_family) override { + rocksdb::Iterator *get_iterator( + const rocksdb::ReadOptions &options, + rocksdb::ColumnFamilyHandle *const column_family) override { global_stats.queries[QUERIES_RANGE].inc(); return m_rocksdb_tx->GetIterator(options, column_family); } @@ -3013,10 +3401,9 @@ public: m_ddl_transaction = false; } - /* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */ - void do_set_savepoint() override { - m_rocksdb_tx->SetSavePoint(); - } + /* Implementations of do_*savepoint based on rocksdB::Transaction savepoints + */ + void do_set_savepoint() override { m_rocksdb_tx->SetSavePoint(); } void do_rollback_to_savepoint() override { m_rocksdb_tx->RollbackToSavePoint(); @@ -3048,14 +3435,14 @@ public: const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot(); if (org_snapshot != cur_snapshot) { - if (org_snapshot != nullptr) - m_snapshot_timestamp = 0; + if (org_snapshot != nullptr) m_snapshot_timestamp = 0; m_read_opts.snapshot = cur_snapshot; - if (cur_snapshot != nullptr) + if (cur_snapshot != nullptr) { rdb->GetEnv()->GetCurrentTime(&m_snapshot_timestamp); - else + } else { m_is_delayed_snapshot = true; + } } } } @@ -3066,7 +3453,7 @@ public: m_notifier = std::make_shared<Rdb_snapshot_notifier>(this); } - virtual ~Rdb_transaction_impl() { + virtual ~Rdb_transaction_impl() override { rollback(); // Theoretically the notifier could outlive the Rdb_transaction_impl @@ -3098,7 +3485,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { m_ddl_transaction = false; } -private: + private: bool prepare(const rocksdb::TransactionName &name) override { return true; } bool commit_no_binlog() override { @@ -3122,7 +3509,7 @@ private: res = true; goto error; } -error: + error: reset(); m_write_count = 0; @@ -3135,16 +3522,12 @@ error: } /* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */ - void do_set_savepoint() override { - m_batch->SetSavePoint(); - } + void do_set_savepoint() override { m_batch->SetSavePoint(); } - void do_rollback_to_savepoint() override { - m_batch->RollbackToSavePoint(); - } + void do_rollback_to_savepoint() override { m_batch->RollbackToSavePoint(); } -public: + public: bool is_writebatch_trx() const override { return true; } void set_lock_timeout(int timeout_sec_arg) override { @@ -3172,8 +3555,7 @@ public: } void acquire_snapshot(bool acquire_now) override { - if (m_read_opts.snapshot == nullptr) - snapshot_created(rdb->GetSnapshot()); + if (m_read_opts.snapshot == nullptr) snapshot_created(rdb->GetSnapshot()); } void release_snapshot() override { @@ -3184,8 +3566,8 @@ public: } rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key, - const rocksdb::Slice &value) override { + const rocksdb::Slice &key, const rocksdb::Slice &value, + const bool assume_tracked) override { ++m_write_count; m_batch->Put(column_family, key, value); // Note Put/Delete in write batch doesn't return any error code. We simply @@ -3194,15 +3576,16 @@ public: } rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) override { + const rocksdb::Slice &key, + const bool assume_tracked) override { ++m_write_count; m_batch->Delete(column_family, key); return rocksdb::Status::OK(); } - rocksdb::Status - single_delete(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key) override { + rocksdb::Status single_delete( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, const bool /* assume_tracked */) override { ++m_write_count; m_batch->SingleDelete(column_family, key); return rocksdb::Status::OK(); @@ -3227,10 +3610,10 @@ public: value); } - rocksdb::Status - get_for_update(rocksdb::ColumnFamilyHandle *const column_family, - const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, - bool exclusive) override { + rocksdb::Status get_for_update( + rocksdb::ColumnFamilyHandle *const column_family, + const rocksdb::Slice &key, rocksdb::PinnableSlice *const value, + bool /* exclusive */, const bool /* do_validate */) override { if (value == nullptr) { rocksdb::PinnableSlice pin_val; rocksdb::Status s = get(column_family, key, &pin_val); @@ -3241,9 +3624,9 @@ public: return get(column_family, key, value); } - rocksdb::Iterator * - get_iterator(const rocksdb::ReadOptions &options, - rocksdb::ColumnFamilyHandle *const column_family) override { + rocksdb::Iterator *get_iterator( + const rocksdb::ReadOptions &options, + rocksdb::ColumnFamilyHandle *const /* column_family */) override { const auto it = rdb->NewIterator(options); return m_batch->NewIteratorWithBase(it); } @@ -3264,8 +3647,7 @@ public: void start_stmt() override {} void rollback_stmt() override { - if (m_batch) - rollback_to_stmt_savepoint(); + if (m_batch) rollback_to_stmt_savepoint(); } explicit Rdb_writebatch_impl(THD *const thd) @@ -3274,7 +3656,7 @@ public: true); } - virtual ~Rdb_writebatch_impl() { + virtual ~Rdb_writebatch_impl() override { rollback(); delete m_batch; } @@ -3332,7 +3714,7 @@ class Rdb_perf_context_guard { } }; -} // anonymous namespace +} // anonymous namespace /* TODO: maybe, call this in external_lock() and store in ha_rocksdb.. @@ -3347,9 +3729,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) { false /* MARIAROCKS_NOT_YET: THDVAR(thd, master_skip_tx_api) && !thd->rgi_slave)*/) { tx = new Rdb_writebatch_impl(thd); - } - else - { + } else { tx = new Rdb_transaction_impl(thd); } tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks)); @@ -3368,12 +3748,14 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) { static int rocksdb_close_connection(handlerton *const hton, THD *const thd) { Rdb_transaction *tx = get_tx_from_thd(thd); if (tx != nullptr) { - int rc = tx->finish_bulk_load(false); - if (rc != 0) { + bool is_critical_error; + int rc = tx->finish_bulk_load(&is_critical_error, false); + if (rc != 0 && is_critical_error) { // NO_LINT_DEBUG - sql_print_error("RocksDB: Error %d finalizing last SST file while " - "disconnecting", - rc); + sql_print_error( + "RocksDB: Error %d finalizing last SST file while " + "disconnecting", + rc); } delete tx; @@ -3514,9 +3896,9 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) } DEBUG_SYNC(thd, "rocksdb.prepared"); - } - else + } else { tx->make_stmt_savepoint_permanent(); + } return HA_EXIT_SUCCESS; } @@ -3557,9 +3939,8 @@ static int rocksdb_commit_by_xid(handlerton *const hton, XID *const xid) { DBUG_RETURN(HA_EXIT_SUCCESS); } -static int -rocksdb_rollback_by_xid(handlerton *const hton MY_ATTRIBUTE((__unused__)), - XID *const xid) { +static int rocksdb_rollback_by_xid( + handlerton *const hton MY_ATTRIBUTE((__unused__)), XID *const xid) { DBUG_ENTER_FUNC(); DBUG_ASSERT(hton != nullptr); @@ -3605,6 +3986,7 @@ static void rdb_xid_from_string(const std::string &src, XID *const dst) { DBUG_ASSERT(dst->gtrid_length >= 0 && dst->gtrid_length <= MAXGTRIDSIZE); DBUG_ASSERT(dst->bqual_length >= 0 && dst->bqual_length <= MAXBQUALSIZE); + memset(dst->data, 0, XIDDATASIZE); src.copy(dst->data, (dst->gtrid_length) + (dst->bqual_length), RDB_XIDHDR_LEN); } @@ -3629,13 +4011,16 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len) if (is_binlog_advanced(binlog_file, *binlog_pos, file_buf, pos)) { memcpy(binlog_file, file_buf, FN_REFLEN + 1); *binlog_pos = pos; - fprintf(stderr, "RocksDB: Last binlog file position %llu," - " file name %s\n", + // NO_LINT_DEBUG + fprintf(stderr, + "RocksDB: Last binlog file position %llu," + " file name %s\n", pos, file_buf); if (*gtid_buf) { global_sid_lock->rdlock(); binlog_max_gtid->parse(global_sid_map, gtid_buf); global_sid_lock->unlock(); + // NO_LINT_DEBUG fprintf(stderr, "RocksDB: Last MySQL Gtid %s\n", gtid_buf); } } @@ -3733,8 +4118,8 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd)); if (tx != nullptr) { - if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | - OPTION_BEGIN))) { + if (commit_tx || (!my_core::thd_test_options( + thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { /* This will not add anything to commit_latency_stats, and this is correct right? @@ -3865,7 +4250,7 @@ static std::string format_string(const char *const format, ...) { char *buff = static_buff; std::unique_ptr<char[]> dynamic_buff = nullptr; - len++; // Add one for null terminator + len++; // Add one for null terminator // for longer output use an allocated buffer if (static_cast<uint>(len) > sizeof(static_buff)) { @@ -3890,7 +4275,7 @@ static std::string format_string(const char *const format, ...) { } class Rdb_snapshot_status : public Rdb_tx_list_walker { -private: + private: std::string m_data; static std::string current_timestamp(void) { @@ -3924,9 +4309,8 @@ private: "=========================================\n"; } - static Rdb_deadlock_info::Rdb_dl_trx_info - get_dl_txn_info(const rocksdb::DeadlockInfo &txn, - const GL_INDEX_ID &gl_index_id) { + static Rdb_deadlock_info::Rdb_dl_trx_info get_dl_txn_info( + const rocksdb::DeadlockInfo &txn, const GL_INDEX_ID &gl_index_id) { Rdb_deadlock_info::Rdb_dl_trx_info txn_data; txn_data.trx_id = txn.m_txn_id; @@ -3953,13 +4337,12 @@ private: return txn_data; } - static Rdb_deadlock_info - get_dl_path_trx_info(const rocksdb::DeadlockPath &path_entry) { + static Rdb_deadlock_info get_dl_path_trx_info( + const rocksdb::DeadlockPath &path_entry) { Rdb_deadlock_info deadlock_info; - for (auto it = path_entry.path.begin(); it != path_entry.path.end(); - it++) { - auto txn = *it; + for (auto it = path_entry.path.begin(); it != path_entry.path.end(); it++) { + const auto &txn = *it; const GL_INDEX_ID gl_index_id = { txn.m_cf_id, rdb_netbuf_to_uint32(reinterpret_cast<const uchar *>( txn.m_waiting_key.c_str()))}; @@ -3968,7 +4351,7 @@ private: DBUG_ASSERT_IFF(path_entry.limit_exceeded, path_entry.path.empty()); /* print the first txn in the path to display the full deadlock cycle */ if (!path_entry.path.empty() && !path_entry.limit_exceeded) { - auto deadlocking_txn = *(path_entry.path.end() - 1); + const auto &deadlocking_txn = *(path_entry.path.end() - 1); deadlock_info.victim_trx_id = deadlocking_txn.m_txn_id; deadlock_info.deadlock_time = path_entry.deadlock_time; } @@ -3997,7 +4380,7 @@ private: #endif m_data += format_string( "---SNAPSHOT, ACTIVE %lld sec\n" - "%s\n" + "%s\n" "lock count %llu, write count %llu\n" "insert count %llu, update count %llu, delete count %llu\n", (longlong)(curr_time - snapshot_timestamp), buffer, tx->get_lock_count(), @@ -4010,19 +4393,21 @@ private: auto dlock_buffer = rdb->GetDeadlockInfoBuffer(); m_data += "----------LATEST DETECTED DEADLOCKS----------\n"; - for (auto path_entry : dlock_buffer) { + for (const auto &path_entry : dlock_buffer) { std::string path_data; if (path_entry.limit_exceeded) { path_data += "\n-------DEADLOCK EXCEEDED MAX DEPTH-------\n"; } else { - path_data += "\n*** DEADLOCK PATH\n" - "=========================================\n"; + path_data += + "\n*** DEADLOCK PATH\n" + "=========================================\n"; const auto dl_info = get_dl_path_trx_info(path_entry); const auto deadlock_time = dl_info.deadlock_time; for (auto it = dl_info.path.begin(); it != dl_info.path.end(); it++) { - const auto trx_info = *it; + const auto &trx_info = *it; path_data += format_string( - "TIMESTAMP: %" PRId64 "\n" + "TIMESTAMP: %" PRId64 + "\n" "TRANSACTION ID: %u\n" "COLUMN FAMILY NAME: %s\n" "WAITING KEY: %s\n" @@ -4037,9 +4422,9 @@ private: path_data += "---------------WAITING FOR---------------\n"; } } - path_data += - format_string("\n--------TRANSACTION ID: %u GOT DEADLOCK---------\n", - dl_info.victim_trx_id); + path_data += format_string( + "\n--------TRANSACTION ID: %u GOT DEADLOCK---------\n", + dl_info.victim_trx_id); } m_data += path_data; } @@ -4048,7 +4433,7 @@ private: std::vector<Rdb_deadlock_info> get_deadlock_info() { std::vector<Rdb_deadlock_info> deadlock_info; auto dlock_buffer = rdb->GetDeadlockInfoBuffer(); - for (auto path_entry : dlock_buffer) { + for (const auto &path_entry : dlock_buffer) { if (!path_entry.limit_exceeded) { deadlock_info.push_back(get_dl_path_trx_info(path_entry)); } @@ -4063,10 +4448,10 @@ private: * out relevant information for information_schema.rocksdb_trx */ class Rdb_trx_info_aggregator : public Rdb_tx_list_walker { -private: + private: std::vector<Rdb_trx_info> *m_trx_info; -public: + public: explicit Rdb_trx_info_aggregator(std::vector<Rdb_trx_info> *const trx_info) : m_trx_info(trx_info) {} @@ -4197,9 +4582,10 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, // sure that output will look unified. DBUG_ASSERT(commit_latency_stats != nullptr); - snprintf(buf, sizeof(buf), "rocksdb.commit_latency statistics " - "Percentiles :=> 50 : %.2f 95 : %.2f " - "99 : %.2f 100 : %.2f\n", + snprintf(buf, sizeof(buf), + "rocksdb.commit_latency statistics " + "Percentiles :=> 50 : %.2f 95 : %.2f " + "99 : %.2f 100 : %.2f\n", commit_latency_stats->Percentile(50), commit_latency_stats->Percentile(95), commit_latency_stats->Percentile(99), @@ -4221,7 +4607,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, } if (rdb->GetIntProperty("rocksdb.actual-delayed-write-rate", &v)) { - snprintf(buf, sizeof(buf), "rocksdb.actual_delayed_write_rate " + snprintf(buf, sizeof(buf), "COUNT : %llu\n", (ulonglong)v); str.append(buf); @@ -4309,6 +4695,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, rocksdb::Status s = rdb->GetEnv()->GetThreadList(&thread_list); if (!s.ok()) { + // NO_LINT_DEBUG sql_print_error("RocksDB: Returned error (%s) from GetThreadList.\n", s.ToString().c_str()); res |= true; @@ -4325,37 +4712,23 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd, "\noperation_type: " + it.GetOperationName(it.operation_type) + "\noperation_stage: " + it.GetOperationStageName(it.operation_stage) + - "\nelapsed_time_ms: " + - it.MicrosToString(it.op_elapsed_micros); + "\nelapsed_time_ms: " + it.MicrosToString(it.op_elapsed_micros); - for (auto &it_props : - it.InterpretOperationProperties(it.operation_type, - it.op_properties)) { + for (auto &it_props : it.InterpretOperationProperties( + it.operation_type, it.op_properties)) { str += "\n" + it_props.first + ": " + std::to_string(it_props.second); } str += "\nstate_type: " + it.GetStateName(it.state_type); - res |= print_stats(thd, "BG_THREADS", std::to_string(it.thread_id), - str, stat_print); + res |= print_stats(thd, "BG_THREADS", std::to_string(it.thread_id), str, + stat_print); } } #ifdef MARIAROCKS_NOT_YET /* Explicit snapshot information */ - str.clear(); - { - std::lock_guard<std::mutex> lock(explicit_snapshot_mutex); - for (const auto &elem : explicit_snapshots) { - const auto &ss = elem.second.lock(); - DBUG_ASSERT(ss != nullptr); - const auto &info = ss->ss_info; - str += "\nSnapshot ID: " + std::to_string(info.snapshot_id) + - "\nBinlog File: " + info.binlog_file + - "\nBinlog Pos: " + std::to_string(info.binlog_pos) + - "\nGtid Executed: " + info.gtid_executed + "\n"; - } - } + str = Rdb_explicit_snapshot::dump_snapshots(); #endif if (!str.empty()) { @@ -4390,38 +4763,38 @@ static bool rocksdb_explicit_snapshot( snapshot_info_st *ss_info) /*!< out: Snapshot information */ { switch (ss_info->op) { - case snapshot_operation::SNAPSHOT_CREATE: { - if (mysql_bin_log_is_open()) { - mysql_bin_log_lock_commits(ss_info); + case snapshot_operation::SNAPSHOT_CREATE: { + if (mysql_bin_log_is_open()) { + mysql_bin_log_lock_commits(ss_info); + } + auto s = Rdb_explicit_snapshot::create(ss_info, rdb, rdb->GetSnapshot()); + if (mysql_bin_log_is_open()) { + mysql_bin_log_unlock_commits(ss_info); + } + + thd->set_explicit_snapshot(s); + return s == nullptr; } - auto s = Rdb_explicit_snapshot::create(ss_info, rdb, rdb->GetSnapshot()); - if (mysql_bin_log_is_open()) { - mysql_bin_log_unlock_commits(ss_info); + case snapshot_operation::SNAPSHOT_ATTACH: { + auto s = Rdb_explicit_snapshot::get(ss_info->snapshot_id); + if (!s) { + return true; + } + *ss_info = s->ss_info; + thd->set_explicit_snapshot(s); + return false; } - - thd->set_explicit_snapshot(s); - return s == nullptr; - } - case snapshot_operation::SNAPSHOT_ATTACH: { - auto s = Rdb_explicit_snapshot::get(ss_info->snapshot_id); - if (!s) { - return true; + case snapshot_operation::SNAPSHOT_RELEASE: { + if (!thd->get_explicit_snapshot()) { + return true; + } + *ss_info = thd->get_explicit_snapshot()->ss_info; + thd->set_explicit_snapshot(nullptr); + return false; } - *ss_info = s->ss_info; - thd->set_explicit_snapshot(s); - return false; - } - case snapshot_operation::SNAPSHOT_RELEASE: { - if (!thd->get_explicit_snapshot()) { + default: + DBUG_ASSERT(false); return true; - } - *ss_info = thd->get_explicit_snapshot()->ss_info; - thd->set_explicit_snapshot(nullptr); - return false; - } - default: - DBUG_ASSERT(false); - return true; } return true; } @@ -4567,7 +4940,7 @@ static int rocksdb_start_tx_with_shared_read_view( // case: an explicit snapshot was not assigned to this transaction if (!tx->m_explicit_snapshot) { tx->m_explicit_snapshot = - Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot); + Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot); if (!tx->m_explicit_snapshot) { my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0)); error = HA_EXIT_FAILURE; @@ -4611,9 +4984,8 @@ static int rocksdb_rollback_to_savepoint(handlerton *const hton, THD *const thd, return tx->rollback_to_savepoint(savepoint); } -static bool -rocksdb_rollback_to_savepoint_can_release_mdl(handlerton *const hton, - THD *const thd) { +static bool rocksdb_rollback_to_savepoint_can_release_mdl( + handlerton *const /* hton */, THD *const /* thd */) { return true; } @@ -4661,7 +5033,7 @@ static void rocksdb_update_table_stats( /* Function needs to return void because of the interface and we've * detected an error which shouldn't happen. There's no way to let * caller know that something failed. - */ + */ SHIP_ASSERT(false); return; } @@ -4741,8 +5113,9 @@ static rocksdb::Status check_rocksdb_options_compatibility( } if (loaded_cf_descs.size() != cf_descr.size()) { - return rocksdb::Status::NotSupported("Mismatched size of column family " - "descriptors."); + return rocksdb::Status::NotSupported( + "Mismatched size of column family " + "descriptors."); } // Please see RocksDB documentation for more context about why we need to set @@ -4792,17 +5165,22 @@ static int rocksdb_init_func(void *const p) { } if (rdb_check_rocksdb_corruption()) { - sql_print_error("RocksDB: There was a corruption detected in RockDB files. " - "Check error log emitted earlier for more details."); + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: There was a corruption detected in RockDB files. " + "Check error log emitted earlier for more details."); if (rocksdb_allow_to_start_after_corruption) { + // NO_LINT_DEBUG sql_print_information( "RocksDB: Remove rocksdb_allow_to_start_after_corruption to prevent " "server operating if RocksDB corruption is detected."); } else { - sql_print_error("RocksDB: The server will exit normally and stop restart " - "attempts. Remove %s file from data directory and " - "start mysqld manually.", - rdb_corruption_marker_file_name().c_str()); + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: The server will exit normally and stop restart " + "attempts. Remove %s file from data directory and " + "start mysqld manually.", + rdb_corruption_marker_file_name().c_str()); exit(0); } } @@ -4813,8 +5191,10 @@ static int rocksdb_init_func(void *const p) { init_rocksdb_psi_keys(); rocksdb_hton = (handlerton *)p; - mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &rdb_open_tables.m_mutex, - MY_MUTEX_INIT_FAST); + + rdb_open_tables.init(); + Ensure_cleanup rdb_open_tables_cleanup([]() { rdb_open_tables.free(); }); + #ifdef HAVE_PSI_INTERFACE rdb_bg_thread.init(rdb_signal_bg_psi_mutex_key, rdb_signal_bg_psi_cond_key); rdb_drop_idx_thread.init(rdb_signal_drop_idx_psi_mutex_key, @@ -4885,6 +5265,8 @@ static int rocksdb_init_func(void *const p) { /* Not needed in MariaDB: rocksdb_hton->flush_logs = rocksdb_flush_wal; + rocksdb_hton->handle_single_table_select = rocksdb_handle_single_table_select; + */ rocksdb_hton->flags = HTON_TEMPORARY_NOT_SUPPORTED | @@ -4894,16 +5276,23 @@ static int rocksdb_init_func(void *const p) { DBUG_ASSERT(!mysqld_embedded); if (rocksdb_db_options->max_open_files > (long)open_files_limit) { - sql_print_information("RocksDB: rocksdb_max_open_files should not be " - "greater than the open_files_limit, effective value " - "of rocksdb_max_open_files is being set to " - "open_files_limit / 2."); + // NO_LINT_DEBUG + sql_print_information( + "RocksDB: rocksdb_max_open_files should not be " + "greater than the open_files_limit, effective value " + "of rocksdb_max_open_files is being set to " + "open_files_limit / 2."); rocksdb_db_options->max_open_files = open_files_limit / 2; } else if (rocksdb_db_options->max_open_files == -2) { rocksdb_db_options->max_open_files = open_files_limit / 2; } + rdb_read_free_regex_handler.set_patterns(DEFAULT_READ_FREE_RPL_TABLES); + rocksdb_stats = rocksdb::CreateDBStatistics(); + rocksdb_stats->set_stats_level( + static_cast<rocksdb::StatsLevel>(rocksdb_stats_level)); + rocksdb_stats_level = rocksdb_stats->get_stats_level(); rocksdb_db_options->statistics = rocksdb_stats; if (rocksdb_rate_limiter_bytes_per_sec != 0) { @@ -4937,13 +5326,15 @@ static int rocksdb_init_func(void *const p) { rocksdb_db_options->use_direct_reads) { // allow_mmap_reads implies !use_direct_reads and RocksDB will not open if // mmap_reads and direct_reads are both on. (NO_LINT_DEBUG) - sql_print_error("RocksDB: Can't enable both use_direct_reads " - "and allow_mmap_reads\n"); + sql_print_error( + "RocksDB: Can't enable both use_direct_reads " + "and allow_mmap_reads\n"); DBUG_RETURN(HA_EXIT_FAILURE); } // Check whether the filesystem backing rocksdb_datadir allows O_DIRECT - if (rocksdb_db_options->use_direct_reads) { + if (rocksdb_db_options->use_direct_reads || + rocksdb_db_options->use_direct_io_for_flush_and_compaction) { rocksdb::EnvOptions soptions; rocksdb::Status check_status; rocksdb::Env *const env = rocksdb_db_options->env; @@ -4964,9 +5355,11 @@ static int rocksdb_init_func(void *const p) { } if (!check_status.ok()) { - sql_print_error("RocksDB: Unable to use direct io in rocksdb-datadir:" - "(%s)", check_status.getState()); - rdb_open_tables.free_hash(); + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: Unable to use direct io in rocksdb-datadir:" + "(%s)", + check_status.getState()); DBUG_RETURN(HA_EXIT_FAILURE); } } @@ -4974,17 +5367,19 @@ static int rocksdb_init_func(void *const p) { if (rocksdb_db_options->allow_mmap_writes && rocksdb_db_options->use_direct_io_for_flush_and_compaction) { // See above comment for allow_mmap_reads. (NO_LINT_DEBUG) - sql_print_error("RocksDB: Can't enable both " - "use_direct_io_for_flush_and_compaction and " - "allow_mmap_writes\n"); + sql_print_error( + "RocksDB: Can't enable both " + "use_direct_io_for_flush_and_compaction and " + "allow_mmap_writes\n"); DBUG_RETURN(HA_EXIT_FAILURE); } if (rocksdb_db_options->allow_mmap_writes && rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER) { // NO_LINT_DEBUG - sql_print_error("RocksDB: rocksdb_flush_log_at_trx_commit needs to be 0 " - "to use allow_mmap_writes"); + sql_print_error( + "RocksDB: rocksdb_flush_log_at_trx_commit needs to be 0 " + "to use allow_mmap_writes"); DBUG_RETURN(HA_EXIT_FAILURE); } @@ -5011,15 +5406,19 @@ static int rocksdb_init_func(void *const p) { #endif ) { sql_print_information("RocksDB: Got ENOENT when listing column families"); + + // NO_LINT_DEBUG sql_print_information( "RocksDB: assuming that we're creating a new database"); } else { rdb_log_status_error(status, "Error listing column families"); DBUG_RETURN(HA_EXIT_FAILURE); } - } else + } else { + // NO_LINT_DEBUG sql_print_information("RocksDB: %ld column families found", cf_names.size()); + } std::vector<rocksdb::ColumnFamilyDescriptor> cf_descr; std::vector<rocksdb::ColumnFamilyHandle *> cf_handles; @@ -5028,9 +5427,33 @@ static int rocksdb_init_func(void *const p) { (rocksdb::BlockBasedTableOptions::IndexType)rocksdb_index_type; if (!rocksdb_tbl_options->no_block_cache) { - std::shared_ptr<rocksdb::Cache> block_cache = rocksdb_use_clock_cache - ? rocksdb::NewClockCache(rocksdb_block_cache_size) - : rocksdb::NewLRUCache(rocksdb_block_cache_size); + std::shared_ptr<rocksdb::MemoryAllocator> memory_allocator; + if (!rocksdb_cache_dump) { + size_t block_size = rocksdb_tbl_options->block_size; + rocksdb::JemallocAllocatorOptions alloc_opt; + // Limit jemalloc tcache memory usage. The range + // [block_size/4, block_size] should be enough to cover most of + // block cache allocation sizes. + alloc_opt.limit_tcache_size = true; + alloc_opt.tcache_size_lower_bound = block_size / 4; + alloc_opt.tcache_size_upper_bound = block_size; + rocksdb::Status new_alloc_status = + rocksdb::NewJemallocNodumpAllocator(alloc_opt, &memory_allocator); + if (!new_alloc_status.ok()) { + // Fallback to use default malloc/free. + rdb_log_status_error(new_alloc_status, + "Error excluding block cache from core dump"); + memory_allocator = nullptr; + DBUG_RETURN(HA_EXIT_FAILURE); + } + } + std::shared_ptr<rocksdb::Cache> block_cache = + rocksdb_use_clock_cache + ? rocksdb::NewClockCache(rocksdb_block_cache_size) + : rocksdb::NewLRUCache( + rocksdb_block_cache_size, -1 /*num_shard_bits*/, + false /*strict_capcity_limit*/, + rocksdb_cache_high_pri_pool_ratio, memory_allocator); if (rocksdb_sim_cache_size > 0) { // Simulated cache enabled // Wrap block cache inside a simulated cache and pass it to RocksDB @@ -5065,7 +5488,7 @@ static int rocksdb_init_func(void *const p) { if (rocksdb_persistent_cache_size_mb > 0) { std::shared_ptr<rocksdb::PersistentCache> pcache; - uint64_t cache_size_bytes= rocksdb_persistent_cache_size_mb * 1024 * 1024; + uint64_t cache_size_bytes = rocksdb_persistent_cache_size_mb * 1024 * 1024; status = rocksdb::NewPersistentCache( rocksdb::Env::Default(), std::string(rocksdb_persistent_cache_path), cache_size_bytes, myrocks_logger, true, &pcache); @@ -5077,6 +5500,7 @@ static int rocksdb_init_func(void *const p) { } rocksdb_tbl_options->persistent_cache = pcache; } else if (strlen(rocksdb_persistent_cache_path)) { + // NO_LINT_DEBUG sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size_mb"); DBUG_RETURN(HA_EXIT_FAILURE); } @@ -5094,17 +5518,23 @@ static int rocksdb_init_func(void *const p) { If there are no column families, we're creating the new database. Create one column family named "default". */ - if (cf_names.size() == 0) - cf_names.push_back(DEFAULT_CF_NAME); + if (cf_names.size() == 0) cf_names.push_back(DEFAULT_CF_NAME); std::vector<int> compaction_enabled_cf_indices; + + // NO_LINT_DEBUG sql_print_information("RocksDB: Column Families at start:"); for (size_t i = 0; i < cf_names.size(); ++i) { rocksdb::ColumnFamilyOptions opts; cf_options_map->get_cf_options(cf_names[i], &opts); + // NO_LINT_DEBUG sql_print_information(" cf=%s", cf_names[i].c_str()); + + // NO_LINT_DEBUG sql_print_information(" write_buffer_size=%ld", opts.write_buffer_size); + + // NO_LINT_DEBUG sql_print_information(" target_file_size_base=%" PRIu64, opts.target_file_size_base); @@ -5185,25 +5615,27 @@ static int rocksdb_init_func(void *const p) { DBUG_RETURN(HA_EXIT_FAILURE); } - auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME -#ifdef HAVE_PSI_INTERFACE - , - rdb_background_psi_thread_key +#ifndef HAVE_PSI_INTERFACE + auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME); +#else + auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME, + rdb_background_psi_thread_key); #endif - ); if (err != 0) { + // NO_LINT_DEBUG sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)", err); DBUG_RETURN(HA_EXIT_FAILURE); } - err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME -#ifdef HAVE_PSI_INTERFACE - , - rdb_drop_idx_psi_thread_key +#ifndef HAVE_PSI_INTERFACE + err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME); +#else + err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME, + rdb_drop_idx_psi_thread_key); #endif - ); if (err != 0) { + // NO_LINT_DEBUG sql_print_error("RocksDB: Couldn't start the drop index thread: (errno=%d)", err); DBUG_RETURN(HA_EXIT_FAILURE); @@ -5220,7 +5652,6 @@ static int rocksdb_init_func(void *const p) { sql_print_error( "RocksDB: Couldn't start the manual compaction thread: (errno=%d)", err); - rdb_open_tables.free_hash(); DBUG_RETURN(HA_EXIT_FAILURE); } @@ -5254,7 +5685,6 @@ static int rocksdb_init_func(void *const p) { if (err != 0) { // NO_LINT_DEBUG sql_print_error("RocksDB: Couldn't initialize error messages"); - rdb_open_tables.m_hash.~Rdb_table_set(); DBUG_RETURN(HA_EXIT_FAILURE); } @@ -5277,13 +5707,17 @@ static int rocksdb_init_func(void *const p) { } #if !defined(_WIN32) && !defined(__APPLE__) - io_watchdog = new Rdb_io_watchdog(directories); + io_watchdog = new Rdb_io_watchdog(std::move(directories)); io_watchdog->reset_timeout(rocksdb_io_write_timeout_secs); #endif // NO_LINT_DEBUG - sql_print_information("MyRocks storage engine plugin has been successfully " - "initialized."); + sql_print_information( + "MyRocks storage engine plugin has been successfully " + "initialized."); + + // Skip cleaning up rdb_open_tables as we've succeeded + rdb_open_tables_cleanup.skip(); DBUG_RETURN(HA_EXIT_SUCCESS); } @@ -5340,18 +5774,18 @@ static int rocksdb_done_func(void *const p) { "RocksDB: Couldn't stop the manual compaction thread: (errno=%d)", err); } - if (rdb_open_tables.m_hash.size()) { + if (rdb_open_tables.count()) { // Looks like we are getting unloaded and yet we have some open tables // left behind. error = 1; } + rdb_open_tables.free(); /* destructors for static objects can be called at _exit(), but we want to free the memory at dlclose() */ - rdb_open_tables.m_hash.~Rdb_table_set(); - mysql_mutex_destroy(&rdb_open_tables.m_mutex); + // MARIADB_MERGE_2019: rdb_open_tables.m_hash.~Rdb_table_set(); mysql_mutex_destroy(&rdb_sysvars_mutex); mysql_mutex_destroy(&rdb_block_cache_resize_mutex); @@ -5436,7 +5870,7 @@ static inline void rocksdb_smart_next(bool seek_backward, } } -#ifndef NDEBUG +#ifndef DBUG_OFF // simulate that RocksDB has reported corrupted data static void dbug_change_status_to_corrupted(rocksdb::Status *status) { *status = rocksdb::Status::Corruption(); @@ -5471,39 +5905,39 @@ static inline bool is_valid(rocksdb::Iterator *scan_it) { they are needed to function. */ -Rdb_table_handler * -Rdb_open_tables_map::get_table_handler(const char *const table_name) { +Rdb_table_handler *Rdb_open_tables_map::get_table_handler( + const char *const table_name) { + DBUG_ASSERT(table_name != nullptr); + Rdb_table_handler *table_handler; - uint length; - char *tmp_name; - DBUG_ASSERT(table_name != nullptr); - length = (uint)strlen(table_name); + std::string table_name_str(table_name); // First, look up the table in the hash map. RDB_MUTEX_LOCK_CHECK(m_mutex); - if (!m_hash.size() || !(table_handler = m_hash.find(table_name, length))) { + const auto it = m_table_map.find(table_name_str); + if (it != m_table_map.end()) { + // Found it + table_handler = it->second; + } else { + char *tmp_name; + // Since we did not find it in the hash map, attempt to create and add it // to the hash map. if (!(table_handler = reinterpret_cast<Rdb_table_handler *>(my_multi_malloc( MYF(MY_WME | MY_ZEROFILL), &table_handler, sizeof(*table_handler), - &tmp_name, length + 1, NullS)))) { + &tmp_name, table_name_str.length() + 1, NullS)))) { // Allocating a new Rdb_table_handler and a new table name failed. RDB_MUTEX_UNLOCK_CHECK(m_mutex); return nullptr; } table_handler->m_ref_count = 0; - table_handler->m_table_name_length = length; + table_handler->m_table_name_length = table_name_str.length(); table_handler->m_table_name = tmp_name; strmov(table_handler->m_table_name, table_name); - if (m_hash.insert(table_handler)) { - // Inserting into the hash map failed. - RDB_MUTEX_UNLOCK_CHECK(m_mutex); - my_free(table_handler); - return nullptr; - } + m_table_map.emplace(table_name_str, table_handler); thr_lock_init(&table_handler->m_thr_lock); #ifdef MARIAROCKS_NOT_YET @@ -5524,16 +5958,15 @@ std::vector<std::string> rdb_get_open_table_names(void) { } std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const { - size_t i; const Rdb_table_handler *table_handler; std::vector<std::string> names; RDB_MUTEX_LOCK_CHECK(m_mutex); - for (i = 0; (table_handler = m_hash.at(i)); i++) { + for (const auto &kv : m_table_map) { + table_handler = kv.second; DBUG_ASSERT(table_handler != nullptr); names.push_back(table_handler->m_table_name); } - DBUG_ASSERT(i == m_hash.size()); RDB_MUTEX_UNLOCK_CHECK(m_mutex); return names; @@ -5546,44 +5979,44 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const { static ulonglong rdb_get_int_col_max_value(const Field *field) { ulonglong max_value = 0; switch (field->key_type()) { - case HA_KEYTYPE_BINARY: - max_value = 0xFFULL; - break; - case HA_KEYTYPE_INT8: - max_value = 0x7FULL; - break; - case HA_KEYTYPE_USHORT_INT: - max_value = 0xFFFFULL; - break; - case HA_KEYTYPE_SHORT_INT: - max_value = 0x7FFFULL; - break; - case HA_KEYTYPE_UINT24: - max_value = 0xFFFFFFULL; - break; - case HA_KEYTYPE_INT24: - max_value = 0x7FFFFFULL; - break; - case HA_KEYTYPE_ULONG_INT: - max_value = 0xFFFFFFFFULL; - break; - case HA_KEYTYPE_LONG_INT: - max_value = 0x7FFFFFFFULL; - break; - case HA_KEYTYPE_ULONGLONG: - max_value = 0xFFFFFFFFFFFFFFFFULL; - break; - case HA_KEYTYPE_LONGLONG: - max_value = 0x7FFFFFFFFFFFFFFFULL; - break; - case HA_KEYTYPE_FLOAT: - max_value = 0x1000000ULL; - break; - case HA_KEYTYPE_DOUBLE: - max_value = 0x20000000000000ULL; - break; - default: - abort(); + case HA_KEYTYPE_BINARY: + max_value = 0xFFULL; + break; + case HA_KEYTYPE_INT8: + max_value = 0x7FULL; + break; + case HA_KEYTYPE_USHORT_INT: + max_value = 0xFFFFULL; + break; + case HA_KEYTYPE_SHORT_INT: + max_value = 0x7FFFULL; + break; + case HA_KEYTYPE_UINT24: + max_value = 0xFFFFFFULL; + break; + case HA_KEYTYPE_INT24: + max_value = 0x7FFFFFULL; + break; + case HA_KEYTYPE_ULONG_INT: + max_value = 0xFFFFFFFFULL; + break; + case HA_KEYTYPE_LONG_INT: + max_value = 0x7FFFFFFFULL; + break; + case HA_KEYTYPE_ULONGLONG: + max_value = 0xFFFFFFFFFFFFFFFFULL; + break; + case HA_KEYTYPE_LONGLONG: + max_value = 0x7FFFFFFFFFFFFFFFULL; + break; + case HA_KEYTYPE_FLOAT: + max_value = 0x1000000ULL; + break; + case HA_KEYTYPE_DOUBLE: + max_value = 0x20000000000000ULL; + break; + default: + abort(); } return max_value; @@ -5592,7 +6025,7 @@ static ulonglong rdb_get_int_col_max_value(const Field *field) { void ha_rocksdb::load_auto_incr_value() { ulonglong auto_incr = 0; bool validate_last = false, use_datadic = true; -#ifndef NDEBUG +#ifndef DBUG_OFF DBUG_EXECUTE_IF("myrocks_autoinc_upgrade", use_datadic = false;); validate_last = true; #endif @@ -5639,7 +6072,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { // appropriately and non-index-only lookup will not read the value) const bool save_keyread_only = m_keyread_only; m_keyread_only = true; - m_key_requested = true; + m_converter->set_is_key_requested(true); if (!index_last(table->record[0])) { Field *field = @@ -5651,7 +6084,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { if (last_val != max_val) { last_val++; } -#ifndef NDEBUG +#ifndef DBUG_OFF ulonglong dd_val; if (last_val <= max_val) { const auto &gl_index_id = m_tbl_def->get_autoincr_gl_index_id(); @@ -5776,8 +6209,9 @@ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) { // Get hidden primary key from old key slice Rdb_string_reader reader(&rowkey_slice); - if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE))) + if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE))) { return HA_ERR_ROCKSDB_CORRUPT_DATA; + } const int length= 8; /* was Field_longlong::PACK_LENGTH in FB MySQL tree */ const uchar *from = reinterpret_cast<const uchar *>(reader.read(length)); @@ -5805,8 +6239,9 @@ void Rdb_open_tables_map::release_table_handler( DBUG_ASSERT(table_handler->m_ref_count > 0); if (!--table_handler->m_ref_count) { // Last reference was released. Tear down the hash entry. - const auto ret MY_ATTRIBUTE((__unused__)) = m_hash.remove(table_handler); - DBUG_ASSERT(!ret); // the hash entry must actually be found and deleted + const auto ret MY_ATTRIBUTE((__unused__)) = + m_table_map.erase(std::string(table_handler->m_table_name)); + DBUG_ASSERT(ret == 1); // the hash entry must actually be found and deleted my_core::thr_lock_delete(&table_handler->m_thr_lock); my_free(table_handler); } @@ -5822,19 +6257,34 @@ static handler *rocksdb_create_handler(my_core::handlerton *const hton, ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton, my_core::TABLE_SHARE *const table_arg) - : handler(hton, table_arg), m_table_handler(nullptr), m_scan_it(nullptr), - m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr), - m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr), - m_tbl_def(nullptr), m_pk_descr(nullptr), m_key_descr_arr(nullptr), - m_pk_can_be_decoded(false), m_maybe_unpack_info(false), - m_pk_tuple(nullptr), m_pk_packed_tuple(nullptr), - m_sk_packed_tuple(nullptr), m_end_key_packed_tuple(nullptr), - m_sk_match_prefix(nullptr), m_sk_match_prefix_buf(nullptr), - m_sk_packed_tuple_old(nullptr), m_dup_sk_packed_tuple(nullptr), - m_dup_sk_packed_tuple_old(nullptr), m_pack_buffer(nullptr), - m_lock_rows(RDB_LOCK_NONE), m_keyread_only(FALSE), m_encoder_arr(nullptr), - m_row_checksums_checked(0), m_in_rpl_delete_rows(false), - m_in_rpl_update_rows(false), m_force_skip_unique_check(false) {} + : handler(hton, table_arg), + m_table_handler(nullptr), + m_scan_it(nullptr), + m_scan_it_skips_bloom(false), + m_scan_it_snapshot(nullptr), + m_scan_it_lower_bound(nullptr), + m_scan_it_upper_bound(nullptr), + m_tbl_def(nullptr), + m_pk_descr(nullptr), + m_key_descr_arr(nullptr), + m_pk_can_be_decoded(false), + m_pk_tuple(nullptr), + m_pk_packed_tuple(nullptr), + m_sk_packed_tuple(nullptr), + m_end_key_packed_tuple(nullptr), + m_sk_match_prefix(nullptr), + m_sk_match_prefix_buf(nullptr), + m_sk_packed_tuple_old(nullptr), + m_dup_sk_packed_tuple(nullptr), + m_dup_sk_packed_tuple_old(nullptr), + m_pack_buffer(nullptr), + m_lock_rows(RDB_LOCK_NONE), + m_keyread_only(false), + m_insert_with_update(false), + m_dup_pk_found(false), + m_in_rpl_delete_rows(false), + m_in_rpl_update_rows(false), + m_force_skip_unique_check(false) {} const std::string &ha_rocksdb::get_table_basename() const { @@ -5853,9 +6303,9 @@ bool ha_rocksdb::init_with_fields() { if (pk != MAX_KEY) { const uint key_parts = table_share->key_info[pk].user_defined_key_parts; check_keyread_allowed(pk /*PK*/, key_parts - 1, true); - } else + } else { m_pk_can_be_decoded = false; - + } cached_table_flags = table_flags(); DBUG_RETURN(false); /* Ok */ @@ -5912,298 +6362,52 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd, RDB_MAX_HEXDUMP_LEN); const GL_INDEX_ID gl_index_id = kd.get_gl_index_id(); // NO_LINT_DEBUG - sql_print_error("Decoding ttl from PK value failed, " - "for index (%u,%u), val: %s", - gl_index_id.cf_id, gl_index_id.index_id, buf.c_str()); + sql_print_error( + "Decoding ttl from PK value failed, " + "for index (%u,%u), val: %s", + gl_index_id.cf_id, gl_index_id.index_id, buf.c_str()); DBUG_ASSERT(0); return false; } /* Hide record if it has expired before the current snapshot time. */ uint64 read_filter_ts = 0; -#ifndef NDEBUG +#ifndef DBUG_OFF read_filter_ts += rdb_dbug_set_ttl_read_filter_ts(); #endif bool is_hide_ttl = ts + kd.m_ttl_duration + read_filter_ts <= static_cast<uint64>(curr_ts); if (is_hide_ttl) { update_row_stats(ROWS_FILTERED); + + /* increment examined row count when rows are skipped */ + THD *thd = ha_thd(); + thd->inc_examined_row_count(1); + DEBUG_SYNC(thd, "rocksdb.ttl_rows_examined"); } return is_hide_ttl; } -void ha_rocksdb::rocksdb_skip_expired_records(const Rdb_key_def &kd, - rocksdb::Iterator *const iter, - bool seek_backward) { +int ha_rocksdb::rocksdb_skip_expired_records(const Rdb_key_def &kd, + rocksdb::Iterator *const iter, + bool seek_backward) { if (kd.has_ttl()) { + THD *thd = ha_thd(); while (iter->Valid() && should_hide_ttl_rec( kd, iter->value(), get_or_create_tx(table->in_use)->m_snapshot_timestamp)) { - rocksdb_smart_next(seek_backward, iter); - } - } -} - -/** - Convert record from table->record[0] form into a form that can be written - into rocksdb. - - @param pk_packed_slice Packed PK tuple. We need it in order to compute - and store its CRC. - @param packed_rec OUT Data slice with record data. -*/ - -int ha_rocksdb::convert_record_to_storage_format( - const struct update_row_info &row_info, rocksdb::Slice *const packed_rec) { - DBUG_ASSERT_IMP(m_maybe_unpack_info, row_info.new_pk_unpack_info); - DBUG_ASSERT(m_pk_descr != nullptr); - - const rocksdb::Slice &pk_packed_slice = row_info.new_pk_slice; - Rdb_string_writer *const pk_unpack_info = row_info.new_pk_unpack_info; - bool has_ttl = m_pk_descr->has_ttl(); - bool has_ttl_column = !m_pk_descr->m_ttl_column.empty(); - bool ttl_in_pk = has_ttl_column && (row_info.ttl_pk_offset != UINT_MAX); - - m_storage_record.length(0); - - if (has_ttl) { - /* If it's a TTL record, reserve space for 8 byte TTL value in front. */ - m_storage_record.fill(ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_in_rec, 0); - m_ttl_bytes_updated = false; - - /* - If the TTL is contained within the key, we use the offset to find the - TTL value and place it in the beginning of the value record. - */ - if (ttl_in_pk) { - Rdb_string_reader reader(&pk_packed_slice); - const char *ts; - if (!reader.read(row_info.ttl_pk_offset) || - !(ts = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) { - std::string buf; - buf = rdb_hexdump(pk_packed_slice.data(), pk_packed_slice.size(), - RDB_MAX_HEXDUMP_LEN); - const GL_INDEX_ID gl_index_id = m_pk_descr->get_gl_index_id(); - // NO_LINT_DEBUG - sql_print_error("Decoding ttl from PK failed during insert, " - "for index (%u,%u), key: %s", - gl_index_id.cf_id, gl_index_id.index_id, buf.c_str()); - return HA_EXIT_FAILURE; - } - - char *const data = const_cast<char *>(m_storage_record.ptr()); - memcpy(data, ts, ROCKSDB_SIZEOF_TTL_RECORD); -#ifndef NDEBUG - // Adjust for test case if needed - rdb_netbuf_store_uint64( - reinterpret_cast<uchar *>(data), - rdb_netbuf_to_uint64(reinterpret_cast<const uchar *>(data)) + - rdb_dbug_set_ttl_rec_ts()); -#endif - // Also store in m_ttl_bytes to propagate to update_sk - memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); - } else if (!has_ttl_column) { - /* - For implicitly generated TTL records we need to copy over the old - TTL value from the old record in the event of an update. It was stored - in m_ttl_bytes. - - Otherwise, generate a timestamp using the current time. - */ - if (!row_info.old_pk_slice.empty()) { - char *const data = const_cast<char *>(m_storage_record.ptr()); - memcpy(data, m_ttl_bytes, sizeof(uint64)); - } else { - uint64 ts = static_cast<uint64>(std::time(nullptr)); -#ifndef NDEBUG - ts += rdb_dbug_set_ttl_rec_ts(); -#endif - char *const data = const_cast<char *>(m_storage_record.ptr()); - rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts); - // Also store in m_ttl_bytes to propagate to update_sk - memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); - } - } - } else { - /* All NULL bits are initially 0 */ - m_storage_record.fill(m_null_bytes_in_rec, 0); - } - - // If a primary key may have non-empty unpack_info for certain values, - // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block - // itself was prepared in Rdb_key_def::pack_record. - if (m_maybe_unpack_info) { - m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()), - pk_unpack_info->get_current_pos()); - } - - for (uint i = 0; i < table->s->fields; i++) { - /* Don't pack decodable PK key parts */ - if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { - continue; - } - - Field *const field = table->field[i]; - if (m_encoder_arr[i].maybe_null()) { - char *data = const_cast<char *>(m_storage_record.ptr()); - if (has_ttl) { - data += ROCKSDB_SIZEOF_TTL_RECORD; - } - - if (field->is_null()) { - data[m_encoder_arr[i].m_null_offset] |= m_encoder_arr[i].m_null_mask; - /* Don't write anything for NULL values */ - continue; - } - } - - if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB) { - my_core::Field_blob *blob = (my_core::Field_blob *)field; - /* Get the number of bytes needed to store length*/ - const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; - - /* Store the length of the value */ - m_storage_record.append(reinterpret_cast<char *>(blob->ptr), - length_bytes); - - /* Store the blob value itself */ - char *data_ptr; - memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **)); - m_storage_record.append(data_ptr, blob->get_length()); - } else if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) { - Field_varstring *const field_var = (Field_varstring *)field; - uint data_len; - /* field_var->length_bytes is 1 or 2 */ - if (field_var->length_bytes == 1) { - data_len = field_var->ptr[0]; - } else { - DBUG_ASSERT(field_var->length_bytes == 2); - data_len = uint2korr(field_var->ptr); - } - m_storage_record.append(reinterpret_cast<char *>(field_var->ptr), - field_var->length_bytes + data_len); - } else { - /* Copy the field data */ - const uint len = field->pack_length_in_rec(); - m_storage_record.append(reinterpret_cast<char *>(field->ptr), len); - - /* - Check if this is the TTL field within the table, if so store the TTL - in the front of the record as well here. - */ - if (has_ttl && has_ttl_column && - i == m_pk_descr->get_ttl_field_offset()) { - DBUG_ASSERT(len == ROCKSDB_SIZEOF_TTL_RECORD); - DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG); - DBUG_ASSERT(m_pk_descr->get_ttl_field_offset() != UINT_MAX); - - char *const data = const_cast<char *>(m_storage_record.ptr()); - uint64 ts = uint8korr(field->ptr); -#ifndef NDEBUG - ts += rdb_dbug_set_ttl_rec_ts(); -#endif - rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts); - - // If this is an update and the timestamp has been updated, take note - // so we can avoid updating SKs unnecessarily. - if (!row_info.old_pk_slice.empty()) { - m_ttl_bytes_updated = - memcmp(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); - } - // Store timestamp in m_ttl_bytes to propagate to update_sk - memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD); + DEBUG_SYNC(thd, "rocksdb.check_flags_ser"); + if (thd && thd->killed) { + return HA_ERR_QUERY_INTERRUPTED; } + rocksdb_smart_next(seek_backward, iter); } } - - if (should_store_row_debug_checksums()) { - const uint32_t key_crc32 = my_core::crc32( - 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size()); - const uint32_t val_crc32 = - my_core::crc32(0, rdb_mysql_str_to_uchar_str(&m_storage_record), - m_storage_record.length()); - uchar key_crc_buf[RDB_CHECKSUM_SIZE]; - uchar val_crc_buf[RDB_CHECKSUM_SIZE]; - rdb_netbuf_store_uint32(key_crc_buf, key_crc32); - rdb_netbuf_store_uint32(val_crc_buf, val_crc32); - m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1); - m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE); - m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE); - } - - *packed_rec = - rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length()); - return HA_EXIT_SUCCESS; } -/* - @brief - Setup which fields will be unpacked when reading rows - - @detail - Three special cases when we still unpack all fields: - - When this table is being updated (m_lock_rows==RDB_LOCK_WRITE). - - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to - read all fields to find whether there is a row checksum at the end. We could - skip the fields instead of decoding them, but currently we do decoding.) - - On index merge as bitmap is cleared during that operation - - @seealso - ha_rocksdb::setup_field_converters() - ha_rocksdb::convert_record_from_storage_format() -*/ -void ha_rocksdb::setup_read_decoders() { - m_decoders_vect.clear(); - m_key_requested = false; - - int last_useful = 0; - int skip_size = 0; - - for (uint i = 0; i < table->s->fields; i++) { - // bitmap is cleared on index merge, but it still needs to decode columns - const bool field_requested = - m_lock_rows == RDB_LOCK_WRITE || m_verify_row_debug_checksums || - bitmap_is_clear_all(table->read_set) || - bitmap_is_set(table->read_set, table->field[i]->field_index); - - // We only need the decoder if the whole record is stored. - if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) { - // the field potentially needs unpacking - if (field_requested) { - // the field is in the read set - m_key_requested = true; - } - continue; - } - - if (field_requested) { - // We will need to decode this field - m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size}); - last_useful = m_decoders_vect.size(); - skip_size = 0; - } else { - if (m_encoder_arr[i].uses_variable_len_encoding() || - m_encoder_arr[i].maybe_null()) { - // For variable-length field, we need to read the data and skip it - m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size}); - skip_size = 0; - } else { - // Fixed-width field can be skipped without looking at it. - // Add appropriate skip_size to the next field. - skip_size += m_encoder_arr[i].m_pack_length_in_rec; - } - } - } - - // It could be that the last few elements are varchars that just do - // skipping. Remove them. - m_decoders_vect.erase(m_decoders_vect.begin() + last_useful, - m_decoders_vect.end()); -} - -#ifndef NDEBUG +#ifndef DBUG_OFF void dbug_append_garbage_at_end(rocksdb::PinnableSlice *on_disk_rec) { std::string str(on_disk_rec->data(), on_disk_rec->size()); on_disk_rec->Reset(); @@ -6228,17 +6432,6 @@ void dbug_modify_rec_varchar12(rocksdb::PinnableSlice *on_disk_rec) { on_disk_rec->PinSelf(rocksdb::Slice(res)); } -void dbug_modify_key_varchar8(String &on_disk_rec) { - std::string res; - // The key starts with index number - res.append(on_disk_rec.ptr(), Rdb_key_def::INDEX_NUMBER_SIZE); - - // Then, a mem-comparable form of a varchar(8) value. - res.append("ABCDE\0\0\0\xFC", 9); - on_disk_rec.length(0); - on_disk_rec.append(res.data(), res.size()); -} - void dbug_create_err_inplace_alter() { my_printf_error(ER_UNKNOWN_ERROR, "Intentional failure in inplace alter occurred.", MYF(0)); @@ -6247,7 +6440,6 @@ void dbug_create_err_inplace_alter() { int ha_rocksdb::convert_record_from_storage_format( const rocksdb::Slice *const key, uchar *const buf) { - DBUG_EXECUTE_IF("myrocks_simulate_bad_row_read1", dbug_append_garbage_at_end(&m_retrieved_record);); DBUG_EXECUTE_IF("myrocks_simulate_bad_row_read2", @@ -6258,91 +6450,6 @@ int ha_rocksdb::convert_record_from_storage_format( return convert_record_from_storage_format(key, &m_retrieved_record, buf); } -int ha_rocksdb::convert_blob_from_storage_format( - my_core::Field_blob *const blob, - Rdb_string_reader *const reader, - bool decode) -{ - /* Get the number of bytes needed to store length*/ - const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr; - - const char *data_len_str; - if (!(data_len_str = reader->read(length_bytes))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - memcpy(blob->ptr, data_len_str, length_bytes); - - const uint32 data_len = blob->get_length( - reinterpret_cast<const uchar*>(data_len_str), length_bytes); - const char *blob_ptr; - if (!(blob_ptr = reader->read(data_len))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) { - // set 8-byte pointer to 0, like innodb does (relevant for 32-bit - // platforms) - memset(blob->ptr + length_bytes, 0, 8); - memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **)); - } - - return HA_EXIT_SUCCESS; -} - -int ha_rocksdb::convert_varchar_from_storage_format( - my_core::Field_varstring *const field_var, - Rdb_string_reader *const reader, - bool decode) -{ - const char *data_len_str; - if (!(data_len_str = reader->read(field_var->length_bytes))) - return HA_ERR_ROCKSDB_CORRUPT_DATA; - - uint data_len; - /* field_var->length_bytes is 1 or 2 */ - if (field_var->length_bytes == 1) { - data_len = (uchar)data_len_str[0]; - } else { - DBUG_ASSERT(field_var->length_bytes == 2); - data_len = uint2korr(data_len_str); - } - - if (data_len > field_var->field_length) { - /* The data on disk is longer than table DDL allows? */ - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (!reader->read(data_len)) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) { - memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len); - } - - return HA_EXIT_SUCCESS; -} - -int ha_rocksdb::convert_field_from_storage_format( - my_core::Field *const field, - Rdb_string_reader *const reader, - bool decode, - uint len) -{ - const char *data_bytes; - if (len > 0) { - if ((data_bytes = reader->read(len)) == nullptr) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (decode) - memcpy(field->ptr, data_bytes, len); - } - - return HA_EXIT_SUCCESS; -} - /* @brief Unpack the record in this->m_retrieved_record and this->m_last_rowkey from @@ -6359,8 +6466,8 @@ int ha_rocksdb::convert_field_from_storage_format( m_retrieved_record). @seealso - ha_rocksdb::setup_read_decoders() Sets up data structures which tell which - columns to decode. + rdb_converter::setup_read_decoders() Sets up data structures which tell + which columns to decode. @return 0 OK @@ -6370,241 +6477,7 @@ int ha_rocksdb::convert_field_from_storage_format( int ha_rocksdb::convert_record_from_storage_format( const rocksdb::Slice *const key, const rocksdb::Slice *const value, uchar *const buf) { - Rdb_string_reader reader(value); - - /* - Decode PK fields from the key - */ - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1", - dbug_modify_key_varchar8(m_last_rowkey);); - - const rocksdb::Slice rowkey_slice(m_last_rowkey.ptr(), - m_last_rowkey.length()); - const char *unpack_info = nullptr; - uint16 unpack_info_len = 0; - rocksdb::Slice unpack_slice; - - /* If it's a TTL record, skip the 8 byte TTL value */ - const char *ttl_bytes; - if (m_pk_descr->has_ttl()) { - if ((ttl_bytes = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) { - memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD); - } else { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - } - - /* Other fields are decoded from the value */ - const char *null_bytes = nullptr; - if (m_null_bytes_in_rec && !(null_bytes = reader.read(m_null_bytes_in_rec))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - if (m_maybe_unpack_info) { - unpack_info = reader.get_current_ptr(); - if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) || - !reader.read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - unpack_info_len = - rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1)); - unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len); - - reader.read(unpack_info_len - - Rdb_key_def::get_unpack_header_size(unpack_info[0])); - } - - int err = HA_EXIT_SUCCESS; - if (m_key_requested) { - err = m_pk_descr->unpack_record(table, buf, &rowkey_slice, - unpack_info ? &unpack_slice : nullptr, - false /* verify_checksum */); - } - - if (err != HA_EXIT_SUCCESS) { - return err; - } - - for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) { - const Rdb_field_encoder *const field_dec = it->m_field_enc; - const bool decode = it->m_decode; - const bool isNull = - field_dec->maybe_null() && - ((null_bytes[field_dec->m_null_offset] & field_dec->m_null_mask) != 0); - - Field *const field = table->field[field_dec->m_field_index]; - - /* Skip the bytes we need to skip */ - if (it->m_skip && !reader.read(it->m_skip)) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - uint field_offset = field->ptr - table->record[0]; - uint null_offset = field->null_offset(); - bool maybe_null = field->real_maybe_null(); - field->move_field(buf + field_offset, - maybe_null ? buf + null_offset : nullptr, - field->null_bit); - // WARNING! - Don't return before restoring field->ptr and field->null_ptr! - - if (isNull) { - if (decode) { - /* This sets the NULL-bit of this record */ - field->set_null(); - /* - Besides that, set the field value to default value. CHECKSUM TABLE - depends on this. - */ - memcpy(field->ptr, table->s->default_values + field_offset, - field->pack_length()); - } - } else { - if (decode) { - field->set_notnull(); - } - - if (field_dec->m_field_type == MYSQL_TYPE_BLOB) { - err = convert_blob_from_storage_format( - (my_core::Field_blob *) field, &reader, decode); - } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) { - err = convert_varchar_from_storage_format( - (my_core::Field_varstring *) field, &reader, decode); - } else { - err = convert_field_from_storage_format( - field, &reader, decode, field_dec->m_pack_length_in_rec); - } - } - - // Restore field->ptr and field->null_ptr - field->move_field(table->record[0] + field_offset, - maybe_null ? table->record[0] + null_offset : nullptr, - field->null_bit); - - if (err != HA_EXIT_SUCCESS) { - return err; - } - } - - if (m_verify_row_debug_checksums) { - if (reader.remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE && - reader.read(1)[0] == RDB_CHECKSUM_DATA_TAG) { - uint32_t stored_key_chksum = - rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE)); - uint32_t stored_val_chksum = - rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE)); - - const uint32_t computed_key_chksum = - my_core::crc32(0, rdb_slice_to_uchar_ptr(key), key->size()); - const uint32_t computed_val_chksum = - my_core::crc32(0, rdb_slice_to_uchar_ptr(value), - value->size() - RDB_CHECKSUM_CHUNK_SIZE); - - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", - stored_key_chksum++;); - - if (stored_key_chksum != computed_key_chksum) { - m_pk_descr->report_checksum_mismatch(true, key->data(), key->size()); - return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; - } - - DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", - stored_val_chksum++;); - if (stored_val_chksum != computed_val_chksum) { - m_pk_descr->report_checksum_mismatch(false, value->data(), - value->size()); - return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; - } - - m_row_checksums_checked++; - } - if (reader.remaining_bytes()) - return HA_ERR_ROCKSDB_CORRUPT_DATA; - } - - return HA_EXIT_SUCCESS; -} - -void ha_rocksdb::get_storage_type(Rdb_field_encoder *const encoder, - const uint &kp) { - // STORE_SOME uses unpack_info. - if (m_pk_descr->has_unpack_info(kp)) { - DBUG_ASSERT(m_pk_descr->can_unpack(kp)); - encoder->m_storage_type = Rdb_field_encoder::STORE_SOME; - m_maybe_unpack_info = true; - } else if (m_pk_descr->can_unpack(kp)) { - encoder->m_storage_type = Rdb_field_encoder::STORE_NONE; - } -} - -/* - Setup data needed to convert table->record[] to and from record storage - format. - - @seealso - ha_rocksdb::convert_record_to_storage_format, - ha_rocksdb::convert_record_from_storage_format -*/ - -void ha_rocksdb::setup_field_converters() { - uint i; - uint null_bytes = 0; - uchar cur_null_mask = 0x1; - - DBUG_ASSERT(m_encoder_arr == nullptr); - m_encoder_arr = static_cast<Rdb_field_encoder *>( - my_malloc(table->s->fields * sizeof(Rdb_field_encoder), MYF(0))); - if (m_encoder_arr == nullptr) { - return; - } - - for (i = 0; i < table->s->fields; i++) { - Field *const field = table->field[i]; - m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL; - - /* - Check if this field is - - a part of primary key, and - - it can be decoded back from its key image. - If both hold, we don't need to store this field in the value part of - RocksDB's key-value pair. - - If hidden pk exists, we skip this check since the field will never be - part of the hidden pk. - */ - if (!has_hidden_pk(table)) { - KEY *const pk_info = &table->key_info[table->s->primary_key]; - for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) { - /* key_part->fieldnr is counted from 1 */ - if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) { - get_storage_type(&m_encoder_arr[i], kp); - break; - } - } - } - - m_encoder_arr[i].m_field_type = field->real_type(); - m_encoder_arr[i].m_field_index = i; - m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec(); - - if (field->real_maybe_null()) { - m_encoder_arr[i].m_null_mask = cur_null_mask; - m_encoder_arr[i].m_null_offset = null_bytes; - if (cur_null_mask == 0x80) { - cur_null_mask = 0x1; - null_bytes++; - } else - cur_null_mask = cur_null_mask << 1; - } else { - m_encoder_arr[i].m_null_mask = 0; - } - } - - /* Count the last, unfinished NULL-bits byte */ - if (cur_null_mask != 0x1) - null_bytes++; - - m_null_bytes_in_rec = null_bytes; + return m_converter->decode(m_pk_descr, buf, key, value); } int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, @@ -6641,8 +6514,8 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, /* Sometimes, we may use m_sk_packed_tuple for storing packed PK */ max_packed_sk_len = pack_key_len; for (uint i = 0; i < table_arg->s->keys; i++) { - if (i == table_arg->s->primary_key) /* Primary key was processed above */ - continue; + /* Primary key was processed above */ + if (i == table_arg->s->primary_key) continue; // TODO: move this into get_table_handler() ?? kd_arr[i]->setup(table_arg, tbl_def_arg); @@ -6731,25 +6604,10 @@ void ha_rocksdb::free_key_buffers() { } #ifdef MARIAROCKS_NOT_YET -void ha_rocksdb::set_use_read_free_rpl(const char *const whitelist) { - DBUG_ASSERT(whitelist != nullptr); - -#if defined(HAVE_PSI_INTERFACE) - Regex_list_handler regex_handler(key_rwlock_read_free_rpl_tables); -#else - Regex_list_handler regex_handler; -#endif - - if (!regex_handler.set_patterns(whitelist)) { - warn_about_bad_patterns(®ex_handler, "read_free_rpl_tables"); - } - - m_use_read_free_rpl = regex_handler.matches(m_tbl_def->base_tablename()); -} #endif - void ha_rocksdb::set_skip_unique_check_tables(const char *const whitelist) { - DBUG_ASSERT(whitelist != nullptr); + const char *const wl = + whitelist ? whitelist : DEFAULT_SKIP_UNIQUE_CHECK_TABLES; #if defined(HAVE_PSI_INTERFACE) Regex_list_handler regex_handler(key_rwlock_skip_unique_check_tables); @@ -6757,7 +6615,7 @@ void ha_rocksdb::set_skip_unique_check_tables(const char *const whitelist) { Regex_list_handler regex_handler; #endif - if (!regex_handler.set_patterns(whitelist)) { + if (!regex_handler.set_patterns(wl)) { warn_about_bad_patterns(®ex_handler, "skip_unique_check_tables"); } @@ -6804,8 +6662,8 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) { "dictionary"); DBUG_RETURN(HA_ERR_ROCKSDB_INVALID_TABLE); } - m_lock_rows = RDB_LOCK_NONE; + m_lock_rows = RDB_LOCK_NONE; m_key_descr_arr = m_tbl_def->m_key_descr_arr; /* @@ -6834,7 +6692,15 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) { */ init_with_fields(); - setup_field_converters(); + /* Initialize decoder */ + m_converter = std::make_shared<Rdb_converter>(ha_thd(), m_tbl_def, table); + + /* + Update m_ttl_bytes address to same as Rdb_converter's m_ttl_bytes. + Remove this code after moving convert_record_to_storage_format() into + Rdb_converter class. + */ + m_ttl_bytes = m_converter->get_ttl_bytes_buffer(); /* MariaDB: adjust field->part_of_key for PK columns. We can only do it here @@ -6877,11 +6743,10 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) { The following load_XXX code calls row decode functions, and they do that without having done ::external_lock() or index_init()/rnd_init(). (Note: this also means we're doing a read when there was no - setup_field_converters() call) + rdb_converter::setup_field_encoders() call) Initialize the necessary variables for them: */ - m_verify_row_debug_checksums = false; /* Load auto_increment value only once on first use. */ if (table->found_next_number_field && m_tbl_def->m_auto_incr_val == 0) { @@ -6899,10 +6764,7 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) { stats.block_size = rocksdb_tbl_options->block_size; #ifdef MARIAROCKS_NOT_YET // MDEV-10976 - /* Determine at open whether we can use Read Free Replication or not */ - set_use_read_free_rpl(THDVAR(ha_thd(), read_free_rpl_tables)); #endif - /* Determine at open whether we should skip unique checks for this table */ set_skip_unique_check_tables(THDVAR(ha_thd(), skip_unique_check_tables)); @@ -6914,19 +6776,15 @@ int ha_rocksdb::close(void) { m_pk_descr = nullptr; m_key_descr_arr = nullptr; - + m_converter = nullptr; free_key_buffers(); - my_free(m_encoder_arr); - m_encoder_arr = nullptr; - if (m_table_handler != nullptr) { rdb_open_tables.release_table_handler(m_table_handler); m_table_handler = nullptr; } // These are needed to suppress valgrind errors in rocksdb.partition - m_storage_record.free(); m_last_rowkey.free(); m_sk_tails.free(); m_sk_tails_old.free(); @@ -7017,61 +6875,72 @@ int ha_rocksdb::rdb_error_to_mysql(const rocksdb::Status &s, int err; switch (s.code()) { - case rocksdb::Status::Code::kOk: - err = HA_EXIT_SUCCESS; - break; - case rocksdb::Status::Code::kNotFound: - err = HA_ERR_ROCKSDB_STATUS_NOT_FOUND; - break; - case rocksdb::Status::Code::kCorruption: - err = HA_ERR_ROCKSDB_STATUS_CORRUPTION; - break; - case rocksdb::Status::Code::kNotSupported: - err = HA_ERR_ROCKSDB_STATUS_NOT_SUPPORTED; - break; - case rocksdb::Status::Code::kInvalidArgument: - err = HA_ERR_ROCKSDB_STATUS_INVALID_ARGUMENT; - break; - case rocksdb::Status::Code::kIOError: - err = (s.IsNoSpace()) ? HA_ERR_ROCKSDB_STATUS_NO_SPACE - : HA_ERR_ROCKSDB_STATUS_IO_ERROR; - break; - case rocksdb::Status::Code::kMergeInProgress: - err = HA_ERR_ROCKSDB_STATUS_MERGE_IN_PROGRESS; - break; - case rocksdb::Status::Code::kIncomplete: - err = HA_ERR_ROCKSDB_STATUS_INCOMPLETE; - break; - case rocksdb::Status::Code::kShutdownInProgress: - err = HA_ERR_ROCKSDB_STATUS_SHUTDOWN_IN_PROGRESS; - break; - case rocksdb::Status::Code::kTimedOut: - err = HA_ERR_ROCKSDB_STATUS_TIMED_OUT; - break; - case rocksdb::Status::Code::kAborted: - err = (s.IsLockLimit()) ? HA_ERR_ROCKSDB_STATUS_LOCK_LIMIT - : HA_ERR_ROCKSDB_STATUS_ABORTED; - break; - case rocksdb::Status::Code::kBusy: - err = (s.IsDeadlock()) ? HA_ERR_ROCKSDB_STATUS_DEADLOCK - : HA_ERR_ROCKSDB_STATUS_BUSY; - break; - case rocksdb::Status::Code::kExpired: - err = HA_ERR_ROCKSDB_STATUS_EXPIRED; - break; - case rocksdb::Status::Code::kTryAgain: - err = HA_ERR_ROCKSDB_STATUS_TRY_AGAIN; - break; - default: - DBUG_ASSERT(0); - return -1; + case rocksdb::Status::Code::kOk: + err = HA_EXIT_SUCCESS; + break; + case rocksdb::Status::Code::kNotFound: + err = HA_ERR_ROCKSDB_STATUS_NOT_FOUND; + break; + case rocksdb::Status::Code::kCorruption: + err = HA_ERR_ROCKSDB_STATUS_CORRUPTION; + break; + case rocksdb::Status::Code::kNotSupported: + err = HA_ERR_ROCKSDB_STATUS_NOT_SUPPORTED; + break; + case rocksdb::Status::Code::kInvalidArgument: + err = HA_ERR_ROCKSDB_STATUS_INVALID_ARGUMENT; + break; + case rocksdb::Status::Code::kIOError: + err = (s.IsNoSpace()) ? HA_ERR_ROCKSDB_STATUS_NO_SPACE + : HA_ERR_ROCKSDB_STATUS_IO_ERROR; + break; + case rocksdb::Status::Code::kMergeInProgress: + err = HA_ERR_ROCKSDB_STATUS_MERGE_IN_PROGRESS; + break; + case rocksdb::Status::Code::kIncomplete: + err = HA_ERR_ROCKSDB_STATUS_INCOMPLETE; + break; + case rocksdb::Status::Code::kShutdownInProgress: + err = HA_ERR_ROCKSDB_STATUS_SHUTDOWN_IN_PROGRESS; + break; + case rocksdb::Status::Code::kTimedOut: + err = HA_ERR_ROCKSDB_STATUS_TIMED_OUT; + break; + case rocksdb::Status::Code::kAborted: + err = (s.IsLockLimit()) ? HA_ERR_ROCKSDB_STATUS_LOCK_LIMIT + : HA_ERR_ROCKSDB_STATUS_ABORTED; + break; + case rocksdb::Status::Code::kBusy: + err = (s.IsDeadlock()) ? HA_ERR_ROCKSDB_STATUS_DEADLOCK + : HA_ERR_ROCKSDB_STATUS_BUSY; + break; + case rocksdb::Status::Code::kExpired: + err = HA_ERR_ROCKSDB_STATUS_EXPIRED; + break; + case rocksdb::Status::Code::kTryAgain: + err = HA_ERR_ROCKSDB_STATUS_TRY_AGAIN; + break; + default: + DBUG_ASSERT(0); + return -1; + } + + std::string errMsg; + if (s.IsLockLimit()) { + errMsg = + "Operation aborted: Failed to acquire lock due to " + "rocksdb_max_row_locks limit"; + } else { + errMsg = s.ToString(); } if (opt_msg) { - std::string concatenated_error = s.ToString() + " (" + std::string(opt_msg) + ")"; - my_error(ER_GET_ERRMSG, MYF(0), s.code(), concatenated_error.c_str(), rocksdb_hton_name); + std::string concatenated_error = errMsg + " (" + std::string(opt_msg) + ")"; + my_error(ER_GET_ERRMSG, MYF(0), s.code(), concatenated_error.c_str(), + rocksdb_hton_name); } else { - my_error(ER_GET_ERRMSG, MYF(0), s.code(), s.ToString().c_str(), rocksdb_hton_name); + my_error(ER_GET_ERRMSG, MYF(0), s.code(), errMsg.c_str(), + rocksdb_hton_name); } return err; @@ -7081,8 +6950,8 @@ int ha_rocksdb::rdb_error_to_mysql(const rocksdb::Status &s, static const std::set<uint> RDB_INDEX_COLLATIONS = { COLLATION_BINARY, COLLATION_UTF8_BIN, COLLATION_LATIN1_BIN}; -static bool -rdb_is_index_collation_supported(const my_core::Field *const field) { +static bool rdb_is_index_collation_supported( + const my_core::Field *const field) { const my_core::enum_field_types type = field->real_type(); /* Handle [VAR](CHAR|BINARY) or TEXT|BLOB */ if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING || @@ -7142,8 +7011,6 @@ int ha_rocksdb::create_key_defs( DBUG_ASSERT(table_arg->s != nullptr); - uint i; - /* These need to be one greater than MAX_INDEXES since the user can create MAX_INDEXES secondary keys and no primary key which would cause us @@ -7160,6 +7027,36 @@ int ha_rocksdb::create_key_defs( DBUG_RETURN(HA_EXIT_FAILURE); } + uint64 ttl_duration = 0; + std::string ttl_column; + uint ttl_field_offset; + + uint err; + if ((err = Rdb_key_def::extract_ttl_duration(table_arg, tbl_def_arg, + &ttl_duration))) { + DBUG_RETURN(err); + } + + if ((err = Rdb_key_def::extract_ttl_col(table_arg, tbl_def_arg, &ttl_column, + &ttl_field_offset))) { + DBUG_RETURN(err); + } + + /* We don't currently support TTL on tables with hidden primary keys. */ + if (ttl_duration > 0 && has_hidden_pk(table_arg)) { + my_error(ER_RDB_TTL_UNSUPPORTED, MYF(0)); + DBUG_RETURN(HA_EXIT_FAILURE); + } + + /* + If TTL duration is not specified but TTL column was specified, throw an + error because TTL column requires duration. + */ + if (ttl_duration == 0 && !ttl_column.empty()) { + my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_column.c_str()); + DBUG_RETURN(HA_EXIT_FAILURE); + } + if (!old_tbl_def_arg) { /* old_tbl_def doesn't exist. this means we are in the process of creating @@ -7168,9 +7065,9 @@ int ha_rocksdb::create_key_defs( Get the index numbers (this will update the next_index_number) and create Rdb_key_def structures. */ - for (i = 0; i < tbl_def_arg->m_key_count; i++) { - if (create_key_def(table_arg, i, tbl_def_arg, &m_key_descr_arr[i], - cfs[i])) { + for (uint i = 0; i < tbl_def_arg->m_key_count; i++) { + if (create_key_def(table_arg, i, tbl_def_arg, &m_key_descr_arr[i], cfs[i], + ttl_duration, ttl_column)) { DBUG_RETURN(HA_EXIT_FAILURE); } } @@ -7181,7 +7078,8 @@ int ha_rocksdb::create_key_defs( generate the necessary new key definitions if any. */ if (create_inplace_key_defs(table_arg, tbl_def_arg, old_table_arg, - old_tbl_def_arg, cfs)) { + old_tbl_def_arg, cfs, ttl_duration, + ttl_column)) { DBUG_RETURN(HA_EXIT_FAILURE); } } @@ -7267,8 +7165,8 @@ int ha_rocksdb::create_cfs( // Generate the name for the column family to use. bool per_part_match_found = false; - std::string cf_name = generate_cf_name(i, table_arg, tbl_def_arg, - &per_part_match_found); + std::string cf_name = + generate_cf_name(i, table_arg, tbl_def_arg, &per_part_match_found); // Prevent create from using the system column family. if (cf_name == DEFAULT_SYSTEM_CF_NAME) { @@ -7313,7 +7211,8 @@ int ha_rocksdb::create_cfs( int ha_rocksdb::create_inplace_key_defs( const TABLE *const table_arg, Rdb_tbl_def *const tbl_def_arg, const TABLE *const old_table_arg, const Rdb_tbl_def *const old_tbl_def_arg, - const std::array<key_def_cf_info, MAX_INDEXES + 1> &cfs) const { + const std::array<key_def_cf_info, MAX_INDEXES + 1> &cfs, + uint64 ttl_duration, const std::string &ttl_column) const { DBUG_ENTER_FUNC(); std::shared_ptr<Rdb_key_def> *const old_key_descr = @@ -7339,10 +7238,11 @@ int ha_rocksdb::create_inplace_key_defs( struct Rdb_index_info index_info; if (!dict_manager.get_index_info(gl_index_id, &index_info)) { // NO_LINT_DEBUG - sql_print_error("RocksDB: Could not get index information " - "for Index Number (%u,%u), table %s", - gl_index_id.cf_id, gl_index_id.index_id, - old_tbl_def_arg->full_tablename().c_str()); + sql_print_error( + "RocksDB: Could not get index information " + "for Index Number (%u,%u), table %s", + gl_index_id.cf_id, gl_index_id.index_id, + old_tbl_def_arg->full_tablename().c_str()); DBUG_RETURN(HA_EXIT_FAILURE); } @@ -7366,7 +7266,7 @@ int ha_rocksdb::create_inplace_key_defs( dict_manager.get_stats(gl_index_id), index_info.m_index_flags, ttl_rec_offset, index_info.m_ttl_duration); } else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i], - cfs[i])) { + cfs[i], ttl_duration, ttl_column)) { DBUG_RETURN(HA_EXIT_FAILURE); } @@ -7516,44 +7416,16 @@ int ha_rocksdb::compare_key_parts(const KEY *const old_key, 0 - Ok other - error, either given table ddl is not supported by rocksdb or OOM. */ -int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i, +int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint i, const Rdb_tbl_def *const tbl_def_arg, std::shared_ptr<Rdb_key_def> *const new_key_def, - const struct key_def_cf_info &cf_info) const { + const struct key_def_cf_info &cf_info, + uint64 ttl_duration, + const std::string &ttl_column) const { DBUG_ENTER_FUNC(); DBUG_ASSERT(*new_key_def == nullptr); - uint64 ttl_duration = 0; - std::string ttl_column; - uint ttl_field_offset; - - uint err; - if ((err = Rdb_key_def::extract_ttl_duration(table_arg, tbl_def_arg, - &ttl_duration))) { - DBUG_RETURN(err); - } - - if ((err = Rdb_key_def::extract_ttl_col(table_arg, tbl_def_arg, &ttl_column, - &ttl_field_offset))) { - DBUG_RETURN(err); - } - - /* We don't currently support TTL on tables with hidden primary keys. */ - if (ttl_duration > 0 && is_hidden_pk(i, table_arg, tbl_def_arg)) { - my_error(ER_RDB_TTL_UNSUPPORTED, MYF(0)); - DBUG_RETURN(HA_EXIT_FAILURE); - } - - /* - If TTL duration is not specified but TTL column was specified, throw an - error because TTL column requires duration. - */ - if (ttl_duration == 0 && !ttl_column.empty()) { - my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_column.c_str()); - DBUG_RETURN(HA_EXIT_FAILURE); - } - const uint index_id = ddl_manager.get_and_update_next_number(&dict_manager); const uint16_t index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST; uchar index_type; @@ -7602,7 +7474,8 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i, if (!ttl_column.empty()) { (*new_key_def)->m_ttl_column = ttl_column; } - + // initialize key_def + (*new_key_def)->setup(table_arg, tbl_def_arg); DBUG_RETURN(HA_EXIT_SUCCESS); } @@ -7610,7 +7483,7 @@ int rdb_normalize_tablename(const std::string &tablename, std::string *const strbuf) { if (tablename.size() < 2 || tablename[0] != '.' || (tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) { - DBUG_ASSERT(0); // We were not passed table name? + DBUG_ASSERT(0); // We were not passed table name? return HA_ERR_ROCKSDB_INVALID_TABLE; } @@ -7620,7 +7493,7 @@ int rdb_normalize_tablename(const std::string &tablename, } if (pos == std::string::npos) { - DBUG_ASSERT(0); // We were not passed table name? + DBUG_ASSERT(0); // We were not passed table name? return HA_ERR_ROCKSDB_INVALID_TABLE; } @@ -7734,95 +7607,34 @@ int rdb_split_normalized_tablename(const std::string &fullname, return HA_EXIT_SUCCESS; } -/** - @brief - create() is called to create a database. The variable name will have the name - of the table. - - @details - When create() is called you do not need to worry about - opening the table. Also, the .frm file will have already been - created so adjusting create_info is not necessary. You can overwrite - the .frm file at this point if you wish to change the table - definition, but there are no methods currently provided for doing - so. +/* + Create a table's Rdb_tbl_def and its Rdb_key_defs and store table information + into MyRocks Data Dictionary + The method is called during create table/partition, truncate table/partition - Called from handle.cc by ha_create_table(). + @param table_name IN table's name formated as + 'dbname.tablename' + @param table_arg IN sql table + @param auto_increment_value IN specified table's auto increment value @return HA_EXIT_SUCCESS OK other HA_ERR error code (can be SE-specific) - - @see - ha_create_table() in handle.cc */ - -int ha_rocksdb::create(const char *const name, TABLE *const table_arg, - HA_CREATE_INFO *const create_info) { +int ha_rocksdb::create_table(const std::string &table_name, + const TABLE *table_arg, + ulonglong auto_increment_value) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(table_arg != nullptr); - DBUG_ASSERT(create_info != nullptr); - if (create_info->data_file_name) { - // DATA DIRECTORY is used to create tables under a specific location - // outside the MySQL data directory. We don't support this for MyRocks. - // The `rocksdb_datadir` setting should be used to configure RocksDB data - // directory. - print_error(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED, - MYF(ME_JUST_WARNING)); - DBUG_RETURN(HA_WRONG_CREATE_OPTION); - } - if (create_info->index_file_name) { - // Similar check for INDEX DIRECTORY as well. - print_error(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED, - MYF(ME_JUST_WARNING)); - DBUG_RETURN(HA_WRONG_CREATE_OPTION); - } - - std::string str; int err; - THD *const thd = my_core::thd_get_current_thd(); - - if (get_table_if_exists(name)) { - if (thd->lex->sql_command == SQLCOM_TRUNCATE) { - err = delete_table(name); - if (err != HA_EXIT_SUCCESS) { - DBUG_RETURN(err); - } - } else { - err = rdb_normalize_tablename(name, &str); - if (err != HA_EXIT_SUCCESS) { - DBUG_RETURN(err); - } - my_error(ER_METADATA_INCONSISTENCY, MYF(0), str.c_str(), name); - DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA); - } - } - - /* - Construct dbname.tablename ourselves, because parititioning - passes strings like "./test/t14#P#p0" for individual partitions, - while table_arg->s->table_name has none of that. - */ - err = rdb_normalize_tablename(name, &str); - if (err != HA_EXIT_SUCCESS) { - DBUG_RETURN(err); - } - - if (contains_foreign_key(thd)) { - my_error(ER_NOT_SUPPORTED_YET, MYF(0), - "FOREIGN KEY for the RocksDB storage engine"); - DBUG_RETURN(HA_ERR_UNSUPPORTED); - } - const std::unique_ptr<rocksdb::WriteBatch> wb = dict_manager.begin(); rocksdb::WriteBatch *const batch = wb.get(); /* Create table/key descriptions and put them into the data dictionary */ - m_tbl_def = new Rdb_tbl_def(str); + m_tbl_def = new Rdb_tbl_def(table_name); uint n_keys = table_arg->s->keys; @@ -7832,6 +7644,9 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg, */ if (has_hidden_pk(table_arg)) { n_keys += 1; + // reset hidden pk id + // the starting valid value for hidden pk is 1 + m_tbl_def->m_hidden_pk_val = 1; } m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[n_keys]; @@ -7845,9 +7660,9 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg, m_pk_descr = m_key_descr_arr[pk_index(table_arg, m_tbl_def)]; - if (create_info->auto_increment_value) { + if (auto_increment_value) { bool autoinc_upgrade_test = false; - m_tbl_def->m_auto_incr_val = create_info->auto_increment_value; + m_tbl_def->m_auto_incr_val = auto_increment_value; DBUG_EXECUTE_IF("myrocks_autoinc_upgrade", autoinc_upgrade_test = true;); if (!autoinc_upgrade_test) { auto s = dict_manager.put_auto_incr_val( @@ -7886,6 +7701,86 @@ error: } /** + @brief + create() is called to create a table. The variable name will have the name + of the table. + + @details + When create() is called you do not need to worry about + opening the table. Also, the .frm file will have already been + created so adjusting create_info is not necessary. You can overwrite + the .frm file at this point if you wish to change the table + definition, but there are no methods currently provided for doing + so. + + Called from handle.cc by ha_create_table(). + + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) + + @see + ha_create_table() in handle.cc +*/ + +int ha_rocksdb::create(const char *const name, TABLE *const table_arg, + HA_CREATE_INFO *const create_info) { + DBUG_ENTER_FUNC(); + + DBUG_ASSERT(table_arg != nullptr); + DBUG_ASSERT(create_info != nullptr); + + if (create_info->data_file_name) { + // DATA DIRECTORY is used to create tables under a specific location + // outside the MySQL data directory. We don't support this for MyRocks. + // The `rocksdb_datadir` setting should be used to configure RocksDB data + // directory. + DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED); + } + + if (create_info->index_file_name) { + // Similar check for INDEX DIRECTORY as well. + DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED); + } + + int err; + /* + Construct dbname.tablename ourselves, because parititioning + passes strings like "./test/t14#P#p0" for individual partitions, + while table_arg->s->table_name has none of that. + */ + std::string str; + err = rdb_normalize_tablename(name, &str); + if (err != HA_EXIT_SUCCESS) { + DBUG_RETURN(err); + } + + // FOREIGN KEY isn't supported yet + THD *const thd = my_core::thd_get_current_thd(); + if (contains_foreign_key(thd)) { + my_error(ER_NOT_SUPPORTED_YET, MYF(0), + "FOREIGN KEY for the RocksDB storage engine"); + DBUG_RETURN(HA_ERR_UNSUPPORTED); + } + + // Check whether Data Dictionary contain information + Rdb_tbl_def *tbl = ddl_manager.find(str); + if (tbl != nullptr) { + if (thd->lex->sql_command == SQLCOM_TRUNCATE) { + err = delete_table(tbl); + if (err != HA_EXIT_SUCCESS) { + DBUG_RETURN(err); + } + } else { + my_error(ER_METADATA_INCONSISTENCY, MYF(0), str.c_str(), name); + DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA); + } + } + + DBUG_RETURN(create_table(str, table_arg, create_info->auto_increment_value)); +} + +/** @note This function is used only when the table has not yet been opened, and keyread_allowed bitmap doesn't have the correct values yet. @@ -7928,9 +7823,10 @@ bool ha_rocksdb::check_keyread_allowed(uint inx, uint part, int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, rocksdb::Iterator *const iter, - const bool &full_key_match, + const bool /* unused */, const rocksdb::Slice &key_slice, const int64_t ttl_filter_ts) { + THD *thd = ha_thd(); /* We are looking for the first record such that index_tuple= lookup_tuple. @@ -7939,6 +7835,9 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice); while (iter->Valid() && kd.value_matches_prefix(iter->key(), key_slice)) { + if (thd && thd->killed) { + return HA_ERR_QUERY_INTERRUPTED; + } /* If TTL is enabled we need to check if the given key has already expired from the POV of the current transaction. If it has, try going to the next @@ -7960,9 +7859,10 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, } int ha_rocksdb::read_before_key(const Rdb_key_def &kd, - const bool &full_key_match, + const bool full_key_match, const rocksdb::Slice &key_slice, const int64_t ttl_filter_ts) { + THD *thd = ha_thd(); /* We are looking for record with the biggest t.key such that t.key < lookup_tuple. @@ -7970,6 +7870,9 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd, rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice); while (is_valid(m_scan_it)) { + if (thd && thd->killed) { + return HA_ERR_QUERY_INTERRUPTED; + } /* We are using full key and we've hit an exact match, or... @@ -7994,6 +7897,7 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd, int ha_rocksdb::read_after_key(const Rdb_key_def &kd, const rocksdb::Slice &key_slice, const int64_t ttl_filter_ts) { + THD *thd = ha_thd(); /* We are looking for the first record such that @@ -8011,6 +7915,9 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd, */ while (is_valid(m_scan_it) && kd.has_ttl() && should_hide_ttl_rec(kd, m_scan_it->value(), ttl_filter_ts)) { + if (thd && thd->killed) { + return HA_ERR_QUERY_INTERRUPTED; + } rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it); } @@ -8019,7 +7926,7 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd, int ha_rocksdb::position_to_correct_key( const Rdb_key_def &kd, const enum ha_rkey_function &find_flag, - const bool &full_key_match, const uchar *const key, + const bool full_key_match, const uchar *const key, const key_part_map &keypart_map, const rocksdb::Slice &key_slice, bool *const move_forward, const int64_t ttl_filter_ts) { int rc = 0; @@ -8027,65 +7934,66 @@ int ha_rocksdb::position_to_correct_key( *move_forward = true; switch (find_flag) { - case HA_READ_KEY_EXACT: - rc = - read_key_exact(kd, m_scan_it, full_key_match, key_slice, ttl_filter_ts); - break; - case HA_READ_BEFORE_KEY: - *move_forward = false; - rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts); - if (rc == 0 && !kd.covers_key(m_scan_it->key())) { - /* The record we've got is not from this index */ - rc = HA_ERR_KEY_NOT_FOUND; - } - break; - case HA_READ_AFTER_KEY: - case HA_READ_KEY_OR_NEXT: - rc = read_after_key(kd, key_slice, ttl_filter_ts); - if (rc == 0 && !kd.covers_key(m_scan_it->key())) { - /* The record we've got is not from this index */ - rc = HA_ERR_KEY_NOT_FOUND; - } - break; - case HA_READ_KEY_OR_PREV: - case HA_READ_PREFIX: - /* This flag is not used by the SQL layer, so we don't support it yet. */ - rc = HA_ERR_UNSUPPORTED; - break; - case HA_READ_PREFIX_LAST: - case HA_READ_PREFIX_LAST_OR_PREV: - *move_forward = false; - /* - Find the last record with the specified index prefix lookup. - - HA_READ_PREFIX_LAST requires that the record has the - prefix=lookup (if there are no such records, - HA_ERR_KEY_NOT_FOUND should be returned). - - HA_READ_PREFIX_LAST_OR_PREV has no such requirement. If there are no - records with prefix=lookup, we should return the last record - before that. - */ - rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts); - if (rc == 0) { - const rocksdb::Slice &rkey = m_scan_it->key(); - if (!kd.covers_key(rkey)) { + case HA_READ_KEY_EXACT: + rc = read_key_exact(kd, m_scan_it, full_key_match, key_slice, + ttl_filter_ts); + break; + case HA_READ_BEFORE_KEY: + *move_forward = false; + rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts); + if (rc == 0 && !kd.covers_key(m_scan_it->key())) { /* The record we've got is not from this index */ rc = HA_ERR_KEY_NOT_FOUND; - } else if (find_flag == HA_READ_PREFIX_LAST) { - uint size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple, - key, keypart_map); - rocksdb::Slice lookup_tuple(reinterpret_cast<char *>(m_sk_packed_tuple), - size); - - // We need to compare the key we've got with the original search prefix. - if (!kd.value_matches_prefix(rkey, lookup_tuple)) { + } + break; + case HA_READ_AFTER_KEY: + case HA_READ_KEY_OR_NEXT: + rc = read_after_key(kd, key_slice, ttl_filter_ts); + if (rc == 0 && !kd.covers_key(m_scan_it->key())) { + /* The record we've got is not from this index */ + rc = HA_ERR_KEY_NOT_FOUND; + } + break; + case HA_READ_KEY_OR_PREV: + case HA_READ_PREFIX: + /* This flag is not used by the SQL layer, so we don't support it yet. */ + rc = HA_ERR_UNSUPPORTED; + break; + case HA_READ_PREFIX_LAST: + case HA_READ_PREFIX_LAST_OR_PREV: + *move_forward = false; + /* + Find the last record with the specified index prefix lookup. + - HA_READ_PREFIX_LAST requires that the record has the + prefix=lookup (if there are no such records, + HA_ERR_KEY_NOT_FOUND should be returned). + - HA_READ_PREFIX_LAST_OR_PREV has no such requirement. If there are no + records with prefix=lookup, we should return the last record + before that. + */ + rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts); + if (rc == 0) { + const rocksdb::Slice &rkey = m_scan_it->key(); + if (!kd.covers_key(rkey)) { + /* The record we've got is not from this index */ rc = HA_ERR_KEY_NOT_FOUND; + } else if (find_flag == HA_READ_PREFIX_LAST) { + uint size = kd.pack_index_tuple(table, m_pack_buffer, + m_sk_packed_tuple, key, keypart_map); + rocksdb::Slice lookup_tuple( + reinterpret_cast<char *>(m_sk_packed_tuple), size); + + // We need to compare the key we've got with the original search + // prefix. + if (!kd.value_matches_prefix(rkey, lookup_tuple)) { + rc = HA_ERR_KEY_NOT_FOUND; + } } } - } - break; - default: - DBUG_ASSERT(0); - break; + break; + default: + DBUG_ASSERT(0); + break; } return rc; @@ -8094,11 +8002,10 @@ int ha_rocksdb::position_to_correct_key( int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd, const enum ha_rkey_function &find_flag, const rocksdb::Slice &slice, - const int &bytes_changed_by_succ, + const int bytes_changed_by_succ, const key_range *const end_key, uint *const end_key_packed_size) { - if (find_flag == HA_READ_KEY_EXACT) - return slice.size(); + if (find_flag == HA_READ_KEY_EXACT) return slice.size(); if (find_flag == HA_READ_PREFIX_LAST) { /* @@ -8168,8 +8075,18 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf, const rocksdb::Slice &rkey = m_scan_it->key(); const rocksdb::Slice &value = m_scan_it->value(); - bool covered_lookup = - m_keyread_only || kd.covers_lookup(table, &value, &m_lookup_bitmap); +#ifndef DBUG_OFF + bool save_keyread_only = m_keyread_only; +#endif + DBUG_EXECUTE_IF("dbug.rocksdb.HA_EXTRA_KEYREAD", { m_keyread_only = true; }); + + bool covered_lookup = (m_keyread_only && kd.can_cover_lookup()) || + kd.covers_lookup(&value, &m_lookup_bitmap); + +#ifndef DBUG_OFF + m_keyread_only = save_keyread_only; +#endif + if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) { pk_size = kd.get_primary_key_tuple(table, *m_pk_descr, &rkey, m_pk_packed_tuple); @@ -8177,12 +8094,11 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf, rc = HA_ERR_ROCKSDB_CORRUPT_DATA; } else { rc = kd.unpack_record(table, buf, &rkey, &value, - m_verify_row_debug_checksums); + m_converter->get_verify_row_debug_checksums()); global_stats.covered_secondary_key_lookups.inc(); } } else { - if (kd.m_is_reverse_cf) - move_forward = !move_forward; + if (kd.m_is_reverse_cf) move_forward = !move_forward; rc = find_icp_matching_index_rec(move_forward, buf); if (!rc) { @@ -8229,11 +8145,12 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf, ulong ha_rocksdb::index_flags(uint inx, uint part, bool all_parts) const { DBUG_ENTER_FUNC(); - ulong base_flags = HA_READ_NEXT | // doesn't seem to be used + ulong base_flags = HA_READ_NEXT | // doesn't seem to be used HA_READ_ORDER | HA_READ_RANGE | HA_READ_PREV; - if (check_keyread_allowed(inx, part, all_parts)) + if (check_keyread_allowed(inx, part, all_parts)) { base_flags |= HA_KEYREAD_ONLY; + } if (inx == table_share->primary_key) { /* @@ -8289,13 +8206,15 @@ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) { rocksdb::Slice value = m_scan_it->value(); bool covered_lookup = - m_keyread_only || m_key_descr_arr[keyno]->covers_lookup( - table, &value, &m_lookup_bitmap); + (m_keyread_only && m_key_descr_arr[keyno]->can_cover_lookup()) || + m_key_descr_arr[keyno]->covers_lookup(&value, &m_lookup_bitmap); if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) { rc = m_key_descr_arr[keyno]->unpack_record( - table, buf, &key, &value, m_verify_row_debug_checksums); + table, buf, &key, &value, + m_converter->get_verify_row_debug_checksums()); global_stats.covered_secondary_key_lookups.inc(); } else { + DEBUG_SYNC(ha_thd(), "rocksdb_concurrent_delete_sk"); rc = get_row_by_rowid(buf, m_pk_packed_tuple, size); } @@ -8345,9 +8264,10 @@ int ha_rocksdb::read_range_first(const key_range *const start_key, range_key_part = table->key_info[active_index].key_part; - if (!start_key) // Read first record + if (!start_key) { + // Read first record result = ha_index_first(table->record[0]); - else { + } else { #ifdef MARIAROCKS_NOT_YET if (is_using_prohibited_gap_locks( is_using_full_unique_key(active_index, start_key->keypart_map, @@ -8363,8 +8283,9 @@ int ha_rocksdb::read_range_first(const key_range *const start_key, start_key->keypart_map, start_key->flag, end_key); }) } - if (result) + if (result) { DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND) ? HA_ERR_END_OF_FILE : result); + } if (compare_key(end_range) <= 0) { DBUG_RETURN(HA_EXIT_SUCCESS); @@ -8441,14 +8362,23 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, const key_range *end_key) { DBUG_ENTER_FUNC(); + DBUG_EXECUTE_IF("myrocks_busy_loop_on_row_read", int debug_i = 0; + while (1) { debug_i++; }); + int rc = 0; + THD *thd = ha_thd(); + DEBUG_SYNC(thd, "rocksdb.check_flags_rmi"); + if (thd && thd->killed) { + rc = HA_ERR_QUERY_INTERRUPTED; + DBUG_RETURN(rc); + } + const Rdb_key_def &kd = *m_key_descr_arr[active_index]; const uint actual_key_parts = kd.get_key_parts(); bool using_full_key = is_using_full_key(keypart_map, actual_key_parts); - if (!end_key) - end_key = end_range; + if (!end_key) end_key = end_range; /* By default, we don't need the retrieved records to match the prefix */ m_sk_match_prefix = nullptr; @@ -8465,8 +8395,7 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, m_pk_packed_tuple, key, keypart_map); bool skip_lookup = is_blind_delete_enabled(); - rc = get_row_by_rowid(buf, m_pk_packed_tuple, size, - skip_lookup, false); + rc = get_row_by_rowid(buf, m_pk_packed_tuple, size, skip_lookup, false); if (!rc && !skip_lookup) { #ifdef MARIAROCKS_NOT_YET @@ -8491,8 +8420,9 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple, key, tmp_map); if (table->key_info[active_index].user_defined_key_parts != - kd.get_key_parts()) + kd.get_key_parts()) { using_full_key = false; + } } else { packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple, key, keypart_map); @@ -8546,14 +8476,20 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, bool use_all_keys = false; if (find_flag == HA_READ_KEY_EXACT && - my_count_bits(keypart_map) == kd.get_key_parts()) + my_count_bits(keypart_map) == kd.get_key_parts()) { use_all_keys = true; + } Rdb_transaction *const tx = get_or_create_tx(table->in_use); const bool is_new_snapshot = !tx->has_snapshot(); // Loop as long as we get a deadlock error AND we end up creating the // snapshot here (i.e. it did not exist prior to this) for (;;) { + DEBUG_SYNC(thd, "rocksdb.check_flags_rmi_scan"); + if (thd && thd->killed) { + rc = HA_ERR_QUERY_INTERRUPTED; + break; + } /* This will open the iterator and position it at a record that's equal or greater than the lookup tuple. @@ -8570,9 +8506,7 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, slice, &move_forward, tx->m_snapshot_timestamp); if (rc) { - /* This status is returned on any error */ - table->status = STATUS_NOT_FOUND; - DBUG_RETURN(rc); + break; } m_skip_scan_it_next_call = false; @@ -8582,13 +8516,15 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, then we have all the rows we need. For a secondary key we now need to lookup the primary key. */ - if (active_index == table->s->primary_key) + if (active_index == table->s->primary_key) { rc = read_row_from_primary_key(buf); - else + } else { rc = read_row_from_secondary_key(buf, kd, move_forward); + } - if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) + if (!should_recreate_snapshot(rc, is_new_snapshot)) { break; /* Exit the loop */ + } // release the snapshot and iterator so they will be regenerated tx->release_snapshot(); @@ -8596,7 +8532,10 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, } if (rc) { - /* the only possible error condition is record-not-found */ + /* + This status is returned on any error + the only possible error condition is record-not-found + */ table->status = STATUS_NOT_FOUND; } else { table->status = 0; @@ -8630,13 +8569,21 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key, other - error code */ -int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward, +int ha_rocksdb::find_icp_matching_index_rec(const bool move_forward, uchar *const buf) { if (pushed_idx_cond && pushed_idx_cond_keyno == active_index) { const Rdb_key_def &kd = *m_key_descr_arr[active_index]; + THD *thd = ha_thd(); while (1) { - rocksdb_skip_expired_records(kd, m_scan_it, !move_forward); + int rc = rocksdb_skip_expired_records(kd, m_scan_it, !move_forward); + if (rc != HA_EXIT_SUCCESS) { + return rc; + } + + if (thd && thd->killed) { + return HA_ERR_QUERY_INTERRUPTED; + } if (!is_valid(m_scan_it)) { table->status = STATUS_NOT_FOUND; @@ -8660,7 +8607,7 @@ int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward, const rocksdb::Slice value = m_scan_it->value(); int err = kd.unpack_record(table, buf, &rkey, &value, - m_verify_row_debug_checksums); + m_converter->get_verify_row_debug_checksums()); if (err != HA_EXIT_SUCCESS) { return err; } @@ -8681,14 +8628,6 @@ int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward, * row. */ break; } - /* - TODO: should we have this here, or RockDB handles this internally? - if (my_core::thd_killed(current_thd)) - { - rc= HA_ERR_INTERNAL_ERROR; // doesn't matter - break; - } - */ } } return HA_EXIT_SUCCESS; @@ -8720,14 +8659,14 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { // Rdb_transaction *tx= get_or_create_tx(table->in_use); // tx->snapshot= nullptr; - const bool save_verify_row_debug_checksums = m_verify_row_debug_checksums; - m_verify_row_debug_checksums = true; - + bool save_verify_row_debug_checksums = + m_converter->get_verify_row_debug_checksums(); + m_converter->set_verify_row_debug_checksums(true); /* For each secondary index, check that we can get a PK value from it */ // NO_LINT_DEBUG sql_print_verbose_info("CHECKTABLE %s: Checking table %s", table_name, table_name); - ha_rows UNINIT_VAR(row_checksums_at_start); // set/used iff first_index==true + ha_rows UNINIT_VAR(row_checksums_at_start); // set/used iff first_index==true ha_rows row_checksums = ha_rows(-1); bool first_index = true; @@ -8737,20 +8676,21 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { ha_index_init(keyno, true); ha_rows rows = 0; ha_rows checksums = 0; - if (first_index) - row_checksums_at_start = m_row_checksums_checked; + if (first_index) { + row_checksums_at_start = m_converter->get_row_checksums_checked(); + } int res; // NO_LINT_DEBUG sql_print_verbose_info("CHECKTABLE %s: Checking index %s", table_name, table->key_info[keyno].name); while (1) { - if (!rows) + if (!rows) { res = index_first(table->record[0]); - else + } else { res = index_next(table->record[0]); + } - if (res == HA_ERR_END_OF_FILE) - break; + if (res == HA_ERR_END_OF_FILE) break; if (res) { // error // NO_LINT_DEBUG @@ -8771,16 +8711,18 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { if ((res = get_row_by_rowid(table->record[0], rowkey_copy.ptr(), rowkey_copy.length()))) { // NO_LINT_DEBUG - sql_print_error("CHECKTABLE %s: .. row %lld: " - "failed to fetch row by rowid", - table_name, rows); + sql_print_error( + "CHECKTABLE %s: .. row %lld: " + "failed to fetch row by rowid", + table_name, rows); goto error; } longlong hidden_pk_id = 0; if (has_hidden_pk(table) && - read_hidden_pk_id_from_rowkey(&hidden_pk_id)) + read_hidden_pk_id_from_rowkey(&hidden_pk_id)) { goto error; + } /* Check if we get the same PK value */ uint packed_size = m_pk_descr->pack_record( @@ -8801,9 +8743,10 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { if (packed_size != sec_key_copy.length() || memcmp(m_sk_packed_tuple, sec_key_copy.ptr(), packed_size)) { // NO_LINT_DEBUG - sql_print_error("CHECKTABLE %s: .. row %lld: " - "secondary index value mismatch", - table_name, rows); + sql_print_error( + "CHECKTABLE %s: .. row %lld: " + "secondary index value mismatch", + table_name, rows); goto print_and_error; } rows++; @@ -8831,11 +8774,13 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { } // NO_LINT_DEBUG sql_print_verbose_info("CHECKTABLE %s: ... %lld index entries checked " + "CHECKTABLE %s: ... %lld index entries checked " "(%lld had checksums)", table_name, rows, checksums); if (first_index) { - row_checksums = m_row_checksums_checked - row_checksums_at_start; + row_checksums = + m_converter->get_row_checksums_checked() - row_checksums_at_start; first_index = false; } ha_index_end(); @@ -8848,16 +8793,16 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) { } extra(HA_EXTRA_NO_KEYREAD); - m_verify_row_debug_checksums = save_verify_row_debug_checksums; + m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums); /* - TODO: we should check also for PK records that are missing in the secondary - indexes. + TODO(sergiy): we should check also for PK records that are missing in + the secondary indexes. For that, need to walk through the PK and check that every PK record has a proper counterpart in each secondary index. */ DBUG_RETURN(HA_ADMIN_OK); error: - m_verify_row_debug_checksums = save_verify_row_debug_checksums; + m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums); ha_index_or_rnd_end(); extra(HA_EXTRA_NO_KEYREAD); @@ -8867,10 +8812,11 @@ error: static void dbug_dump_str(FILE *const out, const char *const str, int len) { fprintf(out, "\""); for (int i = 0; i < len; i++) { - if (str[i] > 32) + if (str[i] > 32) { fprintf(out, "%c", str[i]); - else + } else { fprintf(out, "\\%d", str[i]); + } } fprintf(out, "\""); } @@ -8883,8 +8829,7 @@ static void dbug_dump_str(FILE *const out, const char *const str, int len) { void dbug_dump_database(rocksdb::DB *const db) { FILE *const out = fopen("/tmp/rocksdb.dump", "wt"); - if (!out) - return; + if (!out) return; rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -8906,30 +8851,23 @@ rocksdb::Status ha_rocksdb::get_for_update( DBUG_ASSERT(m_lock_rows != RDB_LOCK_NONE); const bool exclusive = m_lock_rows != RDB_LOCK_READ; - rocksdb::Status s = tx->get_for_update(column_family, key, value, exclusive); - - // If we have a lock conflict and we are running in READ COMMITTTED mode - // release and reacquire the snapshot and then retry the get_for_update(). - if (s.IsBusy() && !s.IsDeadlock() && - my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED) { - tx->release_snapshot(); - tx->acquire_snapshot(false); - - s = tx->get_for_update(column_family, key, value, exclusive); - } + const bool do_validate = + my_core::thd_tx_isolation(ha_thd()) > ISO_READ_COMMITTED; + rocksdb::Status s = + tx->get_for_update(column_family, key, value, exclusive, do_validate); +#ifndef DBUG_OFF + ++rocksdb_num_get_for_update_calls; +#endif return s; } -bool ha_rocksdb::is_blind_delete_enabled() -{ +bool ha_rocksdb::is_blind_delete_enabled() { THD *thd = ha_thd(); return (THDVAR(thd, blind_delete_primary_key) && thd->lex->sql_command == SQLCOM_DELETE && - thd->lex->table_count == 1 && - table->s->keys == 1 && - !has_hidden_pk(table) && - !thd->rgi_slave); + thd->lex->table_count == 1 && table->s->keys == 1 && + !has_hidden_pk(table) && !thd->rgi_slave); } /* @@ -8957,8 +8895,9 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, DEBUG_SYNC(ha_thd(), "rocksdb.get_row_by_rowid"); DBUG_EXECUTE_IF("dbug.rocksdb.get_row_by_rowid", { THD *thd = ha_thd(); - const char act[] = "now signal Reached " - "wait_for signal.rocksdb.get_row_by_rowid_let_running"; + const char act[] = + "now signal Reached " + "wait_for signal.rocksdb.get_row_by_rowid_let_running"; DBUG_ASSERT(opt_debug_sync_timeout > 0); DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act))); };); @@ -8967,8 +8906,7 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, rocksdb::Status s; /* Pretend row found without looking up */ - if (skip_lookup) - { + if (skip_lookup) { #ifdef MARIAROCKS_NOT_YET stats.rows_deleted_blind++; #endif @@ -8981,6 +8919,17 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, if (m_lock_rows == RDB_LOCK_NONE) { tx->acquire_snapshot(true); s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record); + } else if (m_insert_with_update && m_dup_pk_found) { + DBUG_ASSERT(m_pk_descr->get_keyno() == m_dupp_errkey); + DBUG_ASSERT(m_dup_pk_retrieved_record.length() == + m_retrieved_record.size()); + DBUG_ASSERT(memcmp(m_dup_pk_retrieved_record.ptr(), + m_retrieved_record.data(), + m_retrieved_record.size()) == 0); + + // do nothing - we already have the result in m_retrieved_record and + // already taken the lock + s = rocksdb::Status::OK(); } else { s = get_for_update(tx, m_pk_descr->get_cf(), key_slice, &m_retrieved_record); @@ -9035,8 +8984,7 @@ int ha_rocksdb::index_next(uchar *const buf) { } int rc = index_next_with_direction(buf, moves_forward); - if (rc == HA_ERR_KEY_NOT_FOUND) - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); } @@ -9055,8 +9003,7 @@ int ha_rocksdb::index_prev(uchar *const buf) { } int rc = index_next_with_direction(buf, moves_forward); - if (rc == HA_ERR_KEY_NOT_FOUND) - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); } @@ -9069,19 +9016,33 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) { if (active_index == pk_index(table, m_tbl_def)) { rc = rnd_next_with_direction(buf, move_forward); } else { - if (m_skip_scan_it_next_call) { - m_skip_scan_it_next_call = false; - } else { - if (move_forward) - m_scan_it->Next(); /* this call cannot fail */ - else - m_scan_it->Prev(); + THD *thd = ha_thd(); + for (;;) { + DEBUG_SYNC(thd, "rocksdb.check_flags_inwd"); + if (thd && thd->killed) { + rc = HA_ERR_QUERY_INTERRUPTED; + break; + } + if (m_skip_scan_it_next_call) { + m_skip_scan_it_next_call = false; + } else { + if (move_forward) { + m_scan_it->Next(); /* this call cannot fail */ + } else { + m_scan_it->Prev(); + } + } + rc = rocksdb_skip_expired_records(*m_key_descr_arr[active_index], + m_scan_it, !move_forward); + if (rc != HA_EXIT_SUCCESS) { + break; + } + rc = find_icp_matching_index_rec(move_forward, buf); + if (!rc) rc = secondary_index_read(active_index, buf); + if (!should_skip_invalidated_record(rc)) { + break; + } } - rocksdb_skip_expired_records(*m_key_descr_arr[active_index], m_scan_it, - !move_forward); - rc = find_icp_matching_index_rec(move_forward, buf); - if (!rc) - rc = secondary_index_read(active_index, buf); } DBUG_RETURN(rc); @@ -9099,8 +9060,7 @@ int ha_rocksdb::index_first(uchar *const buf) { int rc = m_key_descr_arr[active_index]->m_is_reverse_cf ? index_last_intern(buf) : index_first_intern(buf); - if (rc == HA_ERR_KEY_NOT_FOUND) - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); } @@ -9117,8 +9077,7 @@ int ha_rocksdb::index_last(uchar *const buf) { int rc = m_key_descr_arr[active_index]->m_is_reverse_cf ? index_first_intern(buf) : index_last_intern(buf); - if (rc == HA_ERR_KEY_NOT_FOUND) - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); } @@ -9186,8 +9145,9 @@ int ha_rocksdb::index_first_intern(uchar *const buf) { m_skip_scan_it_next_call = true; rc = index_next_with_direction(buf, true); - if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) - break; // exit the loop + if (!should_recreate_snapshot(rc, is_new_snapshot)) { + break; /* exit the loop */ + } // release the snapshot and iterator so they will be regenerated tx->release_snapshot(); @@ -9261,11 +9221,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) { DBUG_ASSERT(key != nullptr); const Rdb_key_def &kd = *m_key_descr_arr[active_index]; - if (kd.m_is_reverse_cf) { - kd.get_infimum_key(key, &key_size); - } else { - kd.get_supremum_key(key, &key_size); - } + int key_end_matching_bytes = kd.get_last_key(key, &key_size); rocksdb::Slice index_key((const char *)key, key_size); @@ -9276,7 +9232,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) { // Loop as long as we get a deadlock error AND we end up creating the // snapshot here (i.e. it did not exist prior to this) for (;;) { - setup_scan_iterator(kd, &index_key, false, Rdb_key_def::INDEX_NUMBER_SIZE); + setup_scan_iterator(kd, &index_key, false, key_end_matching_bytes); m_scan_it->SeekForPrev(index_key); m_skip_scan_it_next_call = false; @@ -9285,12 +9241,12 @@ int ha_rocksdb::index_last_intern(uchar *const buf) { rc = rnd_next_with_direction(buf, false); } else { rc = find_icp_matching_index_rec(false /*move_forward*/, buf); - if (!rc) - rc = secondary_index_read(active_index, buf); + if (!rc) rc = secondary_index_read(active_index, buf); } - if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot) + if (!should_recreate_snapshot(rc, is_new_snapshot)) { break; /* exit the loop */ + } // release the snapshot and iterator so they will be regenerated tx->release_snapshot(); @@ -9329,7 +9285,7 @@ void ha_rocksdb::unlock_row() { - If the index is PRIMARY KEY, and if all of the columns of the table are covered by the PRIMARY KEY, SingleDelete can be used. */ -bool ha_rocksdb::can_use_single_delete(const uint &index) const { +bool ha_rocksdb::can_use_single_delete(const uint index) const { return (index != pk_index(table, m_tbl_def) || (!has_hidden_pk(table) && table->key_info[index].ext_key_parts == table->s->fields)); @@ -9345,12 +9301,18 @@ bool ha_rocksdb::skip_unique_check() const { 3) the user set unique_checks option to 0, and the table does not have any indexes. If the table has secondary keys, then those might becomes inconsisted/corrupted + 4) We're using read-free replication */ return THDVAR(table->in_use, bulk_load) || (m_force_skip_unique_check && m_skip_unique_check) || (my_core::thd_test_options(table->in_use, OPTION_RELAXED_UNIQUE_CHECKS) && - m_tbl_def->m_key_count == 1); + m_tbl_def->m_key_count == 1) || +#ifdef MARIAROCKS_NOT_YET + use_read_free_rpl(); +#else + FALSE; +#endif } #ifdef MARIAROCKS_NOT_YET // MDEV-10975 @@ -9450,10 +9412,9 @@ const char *ha_rocksdb::get_key_comment(const uint index, return table_arg->key_info[index].comment.str; } -const std::string ha_rocksdb::generate_cf_name(const uint index, - const TABLE *const table_arg, - const Rdb_tbl_def *const tbl_def_arg, - bool *per_part_match_found) { +const std::string ha_rocksdb::generate_cf_name( + const uint index, const TABLE *const table_arg, + const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found) { DBUG_ASSERT(table_arg != nullptr); DBUG_ASSERT(tbl_def_arg != nullptr); DBUG_ASSERT(per_part_match_found != nullptr); @@ -9498,6 +9459,9 @@ const std::string ha_rocksdb::get_table_comment(const TABLE *const table_arg) { } /** + Write a new row + + @param[in] buf new row data to write @return HA_EXIT_SUCCESS OK other HA_ERR error code (can be SE-specific) @@ -9520,6 +9484,11 @@ int ha_rocksdb::write_row(uchar *const buf) { } } + // clear cache at beginning of write for INSERT ON DUPLICATE + // we may get multiple write->fail->read->update if there are multiple + // values from INSERT + m_dup_pk_found = false; + const int rv = update_write_row(nullptr, buf, skip_unique_check()); if (rv == 0) { @@ -9552,6 +9521,15 @@ void ha_rocksdb::set_last_rowkey(const uchar *const old_data) { #endif } +/** + Collect update data for primary key + + @param[in, out] row_info hold all data for update row, such as + new row data/old row data + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) { int size; @@ -9564,10 +9542,9 @@ int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) { row_info->new_pk_unpack_info = &m_pk_unpack_info; - size = - m_pk_descr->pack_record(table, m_pack_buffer, row_info->new_data, - m_pk_packed_tuple, row_info->new_pk_unpack_info, - false, 0, 0, nullptr, &row_info->ttl_pk_offset); + size = m_pk_descr->pack_record( + table, m_pack_buffer, row_info->new_data, m_pk_packed_tuple, + row_info->new_pk_unpack_info, false, 0, 0, nullptr); } else if (row_info->old_data == nullptr) { row_info->hidden_pk_id = update_hidden_pk_val(); size = @@ -9591,30 +9568,32 @@ int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) { return HA_EXIT_SUCCESS; } -int ha_rocksdb::check_and_lock_unique_pk(const uint &key_id, +/** + Check the specified primary key value is unique and also lock the row + + @param[in] key_id key index + @param[in] row_info hold all data for update row, such as old row + data and new row data + @param[out] found whether the primary key exists before. + @param[out] pk_changed whether primary key is changed + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ +int ha_rocksdb::check_and_lock_unique_pk(const uint key_id, const struct update_row_info &row_info, - bool *const found, - bool *const pk_changed) { + bool *const found) { DBUG_ASSERT(found != nullptr); - DBUG_ASSERT(pk_changed != nullptr); - *pk_changed = false; - - /* - For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs - always require locking. - */ - if (row_info.old_pk_slice.size() > 0) { - /* - If the keys are the same, then no lock is needed - */ - if (!row_info.new_pk_slice.compare(row_info.old_pk_slice)) { - *found = false; - return HA_EXIT_SUCCESS; - } + DBUG_ASSERT(row_info.old_pk_slice.size() == 0 || + row_info.new_pk_slice.compare(row_info.old_pk_slice) != 0); - *pk_changed = true; - } + /* Ignore PK violations if this is a optimized 'replace into' */ +#ifdef MARIAROCKS_NOT_YET + const bool ignore_pk_unique_check = ha_thd()->lex->blind_replace_into; +#else + const bool ignore_pk_unique_check= false; +#endif /* Perform a read to determine if a duplicate entry exists. For primary @@ -9637,17 +9616,56 @@ int ha_rocksdb::check_and_lock_unique_pk(const uint &key_id, */ const rocksdb::Status s = get_for_update(row_info.tx, m_pk_descr->get_cf(), row_info.new_pk_slice, - &m_retrieved_record); + ignore_pk_unique_check ? nullptr : &m_retrieved_record); if (!s.ok() && !s.IsNotFound()) { return row_info.tx->set_status_error( table->in_use, s, *m_key_descr_arr[key_id], m_tbl_def, m_table_handler); } - *found = !s.IsNotFound(); + bool key_found = ignore_pk_unique_check ? false : !s.IsNotFound(); + + /* + If the pk key has ttl, we may need to pretend the row wasn't + found if it is already expired. + */ + if (key_found && m_pk_descr->has_ttl() && + should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, + (row_info.tx->m_snapshot_timestamp + ? row_info.tx->m_snapshot_timestamp + : static_cast<int64_t>(std::time(nullptr))))) { + key_found = false; + } + + if (key_found && row_info.old_data == nullptr && m_insert_with_update) { + // In INSERT ON DUPLICATE KEY UPDATE ... case, if the insert failed + // due to a duplicate key, remember the last key and skip the check + // next time + m_dup_pk_found = true; + +#ifndef DBUG_OFF + // save it for sanity checking later + m_dup_pk_retrieved_record.copy(m_retrieved_record.data(), + m_retrieved_record.size(), &my_charset_bin); +#endif + } + + *found = key_found; + return HA_EXIT_SUCCESS; } -int ha_rocksdb::check_and_lock_sk(const uint &key_id, +/** + Check the specified secondary key value is unique and also lock the row + + @param[in] key_id key index + @param[in] row_info hold all data for update row, such as old row + data and new row data + @param[out] found whether specified key value exists before. + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ +int ha_rocksdb::check_and_lock_sk(const uint key_id, const struct update_row_info &row_info, bool *const found) { DBUG_ASSERT(found != nullptr); @@ -9777,8 +9795,18 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id, return HA_EXIT_SUCCESS; } +/** + Enumerate all keys to check their uniquess and also lock it + + @param[in] row_info hold all data for update row, such as old row + data and new row data + @param[out] pk_changed whether primary key is changed + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ int ha_rocksdb::check_uniqueness_and_lock( - const struct update_row_info &row_info, bool *const pk_changed) { + const struct update_row_info &row_info, bool pk_changed) { /* Go through each index and determine if the index has uniqueness requirements. If it does, then try to obtain a row lock on the new values. @@ -9790,7 +9818,12 @@ int ha_rocksdb::check_uniqueness_and_lock( int rc; if (is_pk(key_id, table, m_tbl_def)) { - rc = check_and_lock_unique_pk(key_id, row_info, &found, pk_changed); + if (row_info.old_pk_slice.size() > 0 && !pk_changed) { + found = false; + rc = HA_EXIT_SUCCESS; + } else { + rc = check_and_lock_unique_pk(key_id, row_info, &found); + } } else { rc = check_and_lock_sk(key_id, row_info, &found); } @@ -9799,23 +9832,11 @@ int ha_rocksdb::check_uniqueness_and_lock( return rc; } - /* - If the pk key has ttl, we may need to pretend the row wasn't - found if it is already expired. The pk record is read into - m_retrieved_record by check_and_lock_unique_pk(). - */ - if (is_pk(key_id, table, m_tbl_def) && found && m_pk_descr->has_ttl() && - should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, - (row_info.tx->m_snapshot_timestamp - ? row_info.tx->m_snapshot_timestamp - : static_cast<int64_t>(std::time(nullptr))))) { - found = false; - } - if (found) { /* There is a row with this key already, so error out. */ errkey = key_id; m_dupp_errkey = errkey; + return HA_ERR_FOUND_DUPP_KEY; } } @@ -9823,19 +9844,31 @@ int ha_rocksdb::check_uniqueness_and_lock( return HA_EXIT_SUCCESS; } +/** + Check whether secondary key value is duplicate or not + + @param[in] table_arg the table currently working on + @param[in key_def the key_def is being checked + @param[in] key secondary key storage data + @param[out] sk_info hold secondary key memcmp datas(new/old) + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ + int ha_rocksdb::check_duplicate_sk(const TABLE *table_arg, - const Rdb_key_def &index, + const Rdb_key_def &key_def, const rocksdb::Slice *key, struct unique_sk_buf_info *sk_info) { uint n_null_fields = 0; - const rocksdb::Comparator *index_comp = index.get_cf()->GetComparator(); + const rocksdb::Comparator *index_comp = key_def.get_cf()->GetComparator(); /* Get proper SK buffer. */ uchar *sk_buf = sk_info->swap_and_get_sk_buf(); /* Get memcmp form of sk without extended pk tail */ uint sk_memcmp_size = - index.get_memcmp_sk_parts(table_arg, *key, sk_buf, &n_null_fields); + key_def.get_memcmp_sk_parts(table_arg, *key, sk_buf, &n_null_fields); sk_info->sk_memcmp_key = rocksdb::Slice(reinterpret_cast<char *>(sk_buf), sk_memcmp_size); @@ -9855,12 +9888,17 @@ int ha_rocksdb::bulk_load_key(Rdb_transaction *const tx, const Rdb_key_def &kd, const rocksdb::Slice &value, bool sort) { DBUG_ENTER_FUNC(); int res; + THD *thd = ha_thd(); + if (thd && thd->killed) { + DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); + } + rocksdb::ColumnFamilyHandle *cf = kd.get_cf(); // In the case of unsorted inserts, m_sst_info allocated here is not // used to store the keys. It is still used to indicate when tables // are switched. - if (m_sst_info == nullptr || m_sst_info->is_committed()) { + if (m_sst_info == nullptr || m_sst_info->is_done()) { m_sst_info.reset(new Rdb_sst_info(rdb, m_table_handler->m_table_name, kd.get_name(), cf, *rocksdb_db_options, THDVAR(ha_thd(), trace_sst_api))); @@ -9893,17 +9931,61 @@ int ha_rocksdb::finalize_bulk_load(bool print_client_error) { /* Skip if there are no possible ongoing bulk loads */ if (m_sst_info) { - res = m_sst_info->commit(print_client_error); + if (m_sst_info->is_done()) { + m_sst_info.reset(); + DBUG_RETURN(res); + } + + Rdb_sst_info::Rdb_sst_commit_info commit_info; + + // Wrap up the current work in m_sst_info and get ready to commit + // This transfer the responsibility of commit over to commit_info + res = m_sst_info->finish(&commit_info, print_client_error); + if (res == 0) { + // Make sure we have work to do - under race condition we could lose + // to another thread and end up with no work + if (commit_info.has_work()) { + rocksdb::IngestExternalFileOptions opts; + opts.move_files = true; + opts.snapshot_consistency = false; + opts.allow_global_seqno = false; + opts.allow_blocking_flush = false; + + const rocksdb::Status s = rdb->IngestExternalFile( + commit_info.get_cf(), commit_info.get_committed_files(), opts); + if (!s.ok()) { + if (print_client_error) { + Rdb_sst_info::report_error_msg(s, nullptr); + } + res = HA_ERR_ROCKSDB_BULK_LOAD; + } else { + // Mark the list of SST files as committed, otherwise they'll get + // cleaned up when commit_info destructs + commit_info.commit(); + } + } + } m_sst_info.reset(); } DBUG_RETURN(res); } -int ha_rocksdb::update_pk(const Rdb_key_def &kd, - const struct update_row_info &row_info, - const bool &pk_changed) { - const uint key_id = kd.get_keyno(); - const bool hidden_pk = is_hidden_pk(key_id, table, m_tbl_def); +/** + Update an existing primary key record or write a new primary key record + + @param[in] kd the primary key is being update/write + @param[in] update_row_info hold all row data, such as old row data and + new row data + @param[in] pk_changed whether primary key is changed + @return + HA_EXIT_SUCCESS OK + Other HA_ERR error code (can be SE-specific) + */ +int ha_rocksdb::update_write_pk(const Rdb_key_def &kd, + const struct update_row_info &row_info, + bool pk_changed) { + uint key_id = kd.get_keyno(); + bool hidden_pk = is_hidden_pk(key_id, table, m_tbl_def); ulonglong bytes_written = 0; /* @@ -9931,7 +10013,10 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd, int rc = HA_EXIT_SUCCESS; rocksdb::Slice value_slice; /* Prepare the new record to be written into RocksDB */ - if ((rc = convert_record_to_storage_format(row_info, &value_slice))) { + if ((rc = m_converter->encode_value_slice( + m_pk_descr, row_info.new_pk_slice, row_info.new_pk_unpack_info, + !row_info.old_pk_slice.empty(), should_store_row_debug_checksums(), + m_ttl_bytes, &m_ttl_bytes_updated, &value_slice))) { return rc; } @@ -9951,7 +10036,9 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd, row_info.tx->get_indexed_write_batch()->Put(cf, row_info.new_pk_slice, value_slice); } else { - const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice); + const bool assume_tracked = can_assume_tracked(ha_thd()); + const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice, + assume_tracked); if (!s.ok()) { if (s.IsBusy()) { errkey = table->s->primary_key; @@ -9971,9 +10058,22 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd, return rc; } -int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, - const struct update_row_info &row_info, - const bool bulk_load_sk) { +/** + update an existing secondary key record or write a new secondary key record + + @param[in] table_arg Table we're working on + @param[in] kd The secondary key being update/write + @param[in] row_info data structure contains old row data and new row data + @param[in] bulk_load_sk whether support bulk load. Currently it is only + support for write + @return + HA_EXIT_SUCCESS OK + Other HA_ERR error code (can be SE-specific) + */ +int ha_rocksdb::update_write_sk(const TABLE *const table_arg, + const Rdb_key_def &kd, + const struct update_row_info &row_info, + const bool bulk_load_sk) { int new_packed_size; int old_packed_size; int rc = HA_EXIT_SUCCESS; @@ -9995,19 +10095,18 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, return HA_EXIT_SUCCESS; } - const bool store_row_debug_checksums = should_store_row_debug_checksums(); - + bool store_row_debug_checksums = should_store_row_debug_checksums(); new_packed_size = kd.pack_record(table_arg, m_pack_buffer, row_info.new_data, m_sk_packed_tuple, &m_sk_tails, store_row_debug_checksums, - row_info.hidden_pk_id, 0, nullptr, nullptr, m_ttl_bytes); + row_info.hidden_pk_id, 0, nullptr, m_ttl_bytes); if (row_info.old_data != nullptr) { // The old value old_packed_size = kd.pack_record( table_arg, m_pack_buffer, row_info.old_data, m_sk_packed_tuple_old, &m_sk_tails_old, store_row_debug_checksums, row_info.hidden_pk_id, 0, - nullptr, nullptr, m_ttl_bytes); + nullptr, m_ttl_bytes); /* Check if we are going to write the same value. This can happen when @@ -10067,13 +10166,22 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd, return rc; } -int ha_rocksdb::update_indexes(const struct update_row_info &row_info, - const bool &pk_changed) { +/** + Update existing indexes(PK/SKs) or write new indexes(PK/SKs) + + @param[in] row_info hold all row data, such as old key/new key + @param[in] pk_changed whether primary key is changed + @return + HA_EXIT_SUCCESS OK + Other HA_ERR error code (can be SE-specific) + */ +int ha_rocksdb::update_write_indexes(const struct update_row_info &row_info, + const bool pk_changed) { int rc; bool bulk_load_sk; // The PK must be updated first to pull out the TTL value. - rc = update_pk(*m_pk_descr, row_info, pk_changed); + rc = update_write_pk(*m_pk_descr, row_info, pk_changed); if (rc != HA_EXIT_SUCCESS) { return rc; } @@ -10088,7 +10196,8 @@ int ha_rocksdb::update_indexes(const struct update_row_info &row_info, continue; } - rc = update_sk(table, *m_key_descr_arr[key_id], row_info, bulk_load_sk); + rc = update_write_sk(table, *m_key_descr_arr[key_id], row_info, + bulk_load_sk); if (rc != HA_EXIT_SUCCESS) { return rc; } @@ -10097,11 +10206,26 @@ int ha_rocksdb::update_indexes(const struct update_row_info &row_info, return HA_EXIT_SUCCESS; } +/** + Update an existing row or write a new row + + @param[in] old_data nullptr for write, non-null for update + @param[in] new_data non-null for write/update + @param[in] skip_unique_check whether to check uniqueness + @return + HA_EXIT_SUCCESS OK + Other HA_ERR error code (can be SE-specific) + */ int ha_rocksdb::update_write_row(const uchar *const old_data, const uchar *const new_data, const bool skip_unique_check) { DBUG_ENTER_FUNC(); + THD *thd = ha_thd(); + if (thd && thd->killed) { + DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); + } + bool pk_changed = false; struct update_row_info row_info; @@ -10109,7 +10233,6 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, row_info.new_data = new_data; row_info.skip_unique_check = skip_unique_check; row_info.new_pk_unpack_info = nullptr; - set_last_rowkey(old_data); row_info.tx = get_or_create_tx(table->in_use); @@ -10130,12 +10253,20 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, DBUG_RETURN(rc); } + /* + For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs + always require locking. + */ + if (row_info.old_pk_slice.size() > 0) { + pk_changed = row_info.new_pk_slice.compare(row_info.old_pk_slice) != 0; + } + if (!skip_unique_check) { /* Check to see if we are going to have failures because of unique keys. Also lock the appropriate key values. */ - rc = check_uniqueness_and_lock(row_info, &pk_changed); + rc = check_uniqueness_and_lock(row_info, pk_changed); if (rc != HA_EXIT_SUCCESS) { DBUG_RETURN(rc); } @@ -10149,7 +10280,7 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, here because updates to the transaction will be made and those updates cannot be easily removed without rolling back the entire transaction. */ - rc = update_indexes(row_info, pk_changed); + rc = update_write_indexes(row_info, pk_changed); if (rc != HA_EXIT_SUCCESS) { DBUG_RETURN(rc); } @@ -10190,18 +10321,29 @@ void ha_rocksdb::setup_iterator_bounds( const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len, uchar *const lower_bound, uchar *const upper_bound, rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice) { - uint min_len = std::min(eq_cond.size(), bound_len); - memcpy(upper_bound, eq_cond.data(), min_len); - kd.successor(upper_bound, min_len); - memcpy(lower_bound, eq_cond.data(), min_len); - kd.predecessor(lower_bound, min_len); + // If eq_cond is shorter than Rdb_key_def::INDEX_NUMBER_SIZE, we should be + // able to get better bounds just by using index id directly. + if (eq_cond.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) { + DBUG_ASSERT(bound_len == Rdb_key_def::INDEX_NUMBER_SIZE); + uint size; + kd.get_infimum_key(lower_bound, &size); + DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE); + kd.get_supremum_key(upper_bound, &size); + DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE); + } else { + DBUG_ASSERT(bound_len <= eq_cond.size()); + memcpy(upper_bound, eq_cond.data(), bound_len); + kd.successor(upper_bound, bound_len); + memcpy(lower_bound, eq_cond.data(), bound_len); + kd.predecessor(lower_bound, bound_len); + } if (kd.m_is_reverse_cf) { - *upper_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len); - *lower_bound_slice = rocksdb::Slice((const char *)upper_bound, min_len); + *upper_bound_slice = rocksdb::Slice((const char *)lower_bound, bound_len); + *lower_bound_slice = rocksdb::Slice((const char *)upper_bound, bound_len); } else { - *upper_bound_slice = rocksdb::Slice((const char *)upper_bound, min_len); - *lower_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len); + *upper_bound_slice = rocksdb::Slice((const char *)upper_bound, bound_len); + *lower_bound_slice = rocksdb::Slice((const char *)lower_bound, bound_len); } } @@ -10220,8 +10362,17 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, bool skip_bloom = true; const rocksdb::Slice eq_cond(slice->data(), eq_cond_len); + // The size of m_scan_it_lower_bound (and upper) is technically + // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers. Rather + // than recalculating that number, we pass in the max of eq_cond_len and + // Rdb_key_def::INDEX_NUMBER_SIZE which is guaranteed to be smaller than + // max_packed_sk_len, hence ensuring no buffer overrun. + // + // See ha_rocksdb::setup_iterator_bounds on how the bound_len parameter is + // used. if (check_bloom_and_set_bounds( - ha_thd(), kd, eq_cond, use_all_keys, eq_cond_len, + ha_thd(), kd, eq_cond, use_all_keys, + std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_NUMBER_SIZE), m_scan_it_lower_bound, m_scan_it_upper_bound, &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) { skip_bloom = false; @@ -10259,7 +10410,8 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, m_scan_it_snapshot = rdb->GetSnapshot(); auto read_opts = rocksdb::ReadOptions(); - read_opts.total_order_seek = true; // TODO: set based on WHERE conditions + // TODO(mung): set based on WHERE conditions + read_opts.total_order_seek = true; read_opts.snapshot = m_scan_it_snapshot; m_scan_it = rdb->NewIterator(read_opts, kd.get_cf()); } else { @@ -10284,12 +10436,12 @@ void ha_rocksdb::release_scan_iterator() { void ha_rocksdb::setup_iterator_for_rnd_scan() { uint key_size; - int key_start_matching_bytes = m_pk_descr->get_first_key(m_pk_packed_tuple, &key_size); + int key_start_matching_bytes = + m_pk_descr->get_first_key(m_pk_packed_tuple, &key_size); rocksdb::Slice table_key((const char *)m_pk_packed_tuple, key_size); - setup_scan_iterator(*m_pk_descr, &table_key, false, - key_start_matching_bytes); + setup_scan_iterator(*m_pk_descr, &table_key, false, key_start_matching_bytes); m_scan_it->Seek(table_key); m_skip_scan_it_next_call = true; } @@ -10302,8 +10454,16 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() { int ha_rocksdb::rnd_init(bool scan) { DBUG_ENTER_FUNC(); + THD *thd = ha_thd(); + if (thd && thd->killed) { + DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); + } + Rdb_transaction *const tx = get_or_create_tx(table->in_use); - setup_read_decoders(); + + // when this table is being updated, decode all fields + m_converter->setup_field_decoders(table->read_set, + m_lock_rows == RDB_LOCK_WRITE); if (scan) { m_rnd_scan_is_new_snapshot = !tx->has_snapshot(); @@ -10331,9 +10491,9 @@ int ha_rocksdb::rnd_next(uchar *const buf) { int rc; for (;;) { rc = rnd_next_with_direction(buf, true); - if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !m_rnd_scan_is_new_snapshot) + if (!should_recreate_snapshot(rc, m_rnd_scan_is_new_snapshot)) { break; /* exit the loop */ - + } // release the snapshot and iterator and then regenerate them Rdb_transaction *tx = get_or_create_tx(table->in_use); tx->release_snapshot(); @@ -10343,8 +10503,7 @@ int ha_rocksdb::rnd_next(uchar *const buf) { m_rnd_scan_is_new_snapshot = false; - if (rc == HA_ERR_KEY_NOT_FOUND) - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); } @@ -10356,6 +10515,7 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) { DBUG_ENTER_FUNC(); int rc; + THD *thd = ha_thd(); table->status = STATUS_NOT_FOUND; #ifdef MARIAROCKS_NOT_YET @@ -10374,13 +10534,20 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) { } for (;;) { + DEBUG_SYNC(thd, "rocksdb.check_flags_rnwd"); + if (thd && thd->killed) { + rc = HA_ERR_QUERY_INTERRUPTED; + break; + } + if (m_skip_scan_it_next_call) { m_skip_scan_it_next_call = false; } else { - if (move_forward) + if (move_forward) { m_scan_it->Next(); /* this call cannot fail */ - else + } else { m_scan_it->Prev(); /* this call cannot fail */ + } } if (!is_valid(m_scan_it)) { @@ -10414,11 +10581,7 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) { const rocksdb::Status s = get_for_update(tx, m_pk_descr->get_cf(), key, &m_retrieved_record); if (s.IsNotFound() && - my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED) { - // This occurs if we accessed a row, tried to lock it, failed, - // released and reacquired the snapshot (because of READ COMMITTED - // mode) and the row was deleted by someone else in the meantime. - // If so, we just want to move on to the next row. + should_skip_invalidated_record(HA_ERR_KEY_NOT_FOUND)) { continue; } @@ -10478,11 +10641,18 @@ int ha_rocksdb::rnd_end() { int ha_rocksdb::index_init(uint idx, bool sorted) { DBUG_ENTER_FUNC(); + THD *thd = ha_thd(); + if (thd && thd->killed) { + DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); + } + Rdb_transaction *const tx = get_or_create_tx(table->in_use); DBUG_ASSERT(tx != nullptr); - setup_read_decoders(); - m_start_range= NULL; + m_start_range= NULL; + // when this table is being updated, decode all fields + m_converter->setup_field_decoders(table->read_set, + m_lock_rows == RDB_LOCK_WRITE); if (!m_keyread_only) { m_key_descr_arr[idx]->get_lookup_bitmap(table, &m_lookup_bitmap); @@ -10526,10 +10696,20 @@ int ha_rocksdb::truncate() { DBUG_ASSERT(m_tbl_def != nullptr); - m_tbl_def->m_auto_incr_val = 1; - m_tbl_def->m_hidden_pk_val = 1; + // Save table name to use later + std::string table_name = m_tbl_def->full_tablename(); + + // Delete current table + int err = delete_table(m_tbl_def); + if (err != HA_EXIT_SUCCESS) { + DBUG_RETURN(err); + } - DBUG_RETURN(remove_rows(m_tbl_def)); + // Reset auto_increment_value to 1 if auto-increment feature is enabled + // By default, the starting valid value for auto_increment_value is 1 + DBUG_RETURN(create_table( + table_name, table, + table->found_next_number_field ? 1 : 0 /* auto_increment_value */)); } /* @@ -10607,9 +10787,11 @@ rocksdb::Status ha_rocksdb::delete_or_singledelete( uint index, Rdb_transaction *const tx, rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key) { - if (can_use_single_delete(index)) - return tx->single_delete(column_family, key); - return tx->delete_key(column_family, key); + const bool assume_tracked = can_assume_tracked(ha_thd()); + if (can_use_single_delete(index)) { + return tx->single_delete(column_family, key, assume_tracked); + } + return tx->delete_key(column_family, key, assume_tracked); } void ha_rocksdb::update_stats(void) { @@ -10640,8 +10822,9 @@ void ha_rocksdb::update_stats(void) { int ha_rocksdb::info(uint flag) { DBUG_ENTER_FUNC(); - if (!table) + if (!table) { DBUG_RETURN(HA_EXIT_FAILURE); + } if (flag & HA_STATUS_VARIABLE) { /* @@ -10669,10 +10852,8 @@ int ha_rocksdb::info(uint flag) { // if number of records is hardcoded, we do not want to force computation // of memtable cardinalities - if (stats.records == 0 || - (rocksdb_force_compute_memtable_stats && - rocksdb_debug_optimizer_n_rows == 0)) - { + if (stats.records == 0 || (rocksdb_force_compute_memtable_stats && + rocksdb_debug_optimizer_n_rows == 0)) { // First, compute SST files stats uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; auto r = get_range(pk_index(table, m_tbl_def), buf); @@ -10682,8 +10863,8 @@ int ha_rocksdb::info(uint flag) { if (stats.records == 0) { rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz, include_flags); - stats.records+= sz/ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE; - stats.data_file_length+= sz; + stats.records += sz / ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE; + stats.data_file_length += sz; } // Second, compute memtable stats. This call is expensive, so cache // values computed for some time. @@ -10720,14 +10901,17 @@ int ha_rocksdb::info(uint flag) { stats.records += m_table_handler->m_mtcache_count; stats.data_file_length += m_table_handler->m_mtcache_size; } + } - if (rocksdb_debug_optimizer_n_rows > 0) - stats.records = rocksdb_debug_optimizer_n_rows; + if (rocksdb_debug_optimizer_n_rows > 0) { + stats.records = rocksdb_debug_optimizer_n_rows; } - if (stats.records != 0) + if (stats.records != 0) { stats.mean_rec_length = stats.data_file_length / stats.records; + } } + if (flag & HA_STATUS_CONST) { ref_length = m_pk_descr->max_storage_fmt_length(); @@ -10753,8 +10937,7 @@ int ha_rocksdb::info(uint flag) { } else { x = 0; } - if (x > stats.records) - x = stats.records; + if (x > stats.records) x = stats.records; if ((x == 0 && rocksdb_debug_optimizer_no_zero_cardinality) || rocksdb_debug_optimizer_n_rows > 0) { // Fake cardinality implementation. For example, (idx1, idx2, idx3) @@ -10789,7 +10972,7 @@ int ha_rocksdb::info(uint flag) { uniqueness violation. */ errkey = m_dupp_errkey; - dup_ref = m_pk_tuple; // TODO(?): this should store packed PK. + dup_ref = m_pk_tuple; // TODO(?): this should store packed PK. } if (flag & HA_STATUS_AUTO) { @@ -10803,8 +10986,9 @@ void ha_rocksdb::position(const uchar *const record) { DBUG_ENTER_FUNC(); longlong hidden_pk_id = 0; - if (has_hidden_pk(table) && read_hidden_pk_id_from_rowkey(&hidden_pk_id)) - DBUG_ASSERT(false); // should never reach here + if (has_hidden_pk(table) && read_hidden_pk_id_from_rowkey(&hidden_pk_id)) { + DBUG_ASSERT(false); // should never reach here + } /* Get packed primary key value from the record. @@ -10826,8 +11010,9 @@ void ha_rocksdb::position(const uchar *const record) { It could be that mem-comparable form of PK occupies less than ref_length bytes. Fill the remainder with zeros. */ - if (ref_length > packed_size) + if (ref_length > packed_size) { memset(ref + packed_size, 0, ref_length - packed_size); + } DBUG_VOID_RETURN; } @@ -10885,8 +11070,7 @@ void ha_rocksdb::calc_updated_indexes() { /* Walk over all key parts, including the "extended key" suffix */ const uint key_parts = kd.get_key_parts(); for (uint kp = 0; kp < key_parts; kp++) { - if (has_hidden_pk(table) && kp + 1 == key_parts) - break; + if (has_hidden_pk(table) && kp + 1 == key_parts) break; Field *const field = kd.get_table_field_for_part_no(table, kp); if (bitmap_is_set(table->write_set, field->field_index)) { @@ -10899,6 +11083,9 @@ void ha_rocksdb::calc_updated_indexes() { } /** + Update an existing row + @param[in] old_data nullptr for write, non-null for update + @param[in] new_data non-null for write/update @return HA_EXIT_SUCCESS OK other HA_ERR error code (can be SE-specific) @@ -10916,7 +11103,7 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) { */ DBUG_ASSERT(new_data == table->record[0]); - const int rv = update_write_row(old_data, new_data, false); + const int rv = update_write_row(old_data, new_data, skip_unique_check()); if (rv == 0) { #ifdef MARIAROCKS_NOT_YET @@ -11021,7 +11208,8 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *const thd, THR_LOCK_DATA **to, void ha_rocksdb::read_thd_vars(THD *const thd) { m_store_row_debug_checksums = THDVAR(thd, store_row_debug_checksums); - m_verify_row_debug_checksums = THDVAR(thd, verify_row_debug_checksums); + m_converter->set_verify_row_debug_checksums( + THDVAR(thd, verify_row_debug_checksums)); m_checksums_pct = THDVAR(thd, checksums_pct); } @@ -11112,7 +11300,8 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) { tx->io_perf_end_and_record(&m_io_perf); tx->m_n_mysql_tables_in_use--; if (tx->m_n_mysql_tables_in_use == 0 && - !my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { + !my_core::thd_test_options(thd, + OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { /* Do like InnoDB: when we get here, it's time to commit a single-statement transaction. @@ -11244,9 +11433,8 @@ rocksdb::Range get_range(const Rdb_key_def &kd, } } -rocksdb::Range -ha_rocksdb::get_range(const int &i, - uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]) const { +rocksdb::Range ha_rocksdb::get_range( + const int i, uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]) const { return myrocks::get_range(*m_key_descr_arr[i], buf); } @@ -11257,11 +11445,10 @@ ha_rocksdb::get_range(const int &i, but in drop_index_thread's case, it means index is marked as removed, so no further seek will happen for the index id. */ -static bool is_myrocks_index_empty( - rocksdb::ColumnFamilyHandle *cfh, const bool is_reverse_cf, - const rocksdb::ReadOptions &read_opts, - const uint index_id) -{ +static bool is_myrocks_index_empty(rocksdb::ColumnFamilyHandle *cfh, + const bool is_reverse_cf, + const rocksdb::ReadOptions &read_opts, + const uint index_id) { bool index_removed = false; uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0}; rdb_netbuf_store_uint32(key_buf, index_id); @@ -11272,8 +11459,7 @@ static bool is_myrocks_index_empty( if (!it->Valid()) { index_removed = true; } else { - if (memcmp(it->key().data(), key_buf, - Rdb_key_def::INDEX_NUMBER_SIZE)) { + if (memcmp(it->key().data(), key_buf, Rdb_key_def::INDEX_NUMBER_SIZE)) { // Key does not have same prefix index_removed = true; } @@ -11300,8 +11486,8 @@ void Rdb_drop_index_thread::run() { timespec ts; int sec= dict_manager.is_drop_index_empty() - ? 24 * 60 * 60 // no filtering - : 60; // filtering + ? 24 * 60 * 60 // no filtering + : 60; // filtering set_timespec(ts,sec); const auto ret MY_ATTRIBUTE((__unused__)) = @@ -11318,26 +11504,23 @@ void Rdb_drop_index_thread::run() { if (!indices.empty()) { std::unordered_set<GL_INDEX_ID> finished; rocksdb::ReadOptions read_opts; - read_opts.total_order_seek = true; // disable bloom filter + read_opts.total_order_seek = true; // disable bloom filter for (const auto d : indices) { uint32 cf_flags = 0; if (!dict_manager.get_cf_flags(d.cf_id, &cf_flags)) { - sql_print_error("RocksDB: Failed to get column family flags " - "from cf id %u. MyRocks data dictionary may " - "get corrupted.", - d.cf_id); + // NO_LINT_DEBUG + sql_print_error( + "RocksDB: Failed to get column family flags " + "from cf id %u. MyRocks data dictionary may " + "get corrupted.", + d.cf_id); abort(); } rocksdb::ColumnFamilyHandle *cfh = cf_manager.get_cf(d.cf_id); DBUG_ASSERT(cfh); const bool is_reverse_cf = cf_flags & Rdb_key_def::REVERSE_CF_FLAG; - if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) - { - finished.insert(d); - continue; - } uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]; rocksdb::Range range = get_range(d.index_id, buf, is_reverse_cf ? 1 : 0, is_reverse_cf ? 0 : 1); @@ -11357,8 +11540,7 @@ void Rdb_drop_index_thread::run() { } rdb_handle_io_error(status, RDB_IO_ERROR_BG_THREAD); } - if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) - { + if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) { finished.insert(d); } } @@ -11385,28 +11567,24 @@ Rdb_tbl_def *ha_rocksdb::get_table_if_exists(const char *const tablename) { } /* - Note: the following function is called when the table is not open. That is, - this->table==nullptr, pk_key_descr==nullptr, etc. + Overload func for delete table ---it deletes table meta data in data + dictionary immediately and delete real data in background thread(async) - tablename points to line in form "./dbname/tablename". + @param tbl IN MyRocks table definition @return HA_EXIT_SUCCESS OK other HA_ERR error code (can be SE-specific) */ - -int ha_rocksdb::delete_table(const char *const tablename) { +int ha_rocksdb::delete_table(Rdb_tbl_def *const tbl) { DBUG_ENTER_FUNC(); - DBUG_ASSERT(tablename != nullptr); + DBUG_ASSERT(tbl != nullptr); + DBUG_ASSERT(m_tbl_def == nullptr || m_tbl_def == tbl); const std::unique_ptr<rocksdb::WriteBatch> wb = dict_manager.begin(); rocksdb::WriteBatch *const batch = wb.get(); - /* Find the table in the hash */ - Rdb_tbl_def *const tbl = get_table_if_exists(tablename); - if (!tbl) - DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); dict_manager.add_drop_table(tbl->m_key_descr_arr, tbl->m_key_count, batch); /* @@ -11421,10 +11599,36 @@ int ha_rocksdb::delete_table(const char *const tablename) { } rdb_drop_idx_thread.signal(); - + // avoid dangling pointer + m_tbl_def = nullptr; DBUG_RETURN(HA_EXIT_SUCCESS); } +/* + Note: the following function is called when the table is not open. That is, + this->table==nullptr, pk_key_descr==nullptr, etc. + + tablename points to line in form "./dbname/tablename". + + @return + HA_EXIT_SUCCESS OK + other HA_ERR error code (can be SE-specific) +*/ + +int ha_rocksdb::delete_table(const char *const tablename) { + DBUG_ENTER_FUNC(); + + DBUG_ASSERT(tablename != nullptr); + + /* Find the table in the hash */ + Rdb_tbl_def *const tbl = get_table_if_exists(tablename); + if (!tbl) { + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + + DBUG_RETURN(delete_table(tbl)); +} + int ha_rocksdb::remove_rows(Rdb_tbl_def *const tbl) { const rocksdb::WriteOptions wo = rdb_get_rocksdb_write_options(handler::ha_thd()); @@ -11575,21 +11779,33 @@ int ha_rocksdb::extra(enum ha_extra_function operation) { DBUG_ENTER_FUNC(); switch (operation) { - case HA_EXTRA_KEYREAD: - m_keyread_only = true; - break; - case HA_EXTRA_NO_KEYREAD: - m_keyread_only = false; - break; - case HA_EXTRA_FLUSH: - /* - If the table has blobs, then they are part of m_retrieved_record. - This call invalidates them. - */ - m_retrieved_record.Reset(); - break; - default: - break; + case HA_EXTRA_KEYREAD: + m_keyread_only = true; + break; + case HA_EXTRA_NO_KEYREAD: + m_keyread_only = false; + break; + case HA_EXTRA_FLUSH: + /* + If the table has blobs, then they are part of m_retrieved_record. + This call invalidates them. + */ + m_retrieved_record.Reset(); + break; + case HA_EXTRA_INSERT_WITH_UPDATE: + // INSERT ON DUPLICATE KEY UPDATE + if (rocksdb_enable_insert_with_update_caching) { + m_insert_with_update = true; + } + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + // PAIRED with HA_EXTRA_INSERT_WITH_UPDATE or HA_EXTRA_WRITE_CAN_REPLACE + // that indicates the end of REPLACE / INSERT ON DUPLICATE KEY + m_insert_with_update = false; + break; + + default: + break; } DBUG_RETURN(HA_EXIT_SUCCESS); @@ -11660,8 +11876,7 @@ ha_rows ha_rocksdb::records_in_range(uint inx, key_range *const min_key, uint64_t sz = 0; auto disk_size = kd.m_stats.m_actual_disk_size; - if (disk_size == 0) - disk_size = kd.m_stats.m_data_size; + if (disk_size == 0) disk_size = kd.m_stats.m_data_size; auto rows = kd.m_stats.m_rows; if (rows == 0 || disk_size == 0) { rows = 1; @@ -11772,7 +11987,7 @@ static int calculate_stats( // get RocksDB table properties for these ranges rocksdb::TablePropertiesCollection props; - for (auto it : ranges) { + for (const auto &it : ranges) { const auto old_size MY_ATTRIBUTE((__unused__)) = props.size(); const auto status = rdb->GetPropertiesOfTablesInRange( it.first, &it.second[0], it.second.size(), &props); @@ -11962,73 +12177,77 @@ void ha_rocksdb::get_auto_increment(ulonglong off, ulonglong inc, // The next value can be more complicated if either 'inc' or 'off' is not 1 ulonglong last_val = auto_incr; - // Loop until we can correctly update the atomic value - do { - DBUG_ASSERT(last_val > 0); - // Calculate the next value in the auto increment series: offset - // + N * increment where N is 0, 1, 2, ... - // - // For further information please visit: - // http://dev.mysql.com/doc/refman/5.7/en/replication-options-master.html - // - // The following is confusing so here is an explanation: - // To get the next number in the sequence above you subtract out the - // offset, calculate the next sequence (N * increment) and then add the - // offset back in. - // - // The additions are rearranged to avoid overflow. The following is - // equivalent to (last_val - 1 + inc - off) / inc. This uses the fact - // that (a+b)/c = a/c + b/c + (a%c + b%c)/c. To show why: - // - // (a+b)/c - // = (a - a%c + a%c + b - b%c + b%c) / c - // = (a - a%c) / c + (b - b%c) / c + (a%c + b%c) / c - // = a/c + b/c + (a%c + b%c) / c - // - // Now, substitute a = last_val - 1, b = inc - off, c = inc to get the - // following statement. - ulonglong n = - (last_val - 1) / inc + ((last_val - 1) % inc + inc - off) / inc; - - // Check if n * inc + off will overflow. This can only happen if we have - // an UNSIGNED BIGINT field. - if (n > (std::numeric_limits<ulonglong>::max() - off) / inc) { - DBUG_ASSERT(max_val == std::numeric_limits<ulonglong>::max()); - // The 'last_val' value is already equal to or larger than the largest - // value in the sequence. Continuing would wrap around (technically - // the behavior would be undefined). What should we do? - // We could: - // 1) set the new value to the last possible number in our sequence - // as described above. The problem with this is that this - // number could be smaller than a value in an existing row. - // 2) set the new value to the largest possible number. This number - // may not be in our sequence, but it is guaranteed to be equal - // to or larger than any other value already inserted. + if (last_val > max_val) { + new_val = std::numeric_limits<ulonglong>::max(); + } else { + // Loop until we can correctly update the atomic value + do { + DBUG_ASSERT(last_val > 0); + // Calculate the next value in the auto increment series: offset + // + N * increment where N is 0, 1, 2, ... // - // For now I'm going to take option 2. + // For further information please visit: + // http://dev.mysql.com/doc/refman/5.7/en/replication-options-master.html // - // Returning ULLONG_MAX from get_auto_increment will cause the SQL - // layer to fail with ER_AUTOINC_READ_FAILED. This means that due to - // the SE API for get_auto_increment, inserts will fail with - // ER_AUTOINC_READ_FAILED if the column is UNSIGNED BIGINT, but - // inserts will fail with ER_DUP_ENTRY for other types (or no failure - // if the column is in a non-unique SK). - new_val = std::numeric_limits<ulonglong>::max(); - auto_incr = new_val; // Store the largest value into auto_incr - break; - } + // The following is confusing so here is an explanation: + // To get the next number in the sequence above you subtract out the + // offset, calculate the next sequence (N * increment) and then add the + // offset back in. + // + // The additions are rearranged to avoid overflow. The following is + // equivalent to (last_val - 1 + inc - off) / inc. This uses the fact + // that (a+b)/c = a/c + b/c + (a%c + b%c)/c. To show why: + // + // (a+b)/c + // = (a - a%c + a%c + b - b%c + b%c) / c + // = (a - a%c) / c + (b - b%c) / c + (a%c + b%c) / c + // = a/c + b/c + (a%c + b%c) / c + // + // Now, substitute a = last_val - 1, b = inc - off, c = inc to get the + // following statement. + ulonglong n = + (last_val - 1) / inc + ((last_val - 1) % inc + inc - off) / inc; + + // Check if n * inc + off will overflow. This can only happen if we have + // an UNSIGNED BIGINT field. + if (n > (std::numeric_limits<ulonglong>::max() - off) / inc) { + DBUG_ASSERT(max_val == std::numeric_limits<ulonglong>::max()); + // The 'last_val' value is already equal to or larger than the largest + // value in the sequence. Continuing would wrap around (technically + // the behavior would be undefined). What should we do? + // We could: + // 1) set the new value to the last possible number in our sequence + // as described above. The problem with this is that this + // number could be smaller than a value in an existing row. + // 2) set the new value to the largest possible number. This number + // may not be in our sequence, but it is guaranteed to be equal + // to or larger than any other value already inserted. + // + // For now I'm going to take option 2. + // + // Returning ULLONG_MAX from get_auto_increment will cause the SQL + // layer to fail with ER_AUTOINC_READ_FAILED. This means that due to + // the SE API for get_auto_increment, inserts will fail with + // ER_AUTOINC_READ_FAILED if the column is UNSIGNED BIGINT, but + // inserts will fail with ER_DUP_ENTRY for other types (or no failure + // if the column is in a non-unique SK). + new_val = std::numeric_limits<ulonglong>::max(); + auto_incr = new_val; // Store the largest value into auto_incr + break; + } - new_val = n * inc + off; + new_val = n * inc + off; - // Attempt to store the new value (plus 1 since m_auto_incr_val contains - // the next available value) into the atomic value. If the current - // value no longer matches what we have in 'last_val' this will fail and - // we will repeat the loop (`last_val` will automatically get updated - // with the current value). - // - // See above explanation for inc == 1 for why we use std::min. - } while (!auto_incr.compare_exchange_weak(last_val, - std::min(new_val + 1, max_val))); + // Attempt to store the new value (plus 1 since m_auto_incr_val contains + // the next available value) into the atomic value. If the current + // value no longer matches what we have in 'last_val' this will fail and + // we will repeat the loop (`last_val` will automatically get updated + // with the current value). + // + // See above explanation for inc == 1 for why we use std::min. + } while (!auto_incr.compare_exchange_weak( + last_val, std::min(new_val + 1, max_val))); + } } *first_value = new_val; @@ -12044,13 +12263,13 @@ const char *dbug_print_item(Item *const item) { char *const buf = dbug_item_print_buf; String str(buf, sizeof(dbug_item_print_buf), &my_charset_bin); str.length(0); - if (!item) - return "(Item*)nullptr"; + if (!item) return "(Item*)nullptr"; item->print(&str, QT_ORDINARY); - if (str.c_ptr() == buf) + if (str.c_ptr() == buf) { return buf; - else + } else { return "Couldn't fit into buffer"; + } } #endif /*DBUG_OFF*/ @@ -12179,7 +12398,6 @@ bool ha_rocksdb::prepare_inplace_alter_table( my_core::Alter_inplace_info::DROP_UNIQUE_INDEX | my_core::Alter_inplace_info::ADD_INDEX | my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) { - if (has_hidden_pk(altered_table)) { new_n_keys += 1; } @@ -12466,7 +12684,7 @@ int ha_rocksdb::inplace_populate_sk( const int new_packed_size = index->pack_record( new_table_arg, m_pack_buffer, table->record[0], m_sk_packed_tuple, &m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, - nullptr, nullptr, m_ttl_bytes); + nullptr, m_ttl_bytes); const rocksdb::Slice key = rocksdb::Slice( reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size); @@ -12514,9 +12732,9 @@ int ha_rocksdb::inplace_populate_sk( is used inside print_keydup_error so that the error message shows the duplicate record. */ - if (index->unpack_record(new_table_arg, new_table_arg->record[0], - &merge_key, nullptr, - m_verify_row_debug_checksums)) { + if (index->unpack_record( + new_table_arg, new_table_arg->record[0], &merge_key, + &merge_val, m_converter->get_verify_row_debug_checksums())) { /* Should never reach here */ DBUG_ASSERT(0); } @@ -12546,7 +12764,9 @@ int ha_rocksdb::inplace_populate_sk( DBUG_RETURN(res); } - if ((res = tx->finish_bulk_load())) { + bool is_critical_error; + res = tx->finish_bulk_load(&is_critical_error); + if (res && is_critical_error) { // NO_LINT_DEBUG sql_print_error("Error finishing bulk load."); DBUG_RETURN(res); @@ -12756,22 +12976,22 @@ bool ha_rocksdb::commit_inplace_alter_table( #define SHOW_FNAME(name) rocksdb_show_##name -#define DEF_SHOW_FUNC(name, key) \ - static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR * var, char *buff) { \ - rocksdb_status_counters.name = \ - rocksdb_stats->getTickerCount(rocksdb::key); \ - var->type = SHOW_LONGLONG; \ - var->value = (char *)&rocksdb_status_counters.name; \ - return HA_EXIT_SUCCESS; \ +#define DEF_SHOW_FUNC(name, key) \ + static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR * var, char *buff) { \ + rocksdb_status_counters.name = \ + rocksdb_stats->getTickerCount(rocksdb::key); \ + var->type = SHOW_LONGLONG; \ + var->value = reinterpret_cast<char *>(&rocksdb_status_counters.name); \ + return HA_EXIT_SUCCESS; \ } -#define DEF_STATUS_VAR(name) \ +#define DEF_STATUS_VAR(name) \ { "rocksdb_" #name, (char *)&SHOW_FNAME(name), SHOW_FUNC } -#define DEF_STATUS_VAR_PTR(name, ptr, option) \ +#define DEF_STATUS_VAR_PTR(name, ptr, option) \ { "rocksdb_" name, (char *)ptr, option } -#define DEF_STATUS_VAR_FUNC(name, ptr, option) \ +#define DEF_STATUS_VAR_FUNC(name, ptr, option) \ { name, reinterpret_cast<char *>(ptr), option } struct rocksdb_status_counters_t { @@ -13001,9 +13221,8 @@ static void show_myrocks_vars(THD *thd, SHOW_VAR *var, char *buff) { var->value = reinterpret_cast<char *>(&myrocks_status_variables); } -static ulonglong -io_stall_prop_value(const std::map<std::string, std::string> &props, - const std::string &key) { +static ulonglong io_stall_prop_value( + const std::map<std::string, std::string> &props, const std::string &key) { std::map<std::string, std::string>::const_iterator iter = props.find("io_stalls." + key); if (iter != props.end()) { @@ -13181,6 +13400,10 @@ static SHOW_VAR rocksdb_status_vars[] = { SHOW_LONGLONG), DEF_STATUS_VAR_PTR("number_sst_entry_other", &rocksdb_num_sst_entry_other, SHOW_LONGLONG), +#ifndef DBUG_OFF + DEF_STATUS_VAR_PTR("num_get_for_update_calls", + &rocksdb_num_get_for_update_calls, SHOW_LONGLONG), +#endif // the variables generated by SHOW_FUNC are sorted only by prefix (first // arg in the tuple below), so make sure it is unique to make sorting // deterministic as quick sort is not stable @@ -13422,6 +13645,51 @@ bool Rdb_manual_compaction_thread::is_manual_compaction_finished(int mc_id) { return finished; } +/** + * Locking read + Not Found + Read Committed occurs if we accessed + * a row by Seek, tried to lock it, failed, released and reacquired the + * snapshot (because of READ COMMITTED mode) and the row was deleted by + * someone else in the meantime. + * If so, we either just skipping the row, or re-creating a snapshot + * and seek again. In both cases, Read Committed constraint is not broken. + */ +bool ha_rocksdb::should_skip_invalidated_record(const int rc) { + if ((m_lock_rows != RDB_LOCK_NONE && rc == HA_ERR_KEY_NOT_FOUND && + my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED)) { + return true; + } + return false; +} +/** + * Indicating snapshot needs to be re-created and retrying seek again, + * instead of returning errors or empty set. This is normally applicable + * when hitting kBusy when locking the first row of the transaction, + * with Repeatable Read isolation level. + */ +bool ha_rocksdb::should_recreate_snapshot(const int rc, + const bool is_new_snapshot) { + if (should_skip_invalidated_record(rc) || + (rc == HA_ERR_ROCKSDB_STATUS_BUSY && is_new_snapshot)) { + return true; + } + return false; +} + +/** + * If calling put/delete/singledelete without locking the row, + * it is necessary to pass assume_tracked=false to RocksDB TX API. + * Read Free Replication and Blind Deletes are the cases when + * using TX API and skipping row locking. + */ +bool ha_rocksdb::can_assume_tracked(THD *thd) { +#ifdef MARIAROCKS_NOT_YET + if (use_read_free_rpl() || (THDVAR(thd, blind_delete_primary_key))) { + return false; + } +#endif + return true; +} + bool ha_rocksdb::check_bloom_and_set_bounds( THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, const bool use_all_keys, size_t bound_len, uchar *const lower_bound, @@ -13482,20 +13750,22 @@ bool ha_rocksdb::can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, shorter require all parts of the key to be available for the short key match. */ - if ((use_all_keys && prefix_extractor->InRange(eq_cond)) - || prefix_extractor->SameResultWhenAppended(eq_cond)) + if ((use_all_keys && prefix_extractor->InRange(eq_cond)) || + prefix_extractor->SameResultWhenAppended(eq_cond)) { can_use = true; - else + } else { can_use = false; + } } else { /* if prefix extractor is not defined, all key parts have to be used by eq_cond. */ - if (use_all_keys) + if (use_all_keys) { can_use = true; - else + } else { can_use = false; + } } return can_use; @@ -13514,7 +13784,7 @@ bool rdb_is_ttl_enabled() { return rocksdb_enable_ttl; } bool rdb_is_ttl_read_filtering_enabled() { return rocksdb_enable_ttl_read_filtering; } -#ifndef NDEBUG +#ifndef DBUG_OFF int rdb_dbug_set_ttl_rec_ts() { return rocksdb_debug_ttl_rec_ts; } int rdb_dbug_set_ttl_snapshot_ts() { return rocksdb_debug_ttl_snapshot_ts; } int rdb_dbug_set_ttl_read_filter_ts() { @@ -13561,17 +13831,17 @@ const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type) { static_assert(RDB_IO_ERROR_LAST == 4, "Please handle all the error types."); switch (err_type) { - case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_TX_COMMIT: - return "RDB_IO_ERROR_TX_COMMIT"; - case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_DICT_COMMIT: - return "RDB_IO_ERROR_DICT_COMMIT"; - case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_BG_THREAD: - return "RDB_IO_ERROR_BG_THREAD"; - case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_GENERAL: - return "RDB_IO_ERROR_GENERAL"; - default: - DBUG_ASSERT(false); - return "(unknown)"; + case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_TX_COMMIT: + return "RDB_IO_ERROR_TX_COMMIT"; + case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_DICT_COMMIT: + return "RDB_IO_ERROR_DICT_COMMIT"; + case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_BG_THREAD: + return "RDB_IO_ERROR_BG_THREAD"; + case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_GENERAL: + return "RDB_IO_ERROR_GENERAL"; + default: + DBUG_ASSERT(false); + return "(unknown)"; } } @@ -13583,32 +13853,38 @@ const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type) { void rdb_handle_io_error(const rocksdb::Status status, const RDB_IO_ERROR_TYPE err_type) { if (status.IsIOError()) { - switch (err_type) { - case RDB_IO_ERROR_TX_COMMIT: - case RDB_IO_ERROR_DICT_COMMIT: { - rdb_log_status_error(status, "failed to write to WAL"); - /* NO_LINT_DEBUG */ - sql_print_error("MyRocks: aborting on WAL write error."); - abort(); - break; - } - case RDB_IO_ERROR_BG_THREAD: { - rdb_log_status_error(status, "BG thread failed to write to RocksDB"); - /* NO_LINT_DEBUG */ - sql_print_error("MyRocks: aborting on BG write error."); - abort(); - break; - } - case RDB_IO_ERROR_GENERAL: { - rdb_log_status_error(status, "failed on I/O"); - /* NO_LINT_DEBUG */ - sql_print_error("MyRocks: aborting on I/O error."); - abort(); - break; + /* skip dumping core if write failed and we are allowed to do so */ +#ifdef MARIAROCKS_NOT_YET + if (skip_core_dump_on_error) { + opt_core_file = false; } - default: - DBUG_ASSERT(0); - break; +#endif + switch (err_type) { + case RDB_IO_ERROR_TX_COMMIT: + case RDB_IO_ERROR_DICT_COMMIT: { + rdb_log_status_error(status, "failed to write to WAL"); + /* NO_LINT_DEBUG */ + sql_print_error("MyRocks: aborting on WAL write error."); + abort(); + break; + } + case RDB_IO_ERROR_BG_THREAD: { + rdb_log_status_error(status, "BG thread failed to write to RocksDB"); + /* NO_LINT_DEBUG */ + sql_print_error("MyRocks: aborting on BG write error."); + abort(); + break; + } + case RDB_IO_ERROR_GENERAL: { + rdb_log_status_error(status, "failed on I/O"); + /* NO_LINT_DEBUG */ + sql_print_error("MyRocks: aborting on I/O error."); + abort(); + break; + } + default: + DBUG_ASSERT(0); + break; } } else if (status.IsCorruption()) { rdb_log_status_error(status, "data corruption detected!"); @@ -13618,16 +13894,16 @@ void rdb_handle_io_error(const rocksdb::Status status, abort(); } else if (!status.ok()) { switch (err_type) { - case RDB_IO_ERROR_DICT_COMMIT: { - rdb_log_status_error(status, "Failed to write to WAL (dictionary)"); - /* NO_LINT_DEBUG */ - sql_print_error("MyRocks: aborting on WAL write error."); - abort(); - break; - } - default: - rdb_log_status_error(status, "Failed to read/write in RocksDB"); - break; + case RDB_IO_ERROR_DICT_COMMIT: { + rdb_log_status_error(status, "Failed to write to WAL (dictionary)"); + /* NO_LINT_DEBUG */ + sql_print_error("MyRocks: aborting on WAL write error."); + abort(); + break; + } + default: + rdb_log_status_error(status, "Failed to read/write in RocksDB"); + break; } } } @@ -13735,9 +14011,10 @@ void rocksdb_set_delayed_write_rate(THD *thd, struct st_mysql_sys_var *var, if (!s.ok()) { /* NO_LINT_DEBUG */ - sql_print_warning("MyRocks: failed to update delayed_write_rate. " - "status code = %d, status = %s", - s.code(), s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to update delayed_write_rate. " + "status code = %d, status = %s", + s.code(), s.ToString().c_str()); } } RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex); @@ -13795,8 +14072,7 @@ int mysql_value_to_bool(struct st_mysql_value *value, my_bool *return_value) { } else if (new_value_type == MYSQL_VALUE_TYPE_INT) { long long intbuf; value->val_int(value, &intbuf); - if (intbuf > 1) - return 1; + if (intbuf > 1) return 1; *return_value = intbuf > 0 ? TRUE : FALSE; } else { return 1; @@ -13815,12 +14091,14 @@ int rocksdb_check_bulk_load( Rdb_transaction *tx = get_tx_from_thd(thd); if (tx != nullptr) { - const int rc = tx->finish_bulk_load(); - if (rc != 0) { + bool is_critical_error; + const int rc = tx->finish_bulk_load(&is_critical_error); + if (rc != 0 && is_critical_error) { // NO_LINT_DEBUG - sql_print_error("RocksDB: Error %d finalizing last SST file while " - "setting bulk loading variable", - rc); + sql_print_error( + "RocksDB: Error %d finalizing last SST file while " + "setting bulk loading variable", + rc); THDVAR(thd, bulk_load) = 0; return 1; } @@ -13868,9 +14146,10 @@ static void rocksdb_set_max_background_jobs(THD *thd, if (!s.ok()) { /* NO_LINT_DEBUG */ - sql_print_warning("MyRocks: failed to update max_background_jobs. " - "Status code = %d, status = %s.", - s.code(), s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to update max_background_jobs. " + "Status code = %d, status = %s.", + s.code(), s.ToString().c_str()); } } @@ -13896,9 +14175,10 @@ static void rocksdb_set_bytes_per_sync( if (!s.ok()) { /* NO_LINT_DEBUG */ - sql_print_warning("MyRocks: failed to update max_background_jobs. " - "Status code = %d, status = %s.", - s.code(), s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to update max_background_jobs. " + "Status code = %d, status = %s.", + s.code(), s.ToString().c_str()); } } @@ -13924,9 +14204,10 @@ static void rocksdb_set_wal_bytes_per_sync( if (!s.ok()) { /* NO_LINT_DEBUG */ - sql_print_warning("MyRocks: failed to update max_background_jobs. " - "Status code = %d, status = %s.", - s.code(), s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to update max_background_jobs. " + "Status code = %d, status = %s.", + s.code(), s.ToString().c_str()); } } @@ -13953,7 +14234,7 @@ static int rocksdb_validate_set_block_cache_size( } if (new_value < RDB_MIN_BLOCK_CACHE_SIZE || - (uint64_t)new_value > (uint64_t)LONGLONG_MAX) { + (uint64_t)new_value > (uint64_t)LLONG_MAX) { return HA_EXIT_FAILURE; } @@ -13969,17 +14250,19 @@ static int rocksdb_validate_set_block_cache_size( return HA_EXIT_SUCCESS; } -static int -rocksdb_validate_update_cf_options(THD * /* unused */, - struct st_mysql_sys_var * /*unused*/, - void *save, struct st_mysql_value *value) { - +static int rocksdb_validate_update_cf_options( + THD * /* unused */, struct st_mysql_sys_var * /*unused*/, void *save, + struct st_mysql_value *value) { char buff[STRING_BUFFER_USUAL_SIZE]; const char *str; int length; length = sizeof(buff); str = value->val_str(value, buff, &length); - *(const char **)save = str; + // In some cases, str can point to buff in the stack. + // This can cause invalid memory access after validation is finished. + // To avoid this kind case, let's alway duplicate the str if str is not + // nullptr + *(const char **)save = (str == nullptr) ? nullptr : my_strdup(str, MYF(0)); if (str == nullptr) { return HA_EXIT_SUCCESS; @@ -13993,13 +14276,17 @@ rocksdb_validate_update_cf_options(THD * /* unused */, my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "rocksdb_update_cf_options", str); return HA_EXIT_FAILURE; } + // Loop through option_map and create missing column families + for (Rdb_cf_options::Name_to_config_t::iterator it = option_map.begin(); + it != option_map.end(); ++it) { + cf_manager.get_or_create_cf(rdb, it->first); + } return HA_EXIT_SUCCESS; } -static void -rocksdb_set_update_cf_options(THD *const /* unused */, - struct st_mysql_sys_var *const /* unused */, - void *const var_ptr, const void *const save) { +static void rocksdb_set_update_cf_options( + THD *const /* unused */, struct st_mysql_sys_var *const /* unused */, + void *const var_ptr, const void *const save) { const char *const val = *static_cast<const char *const *>(save); RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex); @@ -14017,7 +14304,7 @@ rocksdb_set_update_cf_options(THD *const /* unused */, // Reset the pointers regardless of how much success we had with updating // the CF options. This will results in consistent behavior and avoids // dealing with cases when only a subset of CF-s was successfully updated. - *reinterpret_cast<char **>(var_ptr) = my_strdup(val, MYF(0)); + *reinterpret_cast<const char **>(var_ptr) = val; // Do the real work of applying the changes. Rdb_cf_options::Name_to_config_t option_map; @@ -14045,9 +14332,10 @@ rocksdb_set_update_cf_options(THD *const /* unused */, if (s != rocksdb::Status::OK()) { // NO_LINT_DEBUG - sql_print_warning("MyRocks: failed to convert the options for column " - "family '%s' to a map. %s", cf_name.c_str(), - s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to convert the options for column " + "family '%s' to a map. %s", + cf_name.c_str(), s.ToString().c_str()); } else { DBUG_ASSERT(rdb != nullptr); @@ -14056,14 +14344,16 @@ rocksdb_set_update_cf_options(THD *const /* unused */, if (s != rocksdb::Status::OK()) { // NO_LINT_DEBUG - sql_print_warning("MyRocks: failed to apply the options for column " - "family '%s'. %s", cf_name.c_str(), - s.ToString().c_str()); + sql_print_warning( + "MyRocks: failed to apply the options for column " + "family '%s'. %s", + cf_name.c_str(), s.ToString().c_str()); } else { // NO_LINT_DEBUG - sql_print_information("MyRocks: options for column family '%s' " - "have been successfully updated.", - cf_name.c_str()); + sql_print_information( + "MyRocks: options for column family '%s' " + "have been successfully updated.", + cf_name.c_str()); // Make sure that data is internally consistent as well and update // the CF options. This is necessary also to make sure that the CF @@ -14126,18 +14416,33 @@ void ha_rocksdb::rpl_after_update_rows() { DBUG_VOID_RETURN; } +bool ha_rocksdb::is_read_free_rpl_table() const { + return table->s && m_tbl_def->m_is_read_free_rpl_table; +} + /** @brief - Read Free Replication can be used or not. Returning False means - Read Free Replication can be used. Read Free Replication can be used - on UPDATE or DELETE row events, and table must have user defined - primary key. + Read Free Replication can be used or not. Returning true means + Read Free Replication can be used. */ -bool ha_rocksdb::use_read_free_rpl() { +bool ha_rocksdb::use_read_free_rpl() const { DBUG_ENTER_FUNC(); - DBUG_RETURN((m_in_rpl_delete_rows || m_in_rpl_update_rows) && - !has_hidden_pk(table) && m_use_read_free_rpl); + if (!ha_thd()->rli_slave || table->triggers || !is_read_free_rpl_table()) { + DBUG_RETURN(false); + } + + switch (rocksdb_read_free_rpl) { + case read_free_rpl_type::OFF: + DBUG_RETURN(false); + case read_free_rpl_type::PK_ONLY: + DBUG_RETURN(!has_hidden_pk(table) && table->s->keys == 1); + case read_free_rpl_type::PK_SK: + DBUG_RETURN(!has_hidden_pk(table)); + } + + DBUG_ASSERT(false); + DBUG_RETURN(false); } #endif // MARIAROCKS_NOT_YET @@ -14176,7 +14481,7 @@ void sql_print_verbose_info(const char *format, ...) } } -} // namespace myrocks +} // namespace myrocks /** |