summaryrefslogtreecommitdiff
path: root/storage/rocksdb/ha_rocksdb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/rocksdb/ha_rocksdb.cc')
-rw-r--r--storage/rocksdb/ha_rocksdb.cc855
1 files changed, 522 insertions, 333 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 59c6e1ab33b..d3157b0b800 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -59,6 +59,7 @@
#include "./rdb_i_s.h"
#include "./rdb_index_merge.h"
#include "./rdb_mutex_wrapper.h"
+#include "./rdb_psi.h"
#include "./rdb_threads.h"
// Internal MySQL APIs not exposed in any header.
@@ -302,7 +303,7 @@ static void rocksdb_set_pause_background_work(
my_core::THD *const thd MY_ATTRIBUTE((__unused__)),
struct st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)),
void *const var_ptr MY_ATTRIBUTE((__unused__)), const void *const save) {
- mysql_mutex_lock(&rdb_sysvars_mutex);
+ RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
const bool pause_requested = *static_cast<const bool *>(save);
if (rocksdb_pause_background_work != pause_requested) {
if (pause_requested) {
@@ -312,7 +313,7 @@ static void rocksdb_set_pause_background_work(
}
rocksdb_pause_background_work = pause_requested;
}
- mysql_mutex_unlock(&rdb_sysvars_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
static void rocksdb_set_compaction_options(THD *thd,
@@ -329,6 +330,10 @@ static void rocksdb_set_rate_limiter_bytes_per_sec(THD *thd,
void *var_ptr,
const void *save);
+static void rocksdb_set_delayed_write_rate(THD *thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save);
+
static void rdb_set_collation_exception_list(const char *exception_list);
static void rocksdb_set_collation_exception_list(THD *thd,
struct st_mysql_sys_var *var,
@@ -350,14 +355,16 @@ static long long rocksdb_block_cache_size;
/* Use unsigned long long instead of uint64_t because of MySQL compatibility */
static unsigned long long // NOLINT(runtime/int)
rocksdb_rate_limiter_bytes_per_sec;
+static unsigned long long rocksdb_delayed_write_rate;
static unsigned long // NOLINT(runtime/int)
- rocksdb_persistent_cache_size;
+ rocksdb_persistent_cache_size_mb;
static uint64_t rocksdb_info_log_level;
static char *rocksdb_wal_dir;
static char *rocksdb_persistent_cache_path;
static uint64_t rocksdb_index_type;
static char rocksdb_background_sync;
static uint32_t rocksdb_debug_optimizer_n_rows;
+static my_bool rocksdb_force_compute_memtable_stats;
static my_bool rocksdb_debug_optimizer_no_zero_cardinality;
static uint32_t rocksdb_wal_recovery_mode;
static uint32_t rocksdb_access_hint_on_compaction_start;
@@ -413,11 +420,11 @@ static void rocksdb_set_rocksdb_info_log_level(
const void *const save) {
DBUG_ASSERT(save != nullptr);
- mysql_mutex_lock(&rdb_sysvars_mutex);
+ RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
rocksdb_info_log_level = *static_cast<const uint64_t *>(save);
rocksdb_db_options.info_log->SetInfoLogLevel(
static_cast<const rocksdb::InfoLogLevel>(rocksdb_info_log_level));
- mysql_mutex_unlock(&rdb_sysvars_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
static const char *index_type_names[] = {"kBinarySearch", "kHashSearch", NullS};
@@ -478,6 +485,12 @@ static MYSQL_THDVAR_BOOL(
"update and delete",
nullptr, nullptr, FALSE);
+static MYSQL_THDVAR_BOOL(
+ blind_delete_primary_key, PLUGIN_VAR_RQCMDARG,
+ "Deleting rows by primary key lookup, without reading rows (Blind Deletes)."
+ " Blind delete is disabled if the table has secondary key",
+ nullptr, nullptr, FALSE);
+
static MYSQL_THDVAR_STR(
read_free_rpl_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
"List of tables that will use read-free replication on the slave "
@@ -561,6 +574,13 @@ static MYSQL_SYSVAR_ULONGLONG(
nullptr, rocksdb_set_rate_limiter_bytes_per_sec, /* default */ 0L,
/* min */ 0L, /* max */ MAX_RATE_LIMITER_BYTES_PER_SEC, 0);
+static MYSQL_SYSVAR_ULONGLONG(delayed_write_rate, rocksdb_delayed_write_rate,
+ PLUGIN_VAR_RQCMDARG,
+ "DBOptions::delayed_write_rate", nullptr,
+ rocksdb_set_delayed_write_rate,
+ rocksdb_db_options.delayed_write_rate, 0,
+ UINT64_MAX, 0);
+
static MYSQL_SYSVAR_ENUM(
info_log_level, rocksdb_info_log_level, PLUGIN_VAR_RQCMDARG,
"Filter level for info logs to be written mysqld error log. "
@@ -579,8 +599,9 @@ static MYSQL_THDVAR_INT(
static MYSQL_SYSVAR_UINT(
wal_recovery_mode, rocksdb_wal_recovery_mode, PLUGIN_VAR_RQCMDARG,
- "DBOptions::wal_recovery_mode for RocksDB", nullptr, nullptr,
- /* default */ (uint)rocksdb::WALRecoveryMode::kPointInTimeRecovery,
+ "DBOptions::wal_recovery_mode for RocksDB. Default is kAbsoluteConsistency",
+ nullptr, nullptr,
+ /* default */ (uint)rocksdb::WALRecoveryMode::kAbsoluteConsistency,
/* min */ (uint)rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords,
/* max */ (uint)rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords, 0);
@@ -638,13 +659,6 @@ static MYSQL_SYSVAR_ULONG(max_total_wal_size,
/* min */ 0L, /* max */ LONG_MAX, 0);
static MYSQL_SYSVAR_BOOL(
- disabledatasync,
- *reinterpret_cast<my_bool *>(&rocksdb_db_options.disableDataSync),
- PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
- "DBOptions::disableDataSync for RocksDB", nullptr, nullptr,
- rocksdb_db_options.disableDataSync);
-
-static MYSQL_SYSVAR_BOOL(
use_fsync, *reinterpret_cast<my_bool *>(&rocksdb_db_options.use_fsync),
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"DBOptions::use_fsync for RocksDB", nullptr, nullptr,
@@ -662,10 +676,10 @@ static MYSQL_SYSVAR_STR(
nullptr, "");
static MYSQL_SYSVAR_ULONG(
- persistent_cache_size, rocksdb_persistent_cache_size,
+ persistent_cache_size_mb, rocksdb_persistent_cache_size_mb,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
- "Size of cache for BlockBasedTableOptions::persistent_cache for RocksDB",
- nullptr, nullptr, rocksdb_persistent_cache_size,
+ "Size of cache in MB for BlockBasedTableOptions::persistent_cache "
+ "for RocksDB", nullptr, nullptr, rocksdb_persistent_cache_size_mb,
/* min */ 0L, /* max */ ULONG_MAX, 0);
static MYSQL_SYSVAR_ULONG(
@@ -946,9 +960,11 @@ static MYSQL_SYSVAR_BOOL(background_sync, rocksdb_background_sync,
"turns on background syncs for RocksDB", nullptr,
nullptr, FALSE);
-static MYSQL_THDVAR_BOOL(write_sync, PLUGIN_VAR_RQCMDARG,
- "WriteOptions::sync for RocksDB", nullptr, nullptr,
- rocksdb::WriteOptions().sync);
+static MYSQL_THDVAR_UINT(flush_log_at_trx_commit, PLUGIN_VAR_RQCMDARG,
+ "Sync on transaction commit. Similar to "
+ "innodb_flush_log_at_trx_commit. 1: sync on commit, "
+ "0,2: not sync on commit",
+ nullptr, nullptr, 1, 0, 2, 0);
static MYSQL_THDVAR_BOOL(write_disable_wal, PLUGIN_VAR_RQCMDARG,
"WriteOptions::disableWAL for RocksDB", nullptr,
@@ -986,6 +1002,12 @@ static MYSQL_SYSVAR_UINT(
"Test only to override rocksdb estimates of table size in a memtable",
nullptr, nullptr, 0, /* min */ 0, /* max */ INT_MAX, 0);
+static MYSQL_SYSVAR_BOOL(force_compute_memtable_stats,
+ rocksdb_force_compute_memtable_stats,
+ PLUGIN_VAR_RQCMDARG,
+ "Force to always compute memtable stats",
+ nullptr, nullptr, TRUE);
+
static MYSQL_SYSVAR_BOOL(
debug_optimizer_no_zero_cardinality,
rocksdb_debug_optimizer_no_zero_cardinality, PLUGIN_VAR_RQCMDARG,
@@ -1085,6 +1107,7 @@ static MYSQL_SYSVAR_BOOL(
"Counting SingleDelete as rocksdb_compaction_sequential_deletes", nullptr,
nullptr, rocksdb_compaction_sequential_deletes_count_sd);
+
static MYSQL_SYSVAR_BOOL(
print_snapshot_conflict_queries, rocksdb_print_snapshot_conflict_queries,
PLUGIN_VAR_RQCMDARG,
@@ -1104,6 +1127,11 @@ static MYSQL_THDVAR_BOOL(verify_row_debug_checksums, PLUGIN_VAR_RQCMDARG,
"Verify checksums when reading index/table records",
nullptr, nullptr, false /* default value */);
+static MYSQL_THDVAR_BOOL(master_skip_tx_api, PLUGIN_VAR_RQCMDARG,
+ "Skipping holding any lock on row access. "
+ "Not effective on slave.",
+ nullptr, nullptr, false);
+
static MYSQL_SYSVAR_UINT(
validate_tables, rocksdb_validate_tables,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
@@ -1154,6 +1182,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(skip_unique_check_tables),
MYSQL_SYSVAR(trace_sst_api),
MYSQL_SYSVAR(commit_in_the_middle),
+ MYSQL_SYSVAR(blind_delete_primary_key),
MYSQL_SYSVAR(read_free_rpl_tables),
MYSQL_SYSVAR(bulk_load_size),
MYSQL_SYSVAR(merge_buf_size),
@@ -1167,14 +1196,14 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(error_if_exists),
MYSQL_SYSVAR(paranoid_checks),
MYSQL_SYSVAR(rate_limiter_bytes_per_sec),
+ MYSQL_SYSVAR(delayed_write_rate),
MYSQL_SYSVAR(info_log_level),
MYSQL_SYSVAR(max_open_files),
MYSQL_SYSVAR(max_total_wal_size),
- MYSQL_SYSVAR(disabledatasync),
MYSQL_SYSVAR(use_fsync),
MYSQL_SYSVAR(wal_dir),
MYSQL_SYSVAR(persistent_cache_path),
- MYSQL_SYSVAR(persistent_cache_size),
+ MYSQL_SYSVAR(persistent_cache_size_mb),
MYSQL_SYSVAR(delete_obsolete_files_period_micros),
MYSQL_SYSVAR(base_background_compactions),
MYSQL_SYSVAR(max_background_compactions),
@@ -1224,7 +1253,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(background_sync),
- MYSQL_SYSVAR(write_sync),
+ MYSQL_SYSVAR(flush_log_at_trx_commit),
MYSQL_SYSVAR(write_disable_wal),
MYSQL_SYSVAR(write_ignore_missing_column_families),
@@ -1234,6 +1263,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(records_in_range),
MYSQL_SYSVAR(force_index_records_in_range),
MYSQL_SYSVAR(debug_optimizer_n_rows),
+ MYSQL_SYSVAR(force_compute_memtable_stats),
MYSQL_SYSVAR(debug_optimizer_no_zero_cardinality),
MYSQL_SYSVAR(compact_cf),
@@ -1259,6 +1289,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(checksums_pct),
MYSQL_SYSVAR(store_row_debug_checksums),
MYSQL_SYSVAR(verify_row_debug_checksums),
+ MYSQL_SYSVAR(master_skip_tx_api),
MYSQL_SYSVAR(validate_tables),
MYSQL_SYSVAR(table_stats_sampling_pct),
@@ -1268,7 +1299,7 @@ static rocksdb::WriteOptions
rdb_get_rocksdb_write_options(my_core::THD *const thd) {
rocksdb::WriteOptions opt;
- opt.sync = THDVAR(thd, write_sync);
+ opt.sync = THDVAR(thd, flush_log_at_trx_commit) == 1;
opt.disableWAL = THDVAR(thd, write_disable_wal);
opt.ignore_missing_column_families =
THDVAR(thd, write_ignore_missing_column_families);
@@ -1292,85 +1323,6 @@ Rdb_open_tables_map::get_hash_key(Rdb_table_handler *const table_handler,
}
/*
- The following is needed as an argument for mysql_stage_register,
- irrespectively of whether we're compiling with P_S or not.
-*/
-PSI_stage_info stage_waiting_on_row_lock = {0, "Waiting for row lock", 0};
-
-#ifdef HAVE_PSI_INTERFACE
-static PSI_thread_key rdb_background_psi_thread_key;
-static PSI_thread_key rdb_drop_idx_psi_thread_key;
-
-static PSI_stage_info *all_rocksdb_stages[] = {&stage_waiting_on_row_lock};
-
-static my_core::PSI_mutex_key rdb_psi_open_tbls_mutex_key,
- rdb_signal_bg_psi_mutex_key, rdb_signal_drop_idx_psi_mutex_key,
- rdb_collation_data_mutex_key, rdb_mem_cmp_space_mutex_key,
- key_mutex_tx_list, rdb_sysvars_psi_mutex_key;
-
-static PSI_mutex_info all_rocksdb_mutexes[] = {
- {&rdb_psi_open_tbls_mutex_key, "open tables", PSI_FLAG_GLOBAL},
- {&rdb_signal_bg_psi_mutex_key, "stop background", PSI_FLAG_GLOBAL},
- {&rdb_signal_drop_idx_psi_mutex_key, "signal drop index", PSI_FLAG_GLOBAL},
- {&rdb_collation_data_mutex_key, "collation data init", PSI_FLAG_GLOBAL},
- {&rdb_mem_cmp_space_mutex_key, "collation space char data init",
- PSI_FLAG_GLOBAL},
- {&key_mutex_tx_list, "tx_list", PSI_FLAG_GLOBAL},
- {&rdb_sysvars_psi_mutex_key, "setting sysvar", PSI_FLAG_GLOBAL},
-};
-
-static PSI_rwlock_key key_rwlock_collation_exception_list;
-static PSI_rwlock_key key_rwlock_read_free_rpl_tables;
-static PSI_rwlock_key key_rwlock_skip_unique_check_tables;
-
-static PSI_rwlock_info all_rocksdb_rwlocks[] = {
- {&key_rwlock_collation_exception_list, "collation_exception_list",
- PSI_FLAG_GLOBAL},
- {&key_rwlock_read_free_rpl_tables, "read_free_rpl_tables", PSI_FLAG_GLOBAL},
- {&key_rwlock_skip_unique_check_tables, "skip_unique_check_tables",
- PSI_FLAG_GLOBAL},
-};
-
-PSI_cond_key rdb_signal_bg_psi_cond_key, rdb_signal_drop_idx_psi_cond_key;
-
-static PSI_cond_info all_rocksdb_conds[] = {
- {&rdb_signal_bg_psi_cond_key, "cond signal background", PSI_FLAG_GLOBAL},
- {&rdb_signal_drop_idx_psi_cond_key, "cond signal drop index",
- PSI_FLAG_GLOBAL},
-};
-
-static PSI_thread_info all_rocksdb_threads[] = {
- {&rdb_background_psi_thread_key, "background", PSI_FLAG_GLOBAL},
- {&rdb_drop_idx_psi_thread_key, "drop index", PSI_FLAG_GLOBAL},
-};
-
-static void init_rocksdb_psi_keys() {
- const char *const category = "rocksdb";
- int count;
-
- if (PSI_server == nullptr)
- return;
-
- count = array_elements(all_rocksdb_mutexes);
- PSI_server->register_mutex(category, all_rocksdb_mutexes, count);
-
- count = array_elements(all_rocksdb_rwlocks);
- PSI_server->register_rwlock(category, all_rocksdb_rwlocks, count);
-
- count = array_elements(all_rocksdb_conds);
- // TODO Disabling PFS for conditions due to the bug
- // https://github.com/MySQLOnRocksDB/mysql-5.6/issues/92
- // PSI_server->register_cond(category, all_rocksdb_conds, count);
-
- count = array_elements(all_rocksdb_stages);
- mysql_stage_register(category, all_rocksdb_stages, count);
-
- count = array_elements(all_rocksdb_threads);
- mysql_thread_register(category, all_rocksdb_threads, count);
-}
-#endif
-
-/*
Drop index thread's control
*/
@@ -1503,10 +1455,12 @@ public:
static void walk_tx_list(Rdb_tx_list_walker *walker) {
DBUG_ASSERT(walker != nullptr);
- mysql_mutex_lock(&s_tx_list_mutex);
+ RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex);
+
for (auto it : s_tx_list)
walker->process_tran(it);
- mysql_mutex_unlock(&s_tx_list_mutex);
+
+ RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex);
}
int set_status_error(THD *const thd, const rocksdb::Status &s,
@@ -1850,15 +1804,15 @@ public:
explicit Rdb_transaction(THD *const thd)
: m_thd(thd), m_tbl_io_perf(nullptr) {
- mysql_mutex_lock(&s_tx_list_mutex);
+ RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex);
s_tx_list.insert(this);
- mysql_mutex_unlock(&s_tx_list_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex);
}
virtual ~Rdb_transaction() {
- mysql_mutex_lock(&s_tx_list_mutex);
+ RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex);
s_tx_list.erase(this);
- mysql_mutex_unlock(&s_tx_list_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex);
}
};
@@ -2075,7 +2029,7 @@ public:
tx_opts.lock_timeout = rdb_convert_sec_to_ms(m_timeout_sec);
tx_opts.deadlock_detect = THDVAR(m_thd, deadlock_detect);
- write_opts.sync = THDVAR(m_thd, write_sync);
+ write_opts.sync = THDVAR(m_thd, flush_log_at_trx_commit) == 1;
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
write_opts.ignore_missing_column_families =
THDVAR(m_thd, write_ignore_missing_column_families);
@@ -2288,7 +2242,7 @@ public:
void start_tx() override {
reset();
- write_opts.sync = THDVAR(m_thd, write_sync);
+ write_opts.sync = THDVAR(m_thd, flush_log_at_trx_commit) == 1;
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
write_opts.ignore_missing_column_families =
THDVAR(m_thd, write_ignore_missing_column_families);
@@ -2367,10 +2321,15 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
Rdb_transaction *&tx = get_tx_from_thd(thd);
// TODO: this is called too many times.. O(#rows)
if (tx == nullptr) {
- if (rpl_skip_tx_api && thd->rli_slave)
+ if ((rpl_skip_tx_api && thd->rli_slave) ||
+ (THDVAR(thd, master_skip_tx_api) && !thd->rli_slave))
+ {
tx = new Rdb_writebatch_impl(thd);
+ }
else
+ {
tx = new Rdb_transaction_impl(thd);
+ }
tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks));
tx->start_tx();
} else {
@@ -2474,7 +2433,7 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,
return HA_EXIT_FAILURE;
}
if (thd->durability_property == HA_IGNORE_DURABILITY &&
- THDVAR(thd, write_sync)) {
+ (THDVAR(thd, flush_log_at_trx_commit) == 1)) {
/**
we set the log sequence as '1' just to trigger hton->flush_logs
*/
@@ -3232,9 +3191,7 @@ static int rocksdb_init_func(void *const p) {
// Validate the assumption about the size of ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN.
static_assert(sizeof(longlong) == 8, "Assuming that longlong is 8 bytes.");
-#ifdef HAVE_PSI_INTERFACE
init_rocksdb_psi_keys();
-#endif
rocksdb_hton = (handlerton *)p;
mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &rdb_open_tables.m_mutex,
@@ -3298,6 +3255,8 @@ static int rocksdb_init_func(void *const p) {
rocksdb_db_options.rate_limiter = rocksdb_rate_limiter;
}
+ rocksdb_db_options.delayed_write_rate = rocksdb_delayed_write_rate;
+
std::shared_ptr<Rdb_logger> myrocks_logger = std::make_shared<Rdb_logger>();
rocksdb::Status s = rocksdb::CreateLoggerFromOptions(
rocksdb_datadir, rocksdb_db_options, &rocksdb_db_options.info_log);
@@ -3383,24 +3342,25 @@ static int rocksdb_init_func(void *const p) {
rocksdb_set_compaction_options(nullptr, nullptr, nullptr, nullptr);
- mysql_mutex_lock(&rdb_sysvars_mutex);
+ RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
DBUG_ASSERT(rocksdb_table_stats_sampling_pct <=
RDB_TBL_STATS_SAMPLE_PCT_MAX);
properties_collector_factory->SetTableStatsSamplingPct(
rocksdb_table_stats_sampling_pct);
- mysql_mutex_unlock(&rdb_sysvars_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
- if (rocksdb_persistent_cache_size > 0) {
+ if (rocksdb_persistent_cache_size_mb > 0) {
std::shared_ptr<rocksdb::PersistentCache> pcache;
+ uint64_t cache_size_bytes= rocksdb_persistent_cache_size_mb * 1024 * 1024;
rocksdb::NewPersistentCache(
rocksdb::Env::Default(), std::string(rocksdb_persistent_cache_path),
- rocksdb_persistent_cache_size, myrocks_logger, true, &pcache);
+ cache_size_bytes, myrocks_logger, true, &pcache);
rocksdb_tbl_options.persistent_cache = pcache;
} else if (strlen(rocksdb_persistent_cache_path)) {
- sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size");
+ sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size_mb");
DBUG_RETURN(1);
}
@@ -3581,7 +3541,7 @@ static int rocksdb_done_func(void *const p) {
// signal the drop index thread to stop
rdb_drop_idx_thread.signal(true);
- // Flush all memtables for not lose data, even if WAL is disabled.
+ // Flush all memtables for not losing data, even if WAL is disabled.
rocksdb_flush_all_memtables();
// Stop all rocksdb background work
@@ -3652,6 +3612,16 @@ static int rocksdb_done_func(void *const p) {
DBUG_RETURN(error);
}
+static inline void rocksdb_smart_seek(bool seek_backward,
+ rocksdb::Iterator *const iter,
+ const rocksdb::Slice &key_slice) {
+ if (seek_backward) {
+ iter->SeekForPrev(key_slice);
+ } else {
+ iter->Seek(key_slice);
+ }
+}
+
/**
@brief
Example of simple lock controls. The "table_handler" it creates is a
@@ -3670,7 +3640,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) {
length = (uint)strlen(table_name);
// First, look up the table in the hash map.
- mysql_mutex_lock(&m_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_mutex);
if (!(table_handler = reinterpret_cast<Rdb_table_handler *>(my_hash_search(
&m_hash, reinterpret_cast<const uchar *>(table_name), length)))) {
// Since we did not find it in the hash map, attempt to create and add it
@@ -3679,7 +3649,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) {
MYF(MY_WME | MY_ZEROFILL), &table_handler, sizeof(*table_handler),
&tmp_name, length + 1, NullS)))) {
// Allocating a new Rdb_table_handler and a new table name failed.
- mysql_mutex_unlock(&m_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
return nullptr;
}
@@ -3690,7 +3660,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) {
if (my_hash_insert(&m_hash, reinterpret_cast<uchar *>(table_handler))) {
// Inserting into the hash map failed.
- mysql_mutex_unlock(&m_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
my_free(table_handler);
return nullptr;
}
@@ -3701,7 +3671,7 @@ Rdb_open_tables_map::get_table_handler(const char *const table_name) {
DBUG_ASSERT(table_handler->m_ref_count >= 0);
table_handler->m_ref_count++;
- mysql_mutex_unlock(&m_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
return table_handler;
}
@@ -3715,7 +3685,7 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const {
const Rdb_table_handler *table_handler;
std::vector<std::string> names;
- mysql_mutex_lock(&m_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_mutex);
for (i = 0; (table_handler = reinterpret_cast<const Rdb_table_handler *>(
my_hash_const_element(&m_hash, i)));
i++) {
@@ -3723,7 +3693,7 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const {
names.push_back(table_handler->m_table_name);
}
DBUG_ASSERT(i == m_hash.records);
- mysql_mutex_unlock(&m_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
return names;
}
@@ -3872,7 +3842,7 @@ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) {
void Rdb_open_tables_map::release_table_handler(
Rdb_table_handler *const table_handler) {
- mysql_mutex_lock(&m_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_mutex);
DBUG_ASSERT(table_handler != nullptr);
DBUG_ASSERT(table_handler->m_ref_count > 0);
@@ -3885,7 +3855,7 @@ void Rdb_open_tables_map::release_table_handler(
my_free(table_handler);
}
- mysql_mutex_unlock(&m_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_mutex);
}
static handler *rocksdb_create_handler(my_core::handlerton *const hton,
@@ -4137,6 +4107,92 @@ int ha_rocksdb::convert_record_from_storage_format(
return convert_record_from_storage_format(key, &retrieved_rec_slice, buf);
}
+int ha_rocksdb::convert_blob_from_storage_format(
+ my_core::Field_blob *const blob,
+ Rdb_string_reader *const reader,
+ bool decode)
+{
+ /* Get the number of bytes needed to store length*/
+ const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
+
+ const char *data_len_str;
+ if (!(data_len_str = reader->read(length_bytes))) {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ memcpy(blob->ptr, data_len_str, length_bytes);
+
+ const uint32 data_len = blob->get_length(
+ reinterpret_cast<const uchar*>(data_len_str), length_bytes,
+ table->s->db_low_byte_first);
+ const char *blob_ptr;
+ if (!(blob_ptr = reader->read(data_len))) {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ if (decode) {
+ // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
+ // platforms)
+ memset(blob->ptr + length_bytes, 0, 8);
+ memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+int ha_rocksdb::convert_varchar_from_storage_format(
+ my_core::Field_varstring *const field_var,
+ Rdb_string_reader *const reader,
+ bool decode)
+{
+ const char *data_len_str;
+ if (!(data_len_str = reader->read(field_var->length_bytes)))
+ return HA_ERR_INTERNAL_ERROR;
+
+ uint data_len;
+ /* field_var->length_bytes is 1 or 2 */
+ if (field_var->length_bytes == 1) {
+ data_len = (uchar)data_len_str[0];
+ } else {
+ DBUG_ASSERT(field_var->length_bytes == 2);
+ data_len = uint2korr(data_len_str);
+ }
+
+ if (data_len > field_var->field_length) {
+ /* The data on disk is longer than table DDL allows? */
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ if (!reader->read(data_len)) {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ if (decode) {
+ memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
+int ha_rocksdb::convert_field_from_storage_format(
+ my_core::Field *const field,
+ Rdb_string_reader *const reader,
+ bool decode,
+ uint len)
+{
+ const char *data_bytes;
+ if (len > 0) {
+ if ((data_bytes = reader->read(len)) == nullptr) {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ if (decode)
+ memcpy(field->ptr, data_bytes, len);
+ }
+
+ return HA_EXIT_SUCCESS;
+}
+
/*
@brief
Unpack the record in this->m_retrieved_record and this->m_last_rowkey from
@@ -4168,7 +4224,6 @@ int ha_rocksdb::convert_record_from_storage_format(
DBUG_ASSERT(buf != nullptr);
Rdb_string_reader reader(value);
- const my_ptrdiff_t ptr_diff = buf - table->record[0];
/*
Decode PK fields from the key
@@ -4208,6 +4263,7 @@ int ha_rocksdb::convert_record_from_storage_format(
return HA_ERR_INTERNAL_ERROR;
}
+ int err = HA_EXIT_SUCCESS;
for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) {
const Rdb_field_encoder *const field_dec = it->m_field_enc;
const bool decode = it->m_decode;
@@ -4221,89 +4277,49 @@ int ha_rocksdb::convert_record_from_storage_format(
if (it->m_skip && !reader.read(it->m_skip))
return HA_ERR_INTERNAL_ERROR;
+ uint field_offset = field->ptr - table->record[0];
+ uint null_offset = field->null_offset();
+ bool maybe_null = field->real_maybe_null();
+ field->move_field(buf + field_offset,
+ maybe_null ? buf + null_offset : nullptr,
+ field->null_bit);
+ // WARNING! - Don't return before restoring field->ptr and field->null_ptr!
+
if (isNull) {
if (decode) {
/* This sets the NULL-bit of this record */
- field->set_null(ptr_diff);
+ field->set_null();
/*
Besides that, set the field value to default value. CHECKSUM TABLE
depends on this.
*/
- uint field_offset = field->ptr - table->record[0];
- memcpy(buf + field_offset, table->s->default_values + field_offset,
+ memcpy(field->ptr, table->s->default_values + field_offset,
field->pack_length());
}
- continue;
} else {
- if (decode)
- field->set_notnull(ptr_diff);
- }
-
- if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
- my_core::Field_blob *const blob = (my_core::Field_blob *)field;
- /* Get the number of bytes needed to store length*/
- const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
-
- blob->move_field_offset(ptr_diff);
-
- const char *data_len_str;
- if (!(data_len_str = reader.read(length_bytes))) {
- blob->move_field_offset(-ptr_diff);
- return HA_ERR_INTERNAL_ERROR;
- }
-
- memcpy(blob->ptr, data_len_str, length_bytes);
-
- const uint32 data_len = blob->get_length(
- (uchar *)data_len_str, length_bytes, table->s->db_low_byte_first);
- const char *blob_ptr;
- if (!(blob_ptr = reader.read(data_len))) {
- blob->move_field_offset(-ptr_diff);
- return HA_ERR_INTERNAL_ERROR;
- }
-
if (decode) {
- // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
- // platforms)
- memset(blob->ptr + length_bytes, 0, 8);
- memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
- blob->move_field_offset(-ptr_diff);
+ field->set_notnull();
}
- } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
- Field_varstring *const field_var = (Field_varstring *)field;
- const char *data_len_str;
- if (!(data_len_str = reader.read(field_var->length_bytes)))
- return HA_ERR_INTERNAL_ERROR;
- uint data_len;
- /* field_var->length_bytes is 1 or 2 */
- if (field_var->length_bytes == 1) {
- data_len = (uchar)data_len_str[0];
+ if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
+ err = convert_blob_from_storage_format(
+ (my_core::Field_blob *) field, &reader, decode);
+ } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
+ err = convert_varchar_from_storage_format(
+ (my_core::Field_varstring *) field, &reader, decode);
} else {
- DBUG_ASSERT(field_var->length_bytes == 2);
- data_len = uint2korr(data_len_str);
- }
- if (data_len > field->field_length) {
- /* The data on disk is longer than table DDL allows? */
- return HA_ERR_INTERNAL_ERROR;
+ err = convert_field_from_storage_format(
+ field, &reader, decode, field_dec->m_pack_length_in_rec);
}
- if (!reader.read(data_len))
- return HA_ERR_INTERNAL_ERROR;
+ }
- if (decode) {
- memcpy(field_var->ptr + ptr_diff, data_len_str,
- field_var->length_bytes + data_len);
- }
- } else {
- const char *data_bytes;
- const uint len = field_dec->m_pack_length_in_rec;
- if (len > 0) {
- if ((data_bytes = reader.read(len)) == nullptr) {
- return HA_ERR_INTERNAL_ERROR;
- }
- if (decode)
- memcpy(field->ptr + ptr_diff, data_bytes, len);
- }
+ // Restore field->ptr and field->null_ptr
+ field->move_field(table->record[0] + field_offset,
+ maybe_null ? table->record[0] + null_offset : nullptr,
+ field->null_bit);
+
+ if (err != HA_EXIT_SUCCESS) {
+ return err;
}
}
@@ -4723,6 +4739,14 @@ bool ha_rocksdb::get_error_message(const int error, String *const buf) {
buf->append(tx->m_detailed_error);
temp_error = true;
break;
+ case HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED:
+ buf->append("Specifying DATA DIRECTORY for an individual table is not "
+ "supported.");
+ break;
+ case HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED:
+ buf->append("Specifying INDEX DIRECTORY for an individual table is not "
+ "supported.");
+ break;
default:
// We can be called with the values which are < HA_ERR_FIRST because most
// MySQL internal functions will just return HA_EXIT_FAILURE in case of
@@ -4791,7 +4815,7 @@ int ha_rocksdb::create_key_defs(
*/
if (create_cfs(table_arg, tbl_def_arg, &cfs)) {
DBUG_RETURN(HA_EXIT_FAILURE);
- };
+ }
if (!old_tbl_def_arg) {
/*
@@ -4845,6 +4869,7 @@ int ha_rocksdb::create_cfs(
DBUG_ASSERT(table_arg != nullptr);
DBUG_ASSERT(table_arg->s != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
char tablename_sys[NAME_LEN + 1];
@@ -4884,35 +4909,53 @@ int ha_rocksdb::create_cfs(
}
}
- /*
- index comment has Column Family name. If there was no comment, we get
- NULL, and it means use the default column family.
- */
- const char *const comment = get_key_comment(i, table_arg, tbl_def_arg);
+ // Internal consistency check to make sure that data in TABLE and
+ // Rdb_tbl_def structures matches. Either both are missing or both are
+ // specified. Yes, this is critical enough to make it into SHIP_ASSERT.
+ SHIP_ASSERT(!table_arg->part_info == tbl_def_arg->base_partition().empty());
+
+ // Generate the name for the column family to use.
+ bool per_part_match_found = false;
+ std::string cf_name = generate_cf_name(i, table_arg, tbl_def_arg,
+ &per_part_match_found);
+
const char *const key_name = get_key_name(i, table_arg, tbl_def_arg);
- if (looks_like_per_index_cf_typo(comment)) {
+ if (looks_like_per_index_cf_typo(cf_name.c_str())) {
my_error(ER_NOT_SUPPORTED_YET, MYF(0),
- "column family name looks like a typo of $per_index_cf");
+ "column family name looks like a typo of $per_index_cf.");
DBUG_RETURN(HA_EXIT_FAILURE);
}
- /* Prevent create from using the system column family */
- if (comment && strcmp(DEFAULT_SYSTEM_CF_NAME, comment) == 0) {
+
+ // Prevent create from using the system column family.
+ if (!cf_name.empty() && strcmp(DEFAULT_SYSTEM_CF_NAME,
+ cf_name.c_str()) == 0) {
my_error(ER_WRONG_ARGUMENTS, MYF(0),
- "column family not valid for storing index data");
+ "column family not valid for storing index data.");
DBUG_RETURN(HA_EXIT_FAILURE);
}
+
bool is_auto_cf_flag;
+
+ // Here's how `get_or_create_cf` will use the input parameters:
+ //
+ // `cf_name` - will be used as a CF name.
+ // `key_name` - will be only used in case of "$per_index_cf".
cf_handle =
- cf_manager.get_or_create_cf(rdb, comment, tbl_def_arg->full_tablename(),
- key_name, &is_auto_cf_flag);
- if (!cf_handle)
+ cf_manager.get_or_create_cf(rdb, cf_name.c_str(),
+ tbl_def_arg->full_tablename(), key_name,
+ &is_auto_cf_flag);
+
+ if (!cf_handle) {
DBUG_RETURN(HA_EXIT_FAILURE);
+ }
auto &cf = (*cfs)[i];
+
cf.cf_handle = cf_handle;
- cf.is_reverse_cf = Rdb_cf_manager::is_cf_name_reverse(comment);
+ cf.is_reverse_cf = Rdb_cf_manager::is_cf_name_reverse(cf_name.c_str());
cf.is_auto_cf = is_auto_cf_flag;
+ cf.is_per_partition_cf = per_part_match_found;
}
DBUG_RETURN(HA_EXIT_SUCCESS);
@@ -4982,7 +5025,8 @@ int ha_rocksdb::create_inplace_key_defs(
new_key_descr[i] = std::make_shared<Rdb_key_def>(
okd.get_index_number(), i, okd.get_cf(), index_dict_version,
index_type, kv_version, okd.m_is_reverse_cf, okd.m_is_auto_cf,
- okd.m_name.c_str(), dict_manager.get_stats(gl_index_id));
+ okd.m_is_per_partition_cf, okd.m_name.c_str(),
+ dict_manager.get_stats(gl_index_id));
} else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i],
cfs[i])) {
DBUG_RETURN(HA_EXIT_FAILURE);
@@ -5118,7 +5162,8 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i,
const char *const key_name = get_key_name(i, table_arg, m_tbl_def);
*new_key_def = std::make_shared<Rdb_key_def>(
index_id, i, cf_info.cf_handle, index_dict_version, index_type,
- kv_version, cf_info.is_reverse_cf, cf_info.is_auto_cf, key_name);
+ kv_version, cf_info.is_reverse_cf, cf_info.is_auto_cf,
+ cf_info.is_per_partition_cf, key_name);
DBUG_RETURN(HA_EXIT_SUCCESS);
}
@@ -5271,6 +5316,22 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
HA_CREATE_INFO *const create_info) {
DBUG_ENTER_FUNC();
+ DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(create_info != nullptr);
+
+ if (create_info->data_file_name) {
+ // DATA DIRECTORY is used to create tables under a specific location
+ // outside the MySQL data directory. We don't support this for MyRocks.
+ // The `rocksdb_datadir` setting should be used to configure RocksDB data
+ // directory.
+ DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED);
+ }
+
+ if (create_info->index_file_name) {
+ // Similar check for INDEX DIRECTORY as well.
+ DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED);
+ }
+
int res;
std::string str;
@@ -5441,15 +5502,7 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd,
index_tuple= lookup_tuple.
lookup_tuple may be a prefix of the index.
*/
- if (kd.m_is_reverse_cf) {
- if (!full_key_match) {
- if (!iter->Valid())
- iter->SeekToLast();
- else
- iter->Prev();
- }
- }
-
+ rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice);
if (!iter->Valid() || !kd.value_matches_prefix(iter->key(), key_slice)) {
/*
Got a record that is not equal to the lookup value, or even a record
@@ -5467,24 +5520,20 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd,
We are looking for record with the biggest t.key such that
t.key < lookup_tuple.
*/
- if (kd.m_is_reverse_cf) {
- if (m_scan_it->Valid() && full_key_match &&
- kd.value_matches_prefix(m_scan_it->key(), key_slice)) {
- /* We are using full key and we've hit an exact match */
+ rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice);
+ if (m_scan_it->Valid() && full_key_match &&
+ kd.value_matches_prefix(m_scan_it->key(), key_slice)) {
+ /* We are using full key and we've hit an exact match */
+ if (kd.m_is_reverse_cf) {
m_scan_it->Next();
- }
- } else {
- if (m_scan_it->Valid())
+ } else {
m_scan_it->Prev();
- else
- m_scan_it->SeekToLast();
+ }
}
-
return m_scan_it->Valid() ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND;
}
int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
- const bool &full_key_match,
const rocksdb::Slice &key_slice) {
/*
We are looking for the first record such that
@@ -5494,22 +5543,7 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
with HA_READ_AFTER_KEY, $GT = '>',
with HA_READ_KEY_OR_NEXT, $GT = '>='
*/
- if (kd.m_is_reverse_cf) {
- if (!m_scan_it->Valid()) {
- m_scan_it->SeekToLast();
- } else {
- /*
- We should step back
- - when not using full extended key
- - when using full extended key and when we've got an exact match
- */
- if (!full_key_match ||
- !kd.value_matches_prefix(m_scan_it->key(), key_slice)) {
- m_scan_it->Prev();
- }
- }
- }
-
+ rocksdb_smart_seek(kd.m_is_reverse_cf, m_scan_it, key_slice);
return m_scan_it->Valid() ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND;
}
@@ -5538,7 +5572,7 @@ int ha_rocksdb::position_to_correct_key(const Rdb_key_def &kd,
break;
case HA_READ_AFTER_KEY:
case HA_READ_KEY_OR_NEXT:
- rc = read_after_key(kd, full_key_match, key_slice);
+ rc = read_after_key(kd, key_slice);
if (rc == 0 && !kd.covers_key(m_scan_it->key())) {
/* The record we've got is not from this index */
rc = HA_ERR_KEY_NOT_FOUND;
@@ -5905,8 +5939,10 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
*/
const uint size = kd.pack_index_tuple(table, m_pack_buffer,
m_pk_packed_tuple, key, keypart_map);
- rc = get_row_by_rowid(buf, m_pk_packed_tuple, size);
- if (!rc) {
+ bool skip_lookup = is_blind_delete_enabled();
+ rc = get_row_by_rowid(buf, m_pk_packed_tuple, size,
+ skip_lookup);
+ if (!rc && !skip_lookup) {
stats.rows_read++;
stats.rows_index_first++;
update_row_stats(ROWS_READ);
@@ -6339,6 +6375,17 @@ rocksdb::Status ha_rocksdb::get_for_update(
return s;
}
+bool ha_rocksdb::is_blind_delete_enabled()
+{
+ THD *thd = ha_thd();
+ return (THDVAR(thd, blind_delete_primary_key) &&
+ thd->lex->sql_command == SQLCOM_DELETE &&
+ thd->lex->table_count == 1 &&
+ table->s->keys == 1 &&
+ !has_hidden_pk(table) &&
+ !thd->rli_slave);
+}
+
/*
Given a rowid (i.e. packed PK) as a parameter, get the record.
@@ -6348,7 +6395,8 @@ rocksdb::Status ha_rocksdb::get_for_update(
*/
int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
- const uint rowid_size) {
+ const uint rowid_size,
+ const bool skip_lookup) {
DBUG_ENTER_FUNC();
DBUG_ASSERT(buf != nullptr);
@@ -6374,6 +6422,16 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
bool found;
rocksdb::Status s;
+ /* Pretend row found without looking up */
+ if (skip_lookup)
+ {
+ stats.rows_deleted_blind++;
+ update_row_stats(ROWS_DELETED_BLIND);
+ m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin);
+ table->status = 0;
+ DBUG_RETURN(0);
+ }
+
if (m_lock_rows == RDB_LOCK_NONE) {
tx->acquire_snapshot(true);
s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record);
@@ -6554,6 +6612,7 @@ int ha_rocksdb::index_first_intern(uchar *const buf) {
for (;;) {
setup_scan_iterator(kd, &index_key, false, !kd.m_is_reverse_cf,
Rdb_key_def::INDEX_NUMBER_SIZE);
+ m_scan_it->Seek(index_key);
m_skip_scan_it_next_call = true;
rc = index_next_with_direction(buf, true);
@@ -6649,21 +6708,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) {
for (;;) {
setup_scan_iterator(kd, &index_key, false, kd.m_is_reverse_cf,
Rdb_key_def::INDEX_NUMBER_SIZE);
-
- if (!m_scan_it->Valid()) {
- /*
- RocksDB keyspace has no records that follow our index. It could be, our
- index is the last in the keyspace. Navigate to last record in the
- keyspace.
- */
- m_scan_it->SeekToLast();
- } else {
- /*
- Iterator is pointing at some record right after the end of the index.
- Move one record back.
- */
- m_scan_it->Prev();
- }
+ m_scan_it->SeekForPrev(index_key);
m_skip_scan_it_next_call = false;
if (is_pk(active_index, table, m_tbl_def)) {
@@ -6726,11 +6771,15 @@ bool ha_rocksdb::skip_unique_check() const {
2) this table is in the whitelist of tables to skip and the replication
lag has reached a large enough value (see unique_check_lag_threshold
and unique_check_lage_reset_threshold)
- 3) the user set unique_checks option to 0
+ 3) the user set unique_checks option to 0, and the table does not have
+ any secondary indexes. If the table had secondary keys, then those
+ might become inconsistent/corrupted
*/
return THDVAR(table->in_use, bulk_load) ||
(m_force_skip_unique_check && m_skip_unique_check) ||
- my_core::thd_test_options(table->in_use, OPTION_RELAXED_UNIQUE_CHECKS);
+ (my_core::thd_test_options(table->in_use,
+ OPTION_RELAXED_UNIQUE_CHECKS) &&
+ m_tbl_def->m_key_count == 1);
}
void ha_rocksdb::set_force_skip_unique_check(bool skip) {
@@ -6805,15 +6854,31 @@ bool ha_rocksdb::is_pk(const uint index, const TABLE *const table_arg,
is_hidden_pk(index, table_arg, tbl_def_arg);
}
+/*
+ Formats the string and returns the column family name assignment part for a
+ specific partition.
+*/
+const std::string ha_rocksdb::gen_cf_name_qualifier_for_partition(
+ const std::string& prefix) {
+ DBUG_ASSERT(!prefix.empty());
+
+ return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER
+ + RDB_PER_PARTITION_QUALIFIER_VALUE_SEP;
+}
+
const char *ha_rocksdb::get_key_name(const uint index,
const TABLE *const table_arg,
const Rdb_tbl_def *const tbl_def_arg) {
DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
if (is_hidden_pk(index, table_arg, tbl_def_arg)) {
return HIDDEN_PK_NAME;
}
+ DBUG_ASSERT(table_arg->key_info != nullptr);
+ DBUG_ASSERT(table_arg->key_info[index].name != nullptr);
+
return table_arg->key_info[index].name;
}
@@ -6821,14 +6886,84 @@ const char *ha_rocksdb::get_key_comment(const uint index,
const TABLE *const table_arg,
const Rdb_tbl_def *const tbl_def_arg) {
DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
if (is_hidden_pk(index, table_arg, tbl_def_arg)) {
return nullptr;
}
+ DBUG_ASSERT(table_arg->key_info != nullptr);
+
return table_arg->key_info[index].comment.str;
}
+const std::string ha_rocksdb::generate_cf_name(const uint index,
+ const TABLE *const table_arg,
+ const Rdb_tbl_def *const tbl_def_arg,
+ bool *per_part_match_found) {
+ DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(tbl_def_arg != nullptr);
+ DBUG_ASSERT(per_part_match_found != nullptr);
+
+ // When creating CF-s the caller needs to know if there was a custom CF name
+ // specified for a given partition.
+ *per_part_match_found = false;
+
+ // Index comment is used to define the column family name specification(s).
+ // If there was no comment, we get an empty string, and it means "use the
+ // default column family".
+ const char *const comment = get_key_comment(index, table_arg, tbl_def_arg);
+
+ // `get_key_comment` can return `nullptr`, that's why this.
+ std::string key_comment = comment ? comment : "";
+
+ // If table has partitions then we need to check if user has requested to
+ // create a column family with a specific name on a per partition basis.
+ if (table_arg->part_info != nullptr) {
+ std::string partition_name = tbl_def_arg->base_partition();
+ DBUG_ASSERT(!partition_name.empty());
+
+ // Let's fetch the comment for an index and check if there's a custom
+ // column family name specified for the partition we are handling.
+ std::vector<std::string> v = myrocks::parse_into_tokens(key_comment,
+ RDB_QUALIFIER_SEP);
+ std::string part_to_search = gen_cf_name_qualifier_for_partition(
+ partition_name);
+ DBUG_ASSERT(!part_to_search.empty());
+
+ // Basic O(N) search for a matching assignment. At most we expect maybe
+ // ten or so elements here.
+ for (const auto &it : v) {
+ if (it.substr(0, part_to_search.length()) == part_to_search) {
+ // We found a prefix match. Try to parse it as an assignment.
+ std::vector<std::string> tokens = myrocks::parse_into_tokens(it,
+ RDB_PER_PARTITION_QUALIFIER_VALUE_SEP);
+
+ // We found a custom name, it was in the form we expected it to be.
+ // Return that instead of whatever we initially wanted to return. In
+ // a case below the `foo` part will be returned to the caller.
+ //
+ // p3_cfname=foo
+ //
+ // If no value was specified then we'll return an empty string which
+ // later gets translated into using a default CF.
+ if (tokens.size() == 2) {
+ *per_part_match_found = true;
+ return tokens[1];
+ } else {
+ return "";
+ }
+ }
+ }
+
+ // At this point we tried to search for a custom CF name for a partition,
+ // but none was specified. Therefore the default one will be used.
+ return "";
+ }
+
+ return key_comment;
+}
+
int ha_rocksdb::write_row(uchar *const buf) {
DBUG_ENTER_FUNC();
@@ -7188,7 +7323,7 @@ int ha_rocksdb::finalize_bulk_load() {
return rc;
}
- mysql_mutex_lock(&m_bulk_load_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_bulk_load_mutex);
/*
We need this check because it's possible that m_sst_info has been
@@ -7203,7 +7338,8 @@ int ha_rocksdb::finalize_bulk_load() {
mysql prints via my_printf_error.
*/
sql_print_error("Failed to commit bulk loaded sst file to the "
- "data store (%s)", m_sst_info->error_message().c_str());
+ "data store (%s)",
+ m_sst_info->error_message().c_str());
my_printf_error(ER_UNKNOWN_ERROR,
"Failed to commit bulk loaded sst file to the "
@@ -7217,7 +7353,8 @@ int ha_rocksdb::finalize_bulk_load() {
m_bulk_load_tx = nullptr;
}
- mysql_mutex_unlock(&m_bulk_load_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_bulk_load_mutex);
+
return rc;
}
@@ -7462,7 +7599,7 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
}
/*
- Open a cursor and position it at the passed record
+ Open a cursor
*/
void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd,
@@ -7522,12 +7659,6 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd,
}
m_scan_it_skips_bloom = skip_bloom;
}
- /*
- Seek() will "Position at the first key in the source that at or past
- target".
- The operation cannot fail.
- */
- m_scan_it->Seek(*slice);
}
void ha_rocksdb::release_scan_iterator() {
@@ -7551,6 +7682,7 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() {
rocksdb::Slice table_key((const char *)m_pk_packed_tuple, key_size);
setup_scan_iterator(*m_pk_descr, &table_key);
+ m_scan_it->Seek(table_key);
m_skip_scan_it_next_call = true;
}
@@ -7859,20 +7991,33 @@ int ha_rocksdb::info(uint flag) {
update_stats();
}
- if (stats.records == 0) {
- // most likely, the table is in memtable
- // try to deduce from GetApproximateSizes
+ // if number of records is hardcoded, we do not want to force computation
+ // of memtable cardinalities
+ if (stats.records == 0 ||
+ (rocksdb_force_compute_memtable_stats &&
+ rocksdb_debug_optimizer_n_rows == 0))
+ {
+ // First, compute SST files stats
uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2];
auto r = get_range(pk_index(table, m_tbl_def), buf);
uint64_t sz = 0;
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz, true);
-#pragma GCC diagnostic pop
+ uint8_t include_flags = rocksdb::DB::INCLUDE_FILES;
+ // recompute SST files stats only if records count is 0
+ if (stats.records == 0) {
+ rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz,
+ include_flags);
+ stats.records+= sz/ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE;
+ stats.data_file_length+= sz;
+ }
- stats.records = sz / ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE;
- stats.data_file_length = sz;
+ // Second, compute memtable stats
+ uint64_t memtableCount;
+ uint64_t memtableSize;
+ rdb->GetApproximateMemTableStats(m_pk_descr->get_cf(), r,
+ &memtableCount, &memtableSize);
+ stats.records += memtableCount;
+ stats.data_file_length += memtableSize;
if (rocksdb_debug_optimizer_n_rows > 0)
stats.records = rocksdb_debug_optimizer_n_rows;
@@ -8301,12 +8446,36 @@ ha_rocksdb::get_range(const int &i,
return myrocks::get_range(*m_key_descr_arr[i], buf);
}
+static bool is_myrocks_index_empty(
+ rocksdb::ColumnFamilyHandle *cfh, const bool is_reverse_cf,
+ const rocksdb::ReadOptions &read_opts,
+ const uint index_id)
+{
+ bool index_removed = false;
+ uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0};
+ rdb_netbuf_store_uint32(key_buf, index_id);
+ const rocksdb::Slice key =
+ rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
+ std::unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(read_opts, cfh));
+ rocksdb_smart_seek(is_reverse_cf, it.get(), key);
+ if (!it->Valid()) {
+ index_removed = true;
+ } else {
+ if (memcmp(it->key().data(), key_buf,
+ Rdb_key_def::INDEX_NUMBER_SIZE)) {
+ // Key does not have same prefix
+ index_removed = true;
+ }
+ }
+ return index_removed;
+}
+
/*
Drop index thread's main logic
*/
void Rdb_drop_index_thread::run() {
- mysql_mutex_lock(&m_signal_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_signal_mutex);
for (;;) {
// The stop flag might be set by shutdown command
@@ -8331,7 +8500,7 @@ void Rdb_drop_index_thread::run() {
}
// make sure, no program error is returned
DBUG_ASSERT(ret == 0 || ret == ETIMEDOUT);
- mysql_mutex_unlock(&m_signal_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex);
std::unordered_set<GL_INDEX_ID> indices;
dict_manager.get_ongoing_drop_indexes(&indices);
@@ -8353,11 +8522,11 @@ void Rdb_drop_index_thread::run() {
DBUG_ASSERT(cfh);
const bool is_reverse_cf = cf_flags & Rdb_key_def::REVERSE_CF_FLAG;
- bool index_removed = false;
- uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0};
- rdb_netbuf_store_uint32(key_buf, d.index_id);
- const rocksdb::Slice key =
- rocksdb::Slice((char *)key_buf, sizeof(key_buf));
+ if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id))
+ {
+ finished.insert(d);
+ continue;
+ }
uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2];
rocksdb::Range range = get_range(d.index_id, buf, is_reverse_cf ? 1 : 0,
is_reverse_cf ? 0 : 1);
@@ -8381,25 +8550,8 @@ void Rdb_drop_index_thread::run() {
}
rdb_handle_io_error(status, RDB_IO_ERROR_BG_THREAD);
}
- std::unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(read_opts, cfh));
- it->Seek(key);
- if (is_reverse_cf) {
- if (!it->Valid()) {
- it->SeekToLast();
- } else {
- it->Prev();
- }
- }
- if (!it->Valid()) {
- index_removed = true;
- } else {
- if (memcmp(it->key().data(), key_buf,
- Rdb_key_def::INDEX_NUMBER_SIZE)) {
- // Key does not have same prefix
- index_removed = true;
- }
- }
- if (index_removed) {
+ if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id))
+ {
finished.insert(d);
}
}
@@ -8408,10 +8560,10 @@ void Rdb_drop_index_thread::run() {
dict_manager.finish_drop_indexes(finished);
}
}
- mysql_mutex_lock(&m_signal_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_signal_mutex);
}
- mysql_mutex_unlock(&m_signal_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex);
}
Rdb_tbl_def *ha_rocksdb::get_table_if_exists(const char *const tablename) {
@@ -8683,10 +8835,12 @@ ha_rows ha_rocksdb::records_in_range(uint inx, key_range *const min_key,
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
// Getting statistics, including from Memtables
- rdb->GetApproximateSizes(kd.get_cf(), &r, 1, &sz, true);
-#pragma GCC diagnostic pop
-
+ uint8_t include_flags = rocksdb::DB::INCLUDE_FILES;
+ rdb->GetApproximateSizes(kd.get_cf(), &r, 1, &sz, include_flags);
ret = rows * sz / disk_size;
+ uint64_t memTableCount;
+ rdb->GetApproximateMemTableStats(kd.get_cf(), r, &memTableCount, &sz);
+ ret += memTableCount;
/*
GetApproximateSizes() gives estimates so ret might exceed stats.records.
@@ -8764,6 +8918,7 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd,
std::unordered_map<rocksdb::ColumnFamilyHandle *, std::vector<rocksdb::Range>>
ranges;
std::unordered_set<GL_INDEX_ID> ids_to_check;
+ std::unordered_map<GL_INDEX_ID, uint> ids_to_keyparts;
std::vector<uchar> buf(table_arg->s->keys * 2 *
Rdb_key_def::INDEX_NUMBER_SIZE);
for (uint i = 0; i < table_arg->s->keys; i++) {
@@ -8771,6 +8926,7 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd,
const Rdb_key_def &kd = *m_key_descr_arr[i];
ranges[kd.get_cf()].push_back(get_range(i, bufp));
ids_to_check.insert(kd.get_gl_index_id());
+ ids_to_keyparts[kd.get_gl_index_id()] = kd.get_key_parts();
}
// for analyze statements, force flush on memtable to get accurate cardinality
@@ -8800,6 +8956,8 @@ int ha_rocksdb::calculate_stats(const TABLE *const table_arg, THD *const thd,
// Initialize the stats to 0. If there are no files that contain
// this gl_index_id, then 0 should be stored for the cached stats.
stats[it] = Rdb_index_stats(it);
+ DBUG_ASSERT(ids_to_keyparts.count(it) > 0);
+ stats[it].m_distinct_keys_per_prefix.resize(ids_to_keyparts[it]);
}
for (const auto &it : props) {
std::vector<Rdb_index_stats> sst_stats;
@@ -9271,6 +9429,15 @@ int ha_rocksdb::inplace_populate_sk(
dict_manager.add_create_index(create_index_ids, batch);
dict_manager.commit(batch);
+ /*
+ Add uncommitted key definitions to ddl_manager. We need to do this
+ so that the property collector can find this keydef when it needs to
+ update stats. The property collector looks for the keydef in the
+ data dictionary, but it won't be there yet since this key definition
+ is still in the creation process.
+ */
+ ddl_manager.add_uncommitted_keydefs(indexes);
+
const bool hidden_pk_exists = has_hidden_pk(table);
int res = 0;
@@ -9489,6 +9656,8 @@ bool ha_rocksdb::commit_inplace_alter_table(
Rdb_inplace_alter_ctx *const ctx0 =
static_cast<Rdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
+ DEBUG_SYNC(ha_thd(), "rocksdb.commit_in_place_alter_table");
+
/*
IMPORTANT: When rollback is requested, mysql will abort with
an assertion failure. That means every failed commit during inplace alter
@@ -9523,6 +9692,9 @@ bool ha_rocksdb::commit_inplace_alter_table(
delete ctx0->m_new_tdef;
}
+ /* Remove uncommitted key definitions from ddl_manager */
+ ddl_manager.remove_uncommitted_keydefs(ctx0->m_added_indexes);
+
/* Rollback any partially created indexes */
dict_manager.rollback_ongoing_index_creation();
@@ -9582,6 +9754,12 @@ bool ha_rocksdb::commit_inplace_alter_table(
*/
DBUG_ASSERT(0);
}
+
+ /*
+ Remove uncommitted key definitions from ddl_manager, as they are now
+ committed into the data dictionary.
+ */
+ ddl_manager.remove_uncommitted_keydefs(ctx->m_added_indexes);
}
if (dict_manager.commit(batch)) {
@@ -9750,6 +9928,7 @@ static void myrocks_update_status() {
export_stats.rows_inserted = global_stats.rows[ROWS_INSERTED];
export_stats.rows_read = global_stats.rows[ROWS_READ];
export_stats.rows_updated = global_stats.rows[ROWS_UPDATED];
+ export_stats.rows_deleted_blind = global_stats.rows[ROWS_DELETED_BLIND];
export_stats.system_rows_deleted = global_stats.system_rows[ROWS_DELETED];
export_stats.system_rows_inserted = global_stats.system_rows[ROWS_INSERTED];
@@ -9765,6 +9944,8 @@ static SHOW_VAR myrocks_status_variables[] = {
DEF_STATUS_VAR_FUNC("rows_read", &export_stats.rows_read, SHOW_LONGLONG),
DEF_STATUS_VAR_FUNC("rows_updated", &export_stats.rows_updated,
SHOW_LONGLONG),
+ DEF_STATUS_VAR_FUNC("rows_deleted_blind",
+ &export_stats.rows_deleted_blind, SHOW_LONGLONG),
DEF_STATUS_VAR_FUNC("system_rows_deleted",
&export_stats.system_rows_deleted, SHOW_LONGLONG),
DEF_STATUS_VAR_FUNC("system_rows_inserted",
@@ -9870,7 +10051,7 @@ void Rdb_background_thread::run() {
// Wait until the next timeout or until we receive a signal to stop the
// thread. Request to stop the thread should only be triggered when the
// storage engine is being unloaded.
- mysql_mutex_lock(&m_signal_mutex);
+ RDB_MUTEX_LOCK_CHECK(m_signal_mutex);
const auto ret MY_ATTRIBUTE((__unused__)) =
mysql_cond_timedwait(&m_signal_cond, &m_signal_mutex, &ts_next_sync);
@@ -9879,7 +10060,7 @@ void Rdb_background_thread::run() {
const bool local_stop = m_stop;
const bool local_save_stats = m_save_stats;
reset();
- mysql_mutex_unlock(&m_signal_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(m_signal_mutex);
if (local_stop) {
// If we're here then that's because condition variable was signaled by
@@ -9963,11 +10144,8 @@ bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
shorter require all parts of the key to be available
for the short key match.
*/
- if (use_all_keys && prefix_extractor->InRange(eq_cond))
- can_use = true;
- else if (!is_ascending)
- can_use = false;
- else if (prefix_extractor->SameResultWhenAppended(eq_cond))
+ if ((use_all_keys && prefix_extractor->InRange(eq_cond))
+ || prefix_extractor->SameResultWhenAppended(eq_cond))
can_use = true;
else
can_use = false;
@@ -10138,7 +10316,7 @@ void rocksdb_set_table_stats_sampling_pct(
my_core::THD *const thd MY_ATTRIBUTE((__unused__)),
my_core::st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)),
void *const var_ptr MY_ATTRIBUTE((__unused__)), const void *const save) {
- mysql_mutex_lock(&rdb_sysvars_mutex);
+ RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
const uint32_t new_val = *static_cast<const uint32_t *>(save);
@@ -10151,7 +10329,7 @@ void rocksdb_set_table_stats_sampling_pct(
}
}
- mysql_mutex_unlock(&rdb_sysvars_mutex);
+ RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
/*
@@ -10185,6 +10363,15 @@ void rocksdb_set_rate_limiter_bytes_per_sec(
}
}
+void rocksdb_set_delayed_write_rate(THD *thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save) {
+ const uint64_t new_val = *static_cast<const uint64_t *>(save);
+ if (rocksdb_delayed_write_rate != new_val) {
+ rocksdb_delayed_write_rate = new_val;
+ rocksdb_db_options.delayed_write_rate = new_val;
+ }
+}
+
void rdb_set_collation_exception_list(const char *const exception_list) {
DBUG_ASSERT(rdb_collation_exceptions != nullptr);
@@ -10200,7 +10387,7 @@ void rocksdb_set_collation_exception_list(THD *const thd,
const void *const save) {
const char *const val = *static_cast<const char *const *>(save);
- rdb_set_collation_exception_list(val);
+ rdb_set_collation_exception_list(val == nullptr ? "" : val);
*static_cast<const char **>(var_ptr) = val;
}
@@ -10229,13 +10416,15 @@ static void rocksdb_set_max_background_compactions(
const void *const save) {
DBUG_ASSERT(save != nullptr);
- mysql_mutex_lock(&rdb_sysvars_mutex);
+ RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
+
rocksdb_db_options.max_background_compactions =
*static_cast<const int *>(save);
rocksdb_db_options.env->SetBackgroundThreads(
rocksdb_db_options.max_background_compactions,
rocksdb::Env::Priority::LOW);
- mysql_mutex_unlock(&rdb_sysvars_mutex);
+
+ RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
void rdb_queue_save_stats_request() { rdb_bg_thread.request_save_stats(); }