summaryrefslogtreecommitdiff
path: root/storage/rocksdb/ha_rocksdb.cc
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2019-06-16 00:28:33 +0300
committerSergei Petrunia <psergey@askmonty.org>2019-06-16 00:28:33 +0300
commit9ab0d7b4e9c48e881157728a4688c0bea9256a32 (patch)
tree2376691c32692cbef0648e9d3deb31bac6e935c3 /storage/rocksdb/ha_rocksdb.cc
parent91be2212c69d727d0b104f70cb32c4639397338c (diff)
parent5173e396ffce706954f7302e9854126aadab4c54 (diff)
downloadmariadb-git-9ab0d7b4e9c48e881157728a4688c0bea9256a32.tar.gz
Merge from MyRocks upstream:
Copy of commit dcd9379eb5707bc7514a2ff4d9127790356505cb Author: Manuel Ung <mung@fb.com> Date: Fri Jun 14 10:38:17 2019 -0700 Skip valgrind for rocksdb.force_shutdown Summary: This test does unclean shutdown, and leaks memory. Squash with: D15749084 Reviewed By: hermanlee Differential Revision: D15828957 fbshipit-source-id: 30541455d74
Diffstat (limited to 'storage/rocksdb/ha_rocksdb.cc')
-rw-r--r--storage/rocksdb/ha_rocksdb.cc3929
1 files changed, 2117 insertions, 1812 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index cb289fb0077..3f1db15a426 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -15,7 +15,7 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#ifdef USE_PRAGMA_IMPLEMENTATION
-#pragma implementation // gcc: Class implementation
+#pragma implementation // gcc: Class implementation
#endif
#define MYSQL_SERVER 1
@@ -31,8 +31,8 @@
#include "./ha_rocksdb.h"
/* C++ standard header files */
-#include <algorithm>
#include <inttypes.h>
+#include <algorithm>
#include <limits>
#include <map>
#include <queue>
@@ -41,6 +41,9 @@
#include <vector>
/* MySQL includes */
+#include <mysql/psi/mysql_table.h>
+// #include <thread_pool_priv.h>
+#include <mysys_err.h>
#include "./debug_sync.h"
#include "./my_bit.h"
#include "./my_stacktrace.h"
@@ -48,20 +51,14 @@
#include "./sql_audit.h"
#include "./sql_table.h"
#include "./sql_hset.h"
-#include <mysql/psi/mysql_table.h>
#ifdef MARIAROCKS_NOT_YET
-#include <mysql/thread_pool_priv.h>
#endif
-#include <mysys_err.h>
-
-// Both MySQL and RocksDB define the same constant. To avoid compilation errors
-// till we make the fix in RocksDB, we'll temporary undefine it here.
-#undef CACHE_LINE_SIZE
/* RocksDB includes */
#include "monitoring/histogram.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
+#include "rocksdb/memory_allocator.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice_transform.h"
@@ -70,6 +67,7 @@
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/utilities/memory_util.h"
#include "rocksdb/utilities/sim_cache.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/stop_watch.h"
#include "./rdb_source_revision.h"
@@ -77,8 +75,10 @@
#include "./event_listener.h"
#include "./ha_rocksdb_proto.h"
#include "./logger.h"
+#include "./nosql_access.h"
#include "./rdb_cf_manager.h"
#include "./rdb_cf_options.h"
+#include "./rdb_converter.h"
#include "./rdb_datadic.h"
#include "./rdb_i_s.h"
#include "./rdb_index_merge.h"
@@ -100,18 +100,19 @@ void thd_mark_transaction_to_rollback(MYSQL_THD thd, bool all);
* Get the user thread's binary logging format
* @param thd user thread
* @return Value to be used as index into the binlog_format_names array
-*/
+ */
int thd_binlog_format(const MYSQL_THD thd);
/**
* Check if binary logging is filtered for thread's current db.
* @param thd Thread handle
* @retval 1 the query is not filtered, 0 otherwise.
-*/
+ */
bool thd_binlog_filter_ok(const MYSQL_THD thd);
}
MYSQL_PLUGIN_IMPORT bool my_disable_leak_check;
+extern my_bool opt_core_file;
// Needed in rocksdb_init_func
void ignore_db_dirs_append(const char *dirname_arg);
@@ -128,22 +129,14 @@ const std::string DEFAULT_CF_NAME("default");
const std::string DEFAULT_SYSTEM_CF_NAME("__system__");
const std::string PER_INDEX_CF_NAME("$per_index_cf");
-class Rdb_explicit_snapshot;
-
-std::mutex explicit_snapshot_mutex;
-ulonglong explicit_snapshot_counter = 0;
-std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>>
- explicit_snapshots;
static std::vector<GL_INDEX_ID> rdb_indexes_to_recalc;
#ifdef MARIADB_NOT_YET
class Rdb_explicit_snapshot : public explicit_snapshot {
- std::unique_ptr<rocksdb::ManagedSnapshot> snapshot;
-
public:
- static std::shared_ptr<Rdb_explicit_snapshot>
- create(snapshot_info_st *ss_info, rocksdb::DB *db,
- const rocksdb::Snapshot *snapshot) {
+ static std::shared_ptr<Rdb_explicit_snapshot> create(
+ snapshot_info_st *ss_info, rocksdb::DB *db,
+ const rocksdb::Snapshot *snapshot) {
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
auto s = std::unique_ptr<rocksdb::ManagedSnapshot>(
new rocksdb::ManagedSnapshot(db, snapshot));
@@ -159,8 +152,24 @@ class Rdb_explicit_snapshot : public explicit_snapshot {
return ret;
}
- static std::shared_ptr<Rdb_explicit_snapshot>
- get(const ulonglong snapshot_id) {
+  /**
+    Build a human-readable listing of the explicit snapshots currently
+    registered in explicit_snapshots.
+
+    @return One "Snapshot ID / Binlog File / Binlog Pos / Gtid Executed"
+            paragraph per live snapshot; an empty string if there are none.
+  */
+  static std::string dump_snapshots() {
+    // Pin every live snapshot while holding the mutex, but format (and later
+    // release) them only after the mutex has been dropped:
+    //  - a map entry is erased by ~Rdb_explicit_snapshot(), which has to take
+    //    this same mutex, so an entry whose weak_ptr has already expired can
+    //    still be present and lock() may return nullptr;
+    //  - if one of our temporary references turned out to be the last owner,
+    //    releasing it while the (non-recursive) mutex is held would run that
+    //    destructor and try to lock the mutex a second time.
+    std::vector<std::shared_ptr<Rdb_explicit_snapshot>> live;
+    {
+      std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
+      live.reserve(explicit_snapshots.size());
+      for (const auto &elem : explicit_snapshots) {
+        if (auto ss = elem.second.lock()) {
+          live.push_back(std::move(ss));
+        }
+      }
+    }
+
+    // NOTE(review): assumes ss_info is not modified after construction, so it
+    // can be read without the mutex - confirm.
+    std::string str;
+    for (const auto &ss : live) {
+      const auto &info = ss->ss_info;
+      str += "\nSnapshot ID: " + std::to_string(info.snapshot_id) +
+             "\nBinlog File: " + info.binlog_file +
+             "\nBinlog Pos: " + std::to_string(info.binlog_pos) +
+             "\nGtid Executed: " + info.gtid_executed + "\n";
+    }
+
+    return str;
+  }
+
+ static std::shared_ptr<Rdb_explicit_snapshot> get(
+ const ulonglong snapshot_id) {
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
auto elem = explicit_snapshots.find(snapshot_id);
if (elem == explicit_snapshots.end()) {
@@ -172,14 +181,27 @@ class Rdb_explicit_snapshot : public explicit_snapshot {
rocksdb::ManagedSnapshot *get_snapshot() { return snapshot.get(); }
Rdb_explicit_snapshot(snapshot_info_st ss_info,
- std::unique_ptr<rocksdb::ManagedSnapshot> snapshot)
+ std::unique_ptr<rocksdb::ManagedSnapshot> &&snapshot)
: explicit_snapshot(ss_info), snapshot(std::move(snapshot)) {}
virtual ~Rdb_explicit_snapshot() {
std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
explicit_snapshots.erase(ss_info.snapshot_id);
}
+
+ private:
+ std::unique_ptr<rocksdb::ManagedSnapshot> snapshot;
+
+ static std::mutex explicit_snapshot_mutex;
+ static ulonglong explicit_snapshot_counter;
+ static std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>>
+ explicit_snapshots;
};
+
+std::mutex Rdb_explicit_snapshot::explicit_snapshot_mutex;
+ulonglong Rdb_explicit_snapshot::explicit_snapshot_counter = 0;
+std::unordered_map<ulonglong, std::weak_ptr<Rdb_explicit_snapshot>>
+ Rdb_explicit_snapshot::explicit_snapshots;
#endif
/**
@@ -188,10 +210,11 @@ class Rdb_explicit_snapshot : public explicit_snapshot {
void ha_rocksdb::update_row_stats(const operation_type &type) {
DBUG_ASSERT(type < ROWS_MAX);
// Find if we are modifying system databases.
- if (table->s && m_tbl_def->m_is_mysql_system_table)
+ if (table->s && m_tbl_def->m_is_mysql_system_table) {
global_stats.system_rows[type].inc();
- else
+ } else {
global_stats.rows[type].inc();
+ }
}
void dbug_dump_database(rocksdb::DB *db);
@@ -199,8 +222,8 @@ static handler *rocksdb_create_handler(my_core::handlerton *hton,
my_core::TABLE_SHARE *table_arg,
my_core::MEM_ROOT *mem_root);
-static rocksdb::CompactRangeOptions
-getCompactRangeOptions(int concurrency = 0) {
+static rocksdb::CompactRangeOptions getCompactRangeOptions(
+ int concurrency = 0) {
rocksdb::CompactRangeOptions compact_range_options;
compact_range_options.bottommost_level_compaction =
rocksdb::BottommostLevelCompaction::kForce;
@@ -261,37 +284,77 @@ static void rocksdb_flush_all_memtables() {
}
}
+// Update callback registered for the delete_cf system variable. Intentionally
+// empty: it ignores all of its arguments and has no effect.
+static void rocksdb_delete_column_family_stub(
+ THD *const /* thd */, struct st_mysql_sys_var *const /* var */,
+ void *const /* var_ptr */, const void *const /* save */) {}
+
+// Check callback registered for the delete_cf system variable. It currently
+// rejects every request by returning HA_EXIT_FAILURE before looking at value.
+static int rocksdb_delete_column_family(
+ THD *const /* thd */, struct st_mysql_sys_var *const /* var */,
+ void *const /* var_ptr */, struct st_mysql_value *const value) {
+ // Return failure for now until the race condition between creating
+ // CF and deleting CF is resolved. Everything below this return is
+ // deliberately unreachable and is kept for when dropping is re-enabled.
+ return HA_EXIT_FAILURE;
+
+ char buff[STRING_BUFFER_USUAL_SIZE];
+ int len = sizeof(buff);
+
+ DBUG_ASSERT(value != nullptr);
+
+ // The string value is used as the name of the column family to drop.
+ if (const char *const cf = value->val_str(value, buff, &len)) {
+ auto &cf_manager = rdb_get_cf_manager();
+ auto ret = cf_manager.drop_cf(cf);
+ if (ret == HA_EXIT_SUCCESS) {
+ // NO_LINT_DEBUG
+ sql_print_information("RocksDB: Dropped column family: %s\n", cf);
+ } else {
+ // NO_LINT_DEBUG
+ sql_print_error("RocksDB: Failed to drop column family: %s, error: %d\n",
+ cf, ret);
+ }
+
+ return ret;
+ }
+
+ // val_str() produced no string: nothing to drop, reported as success.
+ return HA_EXIT_SUCCESS;
+}
+
///////////////////////////////////////////////////////////
// Hash map: table name => open table handler
///////////////////////////////////////////////////////////
-namespace // anonymous namespace = not visible outside this source file
+namespace // anonymous namespace = not visible outside this source file
{
const ulong TABLE_HASH_SIZE = 32;
typedef Hash_set<Rdb_table_handler> Rdb_table_set;
-struct Rdb_open_tables_map {
+class Rdb_open_tables_map {
+ private:
/* Hash table used to track the handlers of open tables */
- Rdb_table_set m_hash;
+ std::unordered_map<std::string, Rdb_table_handler *> m_table_map;
+
/* The mutex used to protect the hash table */
mutable mysql_mutex_t m_mutex;
- static uchar *get_hash_key(const Rdb_table_handler *const table_handler,
- size_t *const length,
- my_bool not_used MY_ATTRIBUTE((__unused__)));
+ public:
+ void init() {
+ m_table_map.clear();
+ mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &m_mutex, MY_MUTEX_INIT_FAST);
+ }
+
+ void free() {
+ m_table_map.clear();
+ mysql_mutex_destroy(&m_mutex);
+ }
+ size_t count() { return m_table_map.size(); }
Rdb_table_handler *get_table_handler(const char *const table_name);
void release_table_handler(Rdb_table_handler *const table_handler);
- Rdb_open_tables_map() : m_hash(get_hash_key, system_charset_info) { }
-
- void free_hash(void) { m_hash.~Rdb_table_set(); }
-
std::vector<std::string> get_table_names(void) const;
};
-} // anonymous namespace
+} // anonymous namespace
static Rdb_open_tables_map rdb_open_tables;
@@ -326,6 +389,7 @@ static int rocksdb_create_checkpoint(
status = checkpoint->CreateCheckpoint(checkpoint_dir.c_str());
delete checkpoint;
if (status.ok()) {
+ // NO_LINT_DEBUG
sql_print_information(
"RocksDB: created checkpoint in directory : %s\n",
checkpoint_dir.c_str());
@@ -355,6 +419,7 @@ static void rocksdb_force_flush_memtable_now_stub(
static int rocksdb_force_flush_memtable_now(
THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr,
struct st_mysql_value *const value) {
+ // NO_LINT_DEBUG
sql_print_information("RocksDB: Manual memtable flush.");
rocksdb_flush_all_memtables();
return HA_EXIT_SUCCESS;
@@ -367,6 +432,7 @@ static void rocksdb_force_flush_memtable_and_lzero_now_stub(
static int rocksdb_force_flush_memtable_and_lzero_now(
THD *const thd, struct st_mysql_sys_var *const var, void *const var_ptr,
struct st_mysql_value *const value) {
+ // NO_LINT_DEBUG
sql_print_information("RocksDB: Manual memtable and L0 flush.");
rocksdb_flush_all_memtables();
@@ -375,29 +441,46 @@ static int rocksdb_force_flush_memtable_and_lzero_now(
rocksdb::ColumnFamilyMetaData metadata;
rocksdb::ColumnFamilyDescriptor cf_descr;
+ int i, max_attempts = 3, num_errors = 0;
+
for (const auto &cf_handle : cf_manager.get_all_cf()) {
- rdb->GetColumnFamilyMetaData(cf_handle, &metadata);
- cf_handle->GetDescriptor(&cf_descr);
- c_options.output_file_size_limit = cf_descr.options.target_file_size_base;
+ for (i = 0; i < max_attempts; i++) {
+ rdb->GetColumnFamilyMetaData(cf_handle, &metadata);
+ cf_handle->GetDescriptor(&cf_descr);
+ c_options.output_file_size_limit = cf_descr.options.target_file_size_base;
+
+ DBUG_ASSERT(metadata.levels[0].level == 0);
+ std::vector<std::string> file_names;
+ for (auto &file : metadata.levels[0].files) {
+ file_names.emplace_back(file.db_path + file.name);
+ }
- DBUG_ASSERT(metadata.levels[0].level == 0);
- std::vector<std::string> file_names;
- for (auto &file : metadata.levels[0].files) {
- file_names.emplace_back(file.db_path + file.name);
- }
+ if (file_names.empty()) {
+ break;
+ }
- if (!file_names.empty()) {
rocksdb::Status s;
s = rdb->CompactFiles(c_options, cf_handle, file_names, 1);
+ // Due to a race, it's possible for CompactFiles to collide
+ // with auto compaction, causing an error to return
+ // regarding file not found. In that case, retry.
+ if (s.IsInvalidArgument()) {
+ continue;
+ }
+
if (!s.ok() && !s.IsAborted()) {
rdb_handle_io_error(s, RDB_IO_ERROR_GENERAL);
return HA_EXIT_FAILURE;
}
+ break;
+ }
+ if (i == max_attempts) {
+ num_errors++;
}
}
- return HA_EXIT_SUCCESS;
+ return num_errors == 0 ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
}
static void rocksdb_drop_index_wakeup_thread(
@@ -468,11 +551,9 @@ static void rocksdb_set_update_cf_options(THD *thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
-static int rocksdb_check_bulk_load(THD *const thd,
- struct st_mysql_sys_var *var
- MY_ATTRIBUTE((__unused__)),
- void *save,
- struct st_mysql_value *value);
+static int rocksdb_check_bulk_load(
+ THD *const thd, struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)),
+ void *save, struct st_mysql_value *value);
static int rocksdb_check_bulk_load_allow_unsorted(
THD *const thd, struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)),
@@ -499,6 +580,8 @@ static int rocksdb_validate_set_block_cache_size(
static long long rocksdb_block_cache_size;
static long long rocksdb_sim_cache_size;
static my_bool rocksdb_use_clock_cache;
+static double rocksdb_cache_high_pri_pool_ratio;
+static my_bool rocksdb_cache_dump;
/* Use unsigned long long instead of uint64_t because of MySQL compatibility */
static unsigned long long // NOLINT(runtime/int)
rocksdb_rate_limiter_bytes_per_sec;
@@ -518,8 +601,10 @@ static my_bool rocksdb_force_compute_memtable_stats;
static uint32_t rocksdb_force_compute_memtable_stats_cachetime;
static my_bool rocksdb_debug_optimizer_no_zero_cardinality;
static uint32_t rocksdb_wal_recovery_mode;
+static uint32_t rocksdb_stats_level;
static uint32_t rocksdb_access_hint_on_compaction_start;
static char *rocksdb_compact_cf_name;
+static char *rocksdb_delete_cf_name;
static char *rocksdb_checkpoint_name;
static my_bool rocksdb_signal_drop_index_thread;
static my_bool rocksdb_signal_remove_mariabackup_checkpoint;
@@ -555,10 +640,21 @@ char *compression_types_val=
const_cast<char*>(get_rocksdb_supported_compression_types());
static unsigned long rocksdb_write_policy =
rocksdb::TxnDBWritePolicy::WRITE_COMMITTED;
+char *rocksdb_read_free_rpl_tables;
+std::mutex rocksdb_read_free_rpl_tables_mutex;
+#if defined(HAVE_PSI_INTERFACE)
+Regex_list_handler rdb_read_free_regex_handler(key_rwlock_read_free_rpl_tables);
+#else
+Regex_list_handler rdb_read_free_regex_handler;
+#endif
+enum read_free_rpl_type { OFF = 0, PK_ONLY, PK_SK };
+static uint64_t rocksdb_read_free_rpl = read_free_rpl_type::OFF;
static my_bool rocksdb_error_on_suboptimal_collation = 1;
static uint32_t rocksdb_stats_recalc_rate = 0;
static uint32_t rocksdb_debug_manual_compaction_delay = 0;
static uint32_t rocksdb_max_manual_compactions = 0;
+static my_bool rocksdb_rollback_on_timeout = FALSE;
+static my_bool rocksdb_enable_insert_with_update_caching = TRUE;
std::atomic<uint64_t> rocksdb_row_lock_deadlocks(0);
std::atomic<uint64_t> rocksdb_row_lock_wait_timeouts(0);
@@ -566,6 +662,9 @@ std::atomic<uint64_t> rocksdb_snapshot_conflict_errors(0);
std::atomic<uint64_t> rocksdb_wal_group_syncs(0);
std::atomic<uint64_t> rocksdb_manual_compactions_processed(0);
std::atomic<uint64_t> rocksdb_manual_compactions_running(0);
+#ifndef DBUG_OFF
+std::atomic<uint64_t> rocksdb_num_get_for_update_calls(0);
+#endif
@@ -635,7 +734,7 @@ static std::unique_ptr<rocksdb::DBOptions> rdb_init_rocksdb_db_options(void) {
o->listeners.push_back(std::make_shared<Rdb_event_listener>(&ddl_manager));
o->info_log_level = rocksdb::InfoLogLevel::INFO_LEVEL;
o->max_subcompactions = DEFAULT_SUBCOMPACTIONS;
- o->max_open_files = -2; // auto-tune to 50% open_files_limit
+ o->max_open_files = -2; // auto-tune to 50% open_files_limit
o->two_write_queues = true;
o->manual_wal_flush = true;
@@ -659,6 +758,13 @@ static TYPELIB write_policy_typelib = {array_elements(write_policy_names) - 1,
"write_policy_typelib",
write_policy_names, nullptr};
+/* This array needs to be kept up to date with myrocks::read_free_rpl_type */
+static const char *read_free_rpl_names[] = {"OFF", "PK_ONLY", "PK_SK", NullS};
+
+static TYPELIB read_free_rpl_typelib = {array_elements(read_free_rpl_names) - 1,
+ "read_free_rpl_typelib",
+ read_free_rpl_names, nullptr};
+
/* This enum needs to be kept up to date with rocksdb::InfoLogLevel */
static const char *info_log_level_names[] = {"debug_level", "info_level",
"warn_level", "error_level",
@@ -680,6 +786,23 @@ static void rocksdb_set_rocksdb_info_log_level(
RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
}
+/**
+  Update callback for the stats_level system variable: applies the requested
+  level to the RocksDB statistics object and mirrors the level that is
+  actually in effect back into rocksdb_stats_level.
+
+  @param thd      unused
+  @param var      unused
+  @param var_ptr  unused; rocksdb_stats_level is assigned directly
+  @param save     points to the new value produced by the check phase
+*/
+static void rocksdb_set_rocksdb_stats_level(THD *const thd,
+                                            struct st_mysql_sys_var *const var,
+                                            void *const var_ptr,
+                                            const void *const save) {
+  DBUG_ASSERT(save != nullptr);
+
+  // stats_level is declared with MYSQL_SYSVAR_UINT and backed by a 32-bit
+  // variable, so the saved value is an unsigned int. Reading it through a
+  // uint64_t pointer would also pick up four bytes that do not belong to it.
+  const auto requested_level = static_cast<rocksdb::StatsLevel>(
+      *static_cast<const unsigned int *>(save));
+
+  RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
+  rocksdb_db_options->statistics->set_stats_level(requested_level);
+  // Actual stats level is defined at rocksdb dbopt::statistics::stats_level_
+  // so adjusting rocksdb_stats_level here to make sure it points to
+  // the correct stats level.
+  rocksdb_stats_level = rocksdb_db_options->statistics->get_stats_level();
+  RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
+}
+
static void rocksdb_set_reset_stats(
my_core::THD *const /* unused */,
my_core::st_mysql_sys_var *const var MY_ATTRIBUTE((__unused__)),
@@ -804,7 +927,7 @@ static MYSQL_THDVAR_ULONG(deadlock_detect_depth, PLUGIN_VAR_RQCMDARG,
static MYSQL_THDVAR_BOOL(
commit_time_batch_for_recovery, PLUGIN_VAR_RQCMDARG,
"TransactionOptions::commit_time_batch_for_recovery for RocksDB", nullptr,
- nullptr, FALSE);
+ nullptr, TRUE);
static MYSQL_THDVAR_BOOL(
trace_sst_api, PLUGIN_VAR_RQCMDARG,
@@ -844,10 +967,11 @@ static MYSQL_THDVAR_STR(tmpdir, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"Directory for temporary files during DDL operations.",
nullptr, nullptr, "");
+#define DEFAULT_SKIP_UNIQUE_CHECK_TABLES ".*"
static MYSQL_THDVAR_STR(
skip_unique_check_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
"Skip unique constraint checking for the specified tables", nullptr,
- nullptr, ".*");
+ nullptr, DEFAULT_SKIP_UNIQUE_CHECK_TABLES);
static MYSQL_THDVAR_BOOL(
commit_in_the_middle, PLUGIN_VAR_RQCMDARG,
@@ -861,11 +985,80 @@ static MYSQL_THDVAR_BOOL(
" Blind delete is disabled if the table has secondary key",
nullptr, nullptr, FALSE);
-static MYSQL_THDVAR_STR(
- read_free_rpl_tables, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
+static const char *DEFAULT_READ_FREE_RPL_TABLES = ".*";
+
+/**
+  Check callback for the read_free_rpl_tables system variable.
+
+  Compiles the proposed pattern list with a scratch Regex_list_handler and,
+  if it is accepted, hands a heap-allocated copy of the list to the update
+  phase through save.
+
+  @param thd    unused
+  @param var    unused
+  @param save   [out] receives the my_strdup()'ed pattern list on success
+  @param value  the value being assigned; when it yields no string,
+                DEFAULT_READ_FREE_RPL_TABLES is used instead
+
+  @return HA_EXIT_SUCCESS if the patterns are valid and were copied,
+          HA_EXIT_FAILURE on a bad pattern or if the copy cannot be allocated
+*/
+static int rocksdb_validate_read_free_rpl_tables(
+    THD *thd MY_ATTRIBUTE((__unused__)),
+    struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), void *save,
+    struct st_mysql_value *value) {
+  char buff[STRING_BUFFER_USUAL_SIZE];
+  int length = sizeof(buff);
+  const char *wlist_buf = value->val_str(value, buff, &length);
+  const auto wlist = wlist_buf ? wlist_buf : DEFAULT_READ_FREE_RPL_TABLES;
+
+#if defined(HAVE_PSI_INTERFACE)
+  Regex_list_handler regex_handler(key_rwlock_read_free_rpl_tables);
+#else
+  Regex_list_handler regex_handler;
+#endif
+
+  if (!regex_handler.set_patterns(wlist)) {
+    warn_about_bad_patterns(&regex_handler, "rocksdb_read_free_rpl_tables");
+    return HA_EXIT_FAILURE;
+  }
+
+  // The update callback asserts that it receives a non-null string, so an
+  // allocation failure has to be reported here rather than passed on as a
+  // successful validation.
+  char *const wlist_copy = my_strdup(wlist, MYF(MY_WME));
+  if (wlist_copy == nullptr) {
+    return HA_EXIT_FAILURE;
+  }
+
+  *static_cast<const char **>(save) = wlist_copy;
+  return HA_EXIT_SUCCESS;
+}
+
+/**
+  Update callback for the read_free_rpl_tables system variable.
+
+  Installs the new pattern list into rdb_read_free_regex_handler, lets every
+  table definition known to ddl_manager re-evaluate its read-free flag, and
+  finally stores the string that backs the variable into var_ptr.
+
+  @param thd      unused
+  @param var      unused
+  @param var_ptr  [out] storage of the variable; receives the pattern string
+  @param save     points to the new pattern string (must not be null)
+*/
+static void rocksdb_update_read_free_rpl_tables(
+    THD *thd MY_ATTRIBUTE((__unused__)),
+    struct st_mysql_sys_var *var MY_ATTRIBUTE((__unused__)), void *var_ptr,
+    const void *save) {
+  const char *const patterns = *static_cast<const char *const *>(save);
+  DBUG_ASSERT(patterns != nullptr);
+
+  // Cannot fail: rocksdb_validate_read_free_rpl_tables has already rejected
+  // any list containing a bad pattern.
+  rdb_read_free_regex_handler.set_patterns(patterns);
+
+  // Visit all table definitions so each one picks up the new pattern list.
+  struct Rdb_read_free_rpl_refresher : public Rdb_tables_scanner {
+    int add_table(Rdb_tbl_def *tdef) override {
+      tdef->check_and_set_read_free_rpl_table();
+      return HA_EXIT_SUCCESS;
+    }
+  };
+  Rdb_read_free_rpl_refresher refresher;
+  ddl_manager.scan_for_tables(&refresher);
+
+  // SET var = DEFAULT bypasses rocksdb_validate_read_free_rpl_tables, so in
+  // that case we are looking at the built-in default literal itself and no
+  // memory has been allocated for the value yet: duplicate it here. In every
+  // other case the string was already allocated by the validate step and is
+  // simply reused.
+  const bool is_builtin_default = (patterns == DEFAULT_READ_FREE_RPL_TABLES);
+  const char **const target = static_cast<const char **>(var_ptr);
+  *target = is_builtin_default ? my_strdup(patterns, MYF(MY_WME)) : patterns;
+}
+
+static MYSQL_SYSVAR_STR(
+ read_free_rpl_tables, rocksdb_read_free_rpl_tables,
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC /*| PLUGIN_VAR_ALLOCATED*/,
"List of tables that will use read-free replication on the slave "
"(i.e. not lookup a row during replication)",
- nullptr, nullptr, "");
+ rocksdb_validate_read_free_rpl_tables, rocksdb_update_read_free_rpl_tables,
+ DEFAULT_READ_FREE_RPL_TABLES);
+
+static MYSQL_SYSVAR_ENUM(
+ read_free_rpl, rocksdb_read_free_rpl,
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
+ "Use read-free replication on the slave (i.e. no row lookup during "
+ "replication). Default is OFF, PK_SK will enable it on all tables with "
+ "primary key. PK_ONLY will enable it on tables where the only key is the "
+ "primary key (i.e. no secondary keys).",
+ nullptr, nullptr, read_free_rpl_type::OFF, &read_free_rpl_typelib);
static MYSQL_THDVAR_BOOL(skip_bloom_filter_on_read, PLUGIN_VAR_RQCMDARG,
"Skip using bloom filter for reads", nullptr, nullptr,
@@ -1033,6 +1226,14 @@ static MYSQL_SYSVAR_UINT(
/* min */ (uint)rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords,
/* max */ (uint)rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords, 0);
+static MYSQL_SYSVAR_UINT(
+ stats_level, rocksdb_stats_level, PLUGIN_VAR_RQCMDARG,
+ "Statistics Level for RocksDB. Default is 0 (kExceptHistogramOrTimers)",
+ nullptr, rocksdb_set_rocksdb_stats_level,
+ /* default */ (uint)rocksdb::StatsLevel::kExceptHistogramOrTimers,
+ /* min */ (uint)rocksdb::StatsLevel::kExceptHistogramOrTimers,
+ /* max */ (uint)rocksdb::StatsLevel::kAll, 0);
+
static MYSQL_SYSVAR_SIZE_T(compaction_readahead_size,
rocksdb_db_options->compaction_readahead_size,
PLUGIN_VAR_RQCMDARG,
@@ -1107,7 +1308,8 @@ static MYSQL_SYSVAR_ULONG(
persistent_cache_size_mb, rocksdb_persistent_cache_size_mb,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Size of cache in MB for BlockBasedTableOptions::persistent_cache "
- "for RocksDB", nullptr, nullptr, rocksdb_persistent_cache_size_mb,
+ "for RocksDB",
+ nullptr, nullptr, rocksdb_persistent_cache_size_mb,
/* min */ 0L, /* max */ ULONG_MAX, 0);
static MYSQL_SYSVAR_UINT64_T(
@@ -1286,7 +1488,7 @@ static MYSQL_SYSVAR_LONGLONG(block_cache_size, rocksdb_block_cache_size,
rocksdb_validate_set_block_cache_size, nullptr,
/* default */ RDB_DEFAULT_BLOCK_CACHE_SIZE,
/* min */ RDB_MIN_BLOCK_CACHE_SIZE,
- /* max */ LONGLONG_MAX,
+ /* max */ LLONG_MAX,
/* Block size */ RDB_MIN_BLOCK_CACHE_SIZE);
static MYSQL_SYSVAR_LONGLONG(sim_cache_size, rocksdb_sim_cache_size,
@@ -1295,15 +1497,26 @@ static MYSQL_SYSVAR_LONGLONG(sim_cache_size, rocksdb_sim_cache_size,
nullptr,
/* default */ 0,
/* min */ 0,
- /* max */ LONGLONG_MAX,
+ /* max */ LLONG_MAX,
/* Block size */ 0);
static MYSQL_SYSVAR_BOOL(
- use_clock_cache,
- rocksdb_use_clock_cache,
+ use_clock_cache, rocksdb_use_clock_cache,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
- "Use ClockCache instead of default LRUCache for RocksDB",
- nullptr, nullptr, false);
+ "Use ClockCache instead of default LRUCache for RocksDB", nullptr, nullptr,
+ false);
+
+static MYSQL_SYSVAR_BOOL(cache_dump, rocksdb_cache_dump,
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+ "Include RocksDB block cache content in core dump.",
+ nullptr, nullptr, true);
+
+static MYSQL_SYSVAR_DOUBLE(cache_high_pri_pool_ratio,
+ rocksdb_cache_high_pri_pool_ratio,
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+ "Specify the size of block cache high-pri pool",
+ nullptr, nullptr, /* default */ 0.0, /* min */ 0.0,
+ /* max */ 1.0, 0);
static MYSQL_SYSVAR_BOOL(
cache_index_and_filter_blocks,
@@ -1313,6 +1526,14 @@ static MYSQL_SYSVAR_BOOL(
"BlockBasedTableOptions::cache_index_and_filter_blocks for RocksDB",
nullptr, nullptr, true);
+static MYSQL_SYSVAR_BOOL(
+ cache_index_and_filter_with_high_priority,
+ *reinterpret_cast<my_bool *>(
+ &rocksdb_tbl_options->cache_index_and_filter_blocks_with_high_priority),
+ PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+ "cache_index_and_filter_blocks_with_high_priority for RocksDB", nullptr,
+ nullptr, true);
+
// When pin_l0_filter_and_index_blocks_in_cache is true, RocksDB will use the
// LRU cache, but will always keep the filter & index block's handle checked
// out (=won't call ShardedLRUCache::Release), plus the parsed out objects
@@ -1441,10 +1662,10 @@ static MYSQL_SYSVAR_UINT(
nullptr, nullptr, 0, /* min */ 0, /* max */ INT_MAX, 0);
static MYSQL_SYSVAR_BOOL(force_compute_memtable_stats,
- rocksdb_force_compute_memtable_stats,
- PLUGIN_VAR_RQCMDARG,
- "Force to always compute memtable stats",
- nullptr, nullptr, TRUE);
+ rocksdb_force_compute_memtable_stats,
+ PLUGIN_VAR_RQCMDARG,
+ "Force to always compute memtable stats", nullptr,
+ nullptr, TRUE);
static MYSQL_SYSVAR_UINT(force_compute_memtable_stats_cachetime,
rocksdb_force_compute_memtable_stats_cachetime,
@@ -1464,6 +1685,10 @@ static MYSQL_SYSVAR_STR(compact_cf, rocksdb_compact_cf_name,
rocksdb_compact_column_family,
rocksdb_compact_column_family_stub, "");
+static MYSQL_SYSVAR_STR(delete_cf, rocksdb_delete_cf_name, PLUGIN_VAR_RQCMDARG,
+ "Delete column family", rocksdb_delete_column_family,
+ rocksdb_delete_column_family_stub, "");
+
static MYSQL_SYSVAR_STR(create_checkpoint, rocksdb_checkpoint_name,
PLUGIN_VAR_RQCMDARG, "Checkpoint directory",
rocksdb_create_checkpoint,
@@ -1535,6 +1760,12 @@ static MYSQL_SYSVAR_UINT(
"Maximum number of pending + ongoing number of manual compactions.",
nullptr, nullptr, /* default */ 10, /* min */ 0, /* max */ UINT_MAX, 0);
+static MYSQL_SYSVAR_BOOL(
+ rollback_on_timeout, rocksdb_rollback_on_timeout, PLUGIN_VAR_OPCMDARG,
+ "Whether to roll back the complete transaction or a single statement on "
+ "lock wait timeout (a single statement by default)",
+ NULL, NULL, FALSE);
+
static MYSQL_SYSVAR_UINT(
debug_manual_compaction_delay, rocksdb_debug_manual_compaction_delay,
PLUGIN_VAR_RQCMDARG,
@@ -1626,7 +1857,7 @@ static MYSQL_SYSVAR_LONGLONG(
rocksdb_compaction_sequential_deletes_file_size, PLUGIN_VAR_RQCMDARG,
"Minimum file size required for compaction_sequential_deletes", nullptr,
rocksdb_set_compaction_options, 0L,
- /* min */ -1L, /* max */ LONGLONG_MAX, 0);
+ /* min */ -1L, /* max */ LLONG_MAX, 0);
static MYSQL_SYSVAR_BOOL(
compaction_sequential_deletes_count_sd,
@@ -1731,6 +1962,13 @@ static MYSQL_SYSVAR_BOOL(error_on_suboptimal_collation,
"collation is used",
nullptr, nullptr, TRUE);
+static MYSQL_SYSVAR_BOOL(
+ enable_insert_with_update_caching,
+ rocksdb_enable_insert_with_update_caching, PLUGIN_VAR_OPCMDARG,
+ "Whether to enable optimization where we cache the read from a failed "
+ "insertion attempt in INSERT ON DUPLICATE KEY UPDATE",
+ nullptr, nullptr, TRUE);
+
static const int ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE = 100;
static struct st_mysql_sys_var *rocksdb_system_variables[] = {
@@ -1749,6 +1987,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(commit_in_the_middle),
MYSQL_SYSVAR(blind_delete_primary_key),
MYSQL_SYSVAR(read_free_rpl_tables),
+ MYSQL_SYSVAR(read_free_rpl),
MYSQL_SYSVAR(bulk_load_size),
MYSQL_SYSVAR(merge_buf_size),
MYSQL_SYSVAR(enable_bulk_load_api),
@@ -1800,6 +2039,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(enable_thread_tracking),
MYSQL_SYSVAR(perf_context_level),
MYSQL_SYSVAR(wal_recovery_mode),
+ MYSQL_SYSVAR(stats_level),
MYSQL_SYSVAR(access_hint_on_compaction_start),
MYSQL_SYSVAR(new_table_reader_for_compaction_inputs),
MYSQL_SYSVAR(compaction_readahead_size),
@@ -1809,7 +2049,10 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(block_cache_size),
MYSQL_SYSVAR(sim_cache_size),
MYSQL_SYSVAR(use_clock_cache),
+ MYSQL_SYSVAR(cache_high_pri_pool_ratio),
+ MYSQL_SYSVAR(cache_dump),
MYSQL_SYSVAR(cache_index_and_filter_blocks),
+ MYSQL_SYSVAR(cache_index_and_filter_with_high_priority),
MYSQL_SYSVAR(pin_l0_filter_and_index_blocks_in_cache),
MYSQL_SYSVAR(index_type),
MYSQL_SYSVAR(hash_index_allow_collision),
@@ -1838,6 +2081,7 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(debug_optimizer_no_zero_cardinality),
MYSQL_SYSVAR(compact_cf),
+ MYSQL_SYSVAR(delete_cf),
MYSQL_SYSVAR(signal_drop_index_thread),
MYSQL_SYSVAR(pause_background_work),
MYSQL_SYSVAR(enable_2pc),
@@ -1883,10 +2127,13 @@ static struct st_mysql_sys_var *rocksdb_system_variables[] = {
MYSQL_SYSVAR(debug_manual_compaction_delay),
MYSQL_SYSVAR(max_manual_compactions),
MYSQL_SYSVAR(manual_compaction_threads),
+ MYSQL_SYSVAR(rollback_on_timeout),
+
+ MYSQL_SYSVAR(enable_insert_with_update_caching),
nullptr};
-static rocksdb::WriteOptions
-rdb_get_rocksdb_write_options(my_core::THD *const thd) {
+static rocksdb::WriteOptions rdb_get_rocksdb_write_options(
+ my_core::THD *const thd) {
rocksdb::WriteOptions opt;
opt.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC);
@@ -1943,19 +2190,6 @@ static int rocksdb_compact_column_family(THD *const thd,
///////////////////////////////////////////////////////////////////////////////////////////
-/**
- @brief
- Function we use in the creation of our hash to get key.
-*/
-
-uchar *
-Rdb_open_tables_map::get_hash_key(const Rdb_table_handler *const table_handler,
- size_t *const length,
- my_bool not_used MY_ATTRIBUTE((__unused__))) {
- *length = table_handler->m_table_name_length;
- return reinterpret_cast<uchar *>(table_handler->m_table_name);
-}
-
/*
Drop index thread's control
*/
@@ -2012,7 +2246,7 @@ class Rdb_snapshot_notifier : public rocksdb::TransactionNotifier {
void SnapshotCreated(const rocksdb::Snapshot *snapshot) override;
-public:
+ public:
Rdb_snapshot_notifier(const Rdb_snapshot_notifier &) = delete;
Rdb_snapshot_notifier &operator=(const Rdb_snapshot_notifier &) = delete;
@@ -2046,9 +2280,9 @@ String timeout_message(const char *command, const char *name1,
/* This is the base class for transactions when interacting with rocksdb.
-*/
+ */
class Rdb_transaction {
-protected:
+ protected:
ulonglong m_write_count = 0;
ulonglong m_insert_count = 0;
ulonglong m_update_count = 0;
@@ -2059,7 +2293,7 @@ protected:
bool m_is_delayed_snapshot = false;
bool m_is_two_phase = false;
-private:
+ private:
/*
Number of write operations this transaction had when we took the last
savepoint (the idea is not to take another savepoint if we haven't made
@@ -2067,7 +2301,7 @@ private:
*/
ulonglong m_writes_at_last_savepoint;
-protected:
+ protected:
protected:
THD *m_thd = nullptr;
@@ -2092,9 +2326,9 @@ protected:
// This should be used only when updating binlog information.
virtual rocksdb::WriteBatchBase *get_write_batch() = 0;
virtual bool commit_no_binlog() = 0;
- virtual rocksdb::Iterator *
- get_iterator(const rocksdb::ReadOptions &options,
- rocksdb::ColumnFamilyHandle *column_family) = 0;
+ virtual rocksdb::Iterator *get_iterator(
+ const rocksdb::ReadOptions &options,
+ rocksdb::ColumnFamilyHandle *column_family) = 0;
protected:
/*
@@ -2139,7 +2373,9 @@ protected:
String m_detailed_error;
int64_t m_snapshot_timestamp = 0;
bool m_ddl_transaction;
+#ifdef MARIAROCKS_NOT_YET
std::shared_ptr<Rdb_explicit_snapshot> m_explicit_snapshot;
+#endif
/*
Tracks the number of tables in use through external_lock.
@@ -2173,8 +2409,9 @@ protected:
RDB_MUTEX_LOCK_CHECK(s_tx_list_mutex);
- for (auto it : s_tx_list)
+ for (auto it : s_tx_list) {
walker->process_tran(it);
+ }
RDB_MUTEX_UNLOCK_CHECK(s_tx_list_mutex);
}
@@ -2194,7 +2431,8 @@ protected:
convert_error_code_to_mysql() does: force a statement
rollback before returning HA_ERR_LOCK_WAIT_TIMEOUT:
*/
- my_core::thd_mark_transaction_to_rollback(thd, false /*just statement*/);
+ my_core::thd_mark_transaction_to_rollback(
+ thd, static_cast<bool>(rocksdb_rollback_on_timeout));
m_detailed_error.copy(timeout_message(
"index", tbl_def->full_tablename().c_str(), kd.get_name().c_str()));
table_handler->m_lock_wait_timeout_counter.inc();
@@ -2216,9 +2454,10 @@ protected:
char user_host_buff[MAX_USER_HOST_SIZE + 1];
make_user_name(thd, user_host_buff);
// NO_LINT_DEBUG
- sql_print_warning("Got snapshot conflict errors: User: %s "
- "Query: %s",
- user_host_buff, thd->query());
+ sql_print_warning(
+ "Got snapshot conflict errors: User: %s "
+ "Query: %s",
+ user_host_buff, thd->query());
}
m_detailed_error = String(" (snapshot conflict)", system_charset_info);
table_handler->m_deadlock_counter.inc();
@@ -2315,8 +2554,9 @@ protected:
if (m_is_tx_failed) {
rollback();
res = false;
- } else
+ } else {
res = commit();
+ }
return res;
}
@@ -2367,7 +2607,7 @@ protected:
bool has_snapshot() const { return m_read_opts.snapshot != nullptr; }
-private:
+ private:
// The Rdb_sst_info structures we are currently loading. In a partitioned
// table this can have more than one entry
std::vector<std::shared_ptr<Rdb_sst_info>> m_curr_bulk_load;
@@ -2376,7 +2616,7 @@ private:
/* External merge sorts for bulk load: key ID -> merge sort instance */
std::unordered_map<GL_INDEX_ID, Rdb_index_merge> m_key_merge;
-public:
+ public:
int get_key_merge(GL_INDEX_ID kd_gl_id, rocksdb::ColumnFamilyHandle *cf,
Rdb_index_merge **key_merge) {
int res;
@@ -2397,22 +2637,62 @@ public:
return HA_EXIT_SUCCESS;
}
- int finish_bulk_load(int print_client_error = true) {
- int rc = 0, rc2;
+ /* Finish bulk loading for all table handlers belonging to one connection */
+ int finish_bulk_load(bool *is_critical_error = nullptr,
+ int print_client_error = true) {
+ Ensure_cleanup cleanup([&]() {
+ // Always clear everything regardless of success/failure
+ m_curr_bulk_load.clear();
+ m_curr_bulk_load_tablename.clear();
+ m_key_merge.clear();
+ });
+
+ int rc = 0;
+ if (is_critical_error) {
+ *is_critical_error = true;
+ }
+
+ // PREPARE phase: finish all on-going bulk loading Rdb_sst_info and
+ // collect all Rdb_sst_commit_info containing (SST files, cf)
+ int rc2 = 0;
+ std::vector<Rdb_sst_info::Rdb_sst_commit_info> sst_commit_list;
+ sst_commit_list.reserve(m_curr_bulk_load.size());
+
+ for (auto &sst_info : m_curr_bulk_load) {
+ Rdb_sst_info::Rdb_sst_commit_info commit_info;
- std::vector<std::shared_ptr<Rdb_sst_info>>::iterator it;
- for (it = m_curr_bulk_load.begin(); it != m_curr_bulk_load.end(); it++) {
- rc2 = (*it)->commit(print_client_error);
- if (rc2 != 0 && rc == 0) {
+ // Commit the list of SST files and move it to the end of
+ // sst_commit_list, effectively transferring ownership
+ rc2 = sst_info->finish(&commit_info, print_client_error);
+ if (rc2 && rc == 0) {
+ // Don't return yet - make sure we finish all the SST infos
rc = rc2;
}
+
+ // Make sure we have work to do - we might be losing the race
+ if (rc2 == 0 && commit_info.has_work()) {
+ sst_commit_list.emplace_back(std::move(commit_info));
+ DBUG_ASSERT(!commit_info.has_work());
+ }
+ }
+
+ if (rc) {
+ return rc;
}
- m_curr_bulk_load.clear();
- m_curr_bulk_load_tablename.clear();
- DBUG_ASSERT(m_curr_bulk_load.size() == 0);
- // Flush the index_merge sort buffers
+ // MERGING Phase: Flush the index_merge sort buffers into SST files in
+ // Rdb_sst_info and collect all Rdb_sst_commit_info containing
+ // (SST files, cf)
if (!m_key_merge.empty()) {
+ Ensure_cleanup malloc_cleanup([]() {
+ /*
+ Explicitly tell jemalloc to clean up any unused dirty pages at this
+ point.
+ See https://reviews.facebook.net/D63723 for more details.
+ */
+ purge_all_jemalloc_arenas();
+ });
+
rocksdb::Slice merge_key;
rocksdb::Slice merge_val;
for (auto it = m_key_merge.begin(); it != m_key_merge.end(); it++) {
@@ -2429,9 +2709,20 @@ public:
// be missed by the compaction filter and not be marked for
// removal. It is unclear how to lock the sql table from the storage
// engine to prevent modifications to it while bulk load is occurring.
- if (keydef == nullptr || table_name.empty()) {
- rc2 = HA_ERR_ROCKSDB_BULK_LOAD;
- break;
+ if (keydef == nullptr) {
+ if (is_critical_error) {
+ // We used to set the error but simply ignored it. This follows
+ // current behavior and we should revisit this later
+ *is_critical_error = false;
+ }
+ return HA_ERR_KEY_NOT_FOUND;
+ } else if (table_name.empty()) {
+ if (is_critical_error) {
+ // We used to set the error but simply ignored it. This follows
+ // current behavior and we should revisit this later
+ *is_critical_error = false;
+ }
+ return HA_ERR_NO_SUCH_TABLE;
}
const std::string &index_name = keydef->get_name();
Rdb_index_merge &rdb_merge = it->second;
@@ -2440,38 +2731,112 @@ public:
// "./database/table"
std::replace(table_name.begin(), table_name.end(), '.', '/');
table_name = "./" + table_name;
- Rdb_sst_info sst_info(rdb, table_name, index_name, rdb_merge.get_cf(),
- *rocksdb_db_options,
- THDVAR(get_thd(), trace_sst_api));
+ auto sst_info = std::make_shared<Rdb_sst_info>(
+ rdb, table_name, index_name, rdb_merge.get_cf(),
+ *rocksdb_db_options, THDVAR(get_thd(), trace_sst_api));
while ((rc2 = rdb_merge.next(&merge_key, &merge_val)) == 0) {
- if ((rc2 = sst_info.put(merge_key, merge_val)) != 0) {
+ if ((rc2 = sst_info->put(merge_key, merge_val)) != 0) {
+ rc = rc2;
+
+ // Don't return yet - make sure we finish the sst_info
break;
}
}
- // rc2 == -1 => finished ok; rc2 > 0 => error
- if (rc2 > 0 || (rc2 = sst_info.commit(print_client_error)) != 0) {
- if (rc == 0) {
- rc = rc2;
- }
- break;
+ // -1 => no more items
+ if (rc2 != -1 && rc != 0) {
+ rc = rc2;
+ }
+
+ Rdb_sst_info::Rdb_sst_commit_info commit_info;
+ rc2 = sst_info->finish(&commit_info, print_client_error);
+ if (rc2 != 0 && rc == 0) {
+ // Only set the error from sst_info->finish if finish failed and we
+ // didn't fail before. In other words, we don't let finish's
+ // success mask earlier failures
+ rc = rc2;
+ }
+
+ if (rc) {
+ return rc;
+ }
+
+ if (commit_info.has_work()) {
+ sst_commit_list.emplace_back(std::move(commit_info));
+ DBUG_ASSERT(!commit_info.has_work());
}
}
- m_key_merge.clear();
+ }
- /*
- Explicitly tell jemalloc to clean up any unused dirty pages at this
- point.
- See https://reviews.facebook.net/D63723 for more details.
- */
- purge_all_jemalloc_arenas();
+ // Early return in case we lost the race completely and end up with no
+ // work at all
+ if (sst_commit_list.size() == 0) {
+ return rc;
+ }
+
+ // INGEST phase: Group all Rdb_sst_commit_info by cf (as they might
+ // have the same cf across different indexes) and call out to RocksDB
+ // to ingest all SST files in one atomic operation
+ rocksdb::IngestExternalFileOptions options;
+ options.move_files = true;
+ options.snapshot_consistency = false;
+ options.allow_global_seqno = false;
+ options.allow_blocking_flush = false;
+
+ std::map<rocksdb::ColumnFamilyHandle *, rocksdb::IngestExternalFileArg>
+ arg_map;
+
+ // Group by column_family
+ for (auto &commit_info : sst_commit_list) {
+ if (arg_map.find(commit_info.get_cf()) == arg_map.end()) {
+ rocksdb::IngestExternalFileArg arg;
+ arg.column_family = commit_info.get_cf(),
+ arg.external_files = commit_info.get_committed_files(),
+ arg.options = options;
+
+ arg_map.emplace(commit_info.get_cf(), arg);
+ } else {
+ auto &files = arg_map[commit_info.get_cf()].external_files;
+ files.insert(files.end(), commit_info.get_committed_files().begin(),
+ commit_info.get_committed_files().end());
+ }
+ }
+
+ std::vector<rocksdb::IngestExternalFileArg> args;
+ size_t file_count = 0;
+ for (auto &cf_files_pair : arg_map) {
+ args.push_back(cf_files_pair.second);
+ file_count += cf_files_pair.second.external_files.size();
+ }
+
+ const rocksdb::Status s = rdb->IngestExternalFiles(args);
+ if (THDVAR(m_thd, trace_sst_api)) {
+ // NO_LINT_DEBUG
+ sql_print_information(
+ "SST Tracing: IngestExternalFile '%zu' files returned %s", file_count,
+ s.ok() ? "ok" : "not ok");
+ }
+
+ if (!s.ok()) {
+ if (print_client_error) {
+ Rdb_sst_info::report_error_msg(s, nullptr);
+ }
+ return HA_ERR_ROCKSDB_BULK_LOAD;
+ }
+
+ // COMMIT phase: mark everything as completed. This avoids SST file
+ // deletion kicking in. Otherwise SST files would get deleted if this
+ // entire operation is aborted
+ for (auto &commit_info : sst_commit_list) {
+ commit_info.commit();
}
+
return rc;
}
int start_bulk_load(ha_rocksdb *const bulk_load,
- std::shared_ptr<Rdb_sst_info> sst_info) {
+ std::shared_ptr<Rdb_sst_info> sst_info) {
/*
If we already have an open bulk load of a table and the name doesn't
match the current one, close out the currently running one. This allows
@@ -2484,8 +2849,6 @@ public:
bulk_load->get_table_basename() != m_curr_bulk_load_tablename) {
const auto res = finish_bulk_load();
if (res != HA_EXIT_SUCCESS) {
- m_curr_bulk_load.clear();
- m_curr_bulk_load_tablename.clear();
return res;
}
}
@@ -2535,12 +2898,10 @@ public:
inserts while inside a multi-statement transaction.
*/
bool flush_batch() {
- if (get_write_count() == 0)
- return false;
+ if (get_write_count() == 0) return false;
/* Commit the current transaction */
- if (commit_no_binlog())
- return true;
+ if (commit_no_binlog()) return true;
/* Start another one */
start_tx();
@@ -2552,7 +2913,7 @@ public:
std::max(m_auto_incr_map[gl_index_id], curr_id);
}
-#ifndef NDEBUG
+#ifndef DBUG_OFF
ulonglong get_auto_incr(const GL_INDEX_ID &gl_index_id) {
if (m_auto_incr_map.count(gl_index_id) > 0) {
return m_auto_incr_map[gl_index_id];
@@ -2563,13 +2924,14 @@ public:
virtual rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family,
const rocksdb::Slice &key,
- const rocksdb::Slice &value) = 0;
- virtual rocksdb::Status
- delete_key(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) = 0;
- virtual rocksdb::Status
- single_delete(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) = 0;
+ const rocksdb::Slice &value,
+ const bool assume_tracked) = 0;
+ virtual rocksdb::Status delete_key(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, const bool assume_tracked) = 0;
+ virtual rocksdb::Status single_delete(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, const bool assume_tracked) = 0;
virtual bool has_modifications() const = 0;
@@ -2585,25 +2947,23 @@ public:
virtual rocksdb::Status get(rocksdb::ColumnFamilyHandle *const column_family,
const rocksdb::Slice &key,
rocksdb::PinnableSlice *const value) const = 0;
- virtual rocksdb::Status
- get_for_update(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
- bool exclusive) = 0;
-
- rocksdb::Iterator *
- get_iterator(rocksdb::ColumnFamilyHandle *const column_family,
- bool skip_bloom_filter, bool fill_cache,
- const rocksdb::Slice &eq_cond_lower_bound,
- const rocksdb::Slice &eq_cond_upper_bound,
- bool read_current = false, bool create_snapshot = true) {
+ virtual rocksdb::Status get_for_update(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
+ bool exclusive, const bool do_validate) = 0;
+
+ rocksdb::Iterator *get_iterator(
+ rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter,
+ bool fill_cache, const rocksdb::Slice &eq_cond_lower_bound,
+ const rocksdb::Slice &eq_cond_upper_bound, bool read_current = false,
+ bool create_snapshot = true) {
// Make sure we are not doing both read_current (which implies we don't
// want a snapshot) and create_snapshot which makes sure we create
// a snapshot
DBUG_ASSERT(column_family != nullptr);
DBUG_ASSERT(!read_current || !create_snapshot);
- if (create_snapshot)
- acquire_snapshot(true);
+ if (create_snapshot) acquire_snapshot(true);
rocksdb::ReadOptions options = m_read_opts;
@@ -2635,25 +2995,33 @@ public:
entire transaction.
*/
do_set_savepoint();
- m_writes_at_last_savepoint= m_write_count;
+ m_writes_at_last_savepoint = m_write_count;
}
/*
Called when a "top-level" statement inside a transaction completes
successfully and its changes become part of the transaction's changes.
*/
- void make_stmt_savepoint_permanent() {
-
+ int make_stmt_savepoint_permanent() {
// Take another RocksDB savepoint only if we had changes since the last
// one. This is very important for long transactions doing lots of
// SELECTs.
- if (m_writes_at_last_savepoint != m_write_count)
- {
+ if (m_writes_at_last_savepoint != m_write_count) {
+ rocksdb::WriteBatchBase *batch = get_write_batch();
+ rocksdb::Status status = rocksdb::Status::NotFound();
+ while ((status = batch->PopSavePoint()) == rocksdb::Status::OK()) {
+ }
+
+ if (status != rocksdb::Status::NotFound()) {
+ return HA_EXIT_FAILURE;
+ }
+
do_set_savepoint();
- m_writes_at_last_savepoint= m_write_count;
+ m_writes_at_last_savepoint = m_write_count;
}
- }
+ return HA_EXIT_SUCCESS;
+ }
/*
Rollback to the savepoint we've set before the last statement
@@ -2669,7 +3037,7 @@ public:
statement start) because setting a savepoint is cheap.
*/
do_set_savepoint();
- m_writes_at_last_savepoint= m_write_count;
+ m_writes_at_last_savepoint = m_write_count;
}
}
@@ -2733,10 +3101,11 @@ class Rdb_transaction_impl : public Rdb_transaction {
rocksdb::Transaction *m_rocksdb_tx = nullptr;
rocksdb::Transaction *m_rocksdb_reuse_tx = nullptr;
-public:
+ public:
void set_lock_timeout(int timeout_sec_arg) override {
- if (m_rocksdb_tx)
+ if (m_rocksdb_tx) {
m_rocksdb_tx->SetLockTimeout(rdb_convert_sec_to_ms(m_timeout_sec));
+ }
}
void set_sync(bool sync) override {
@@ -2753,7 +3122,7 @@ public:
virtual bool is_writebatch_trx() const override { return false; }
-private:
+ private:
void release_tx(void) {
// We are done with the current active transaction object. Preserve it
// for later reuse.
@@ -2803,7 +3172,7 @@ private:
goto error;
}
-error:
+ error:
/* Save the transaction object to be reused */
release_tx();
@@ -2817,7 +3186,7 @@ error:
return res;
}
-public:
+ public:
void rollback() override {
m_write_count = 0;
m_insert_count = 0;
@@ -2884,39 +3253,42 @@ public:
m_read_opts.snapshot = nullptr;
}
- if (need_clear && m_rocksdb_tx != nullptr)
- m_rocksdb_tx->ClearSnapshot();
+ if (need_clear && m_rocksdb_tx != nullptr) m_rocksdb_tx->ClearSnapshot();
}
bool has_snapshot() { return m_read_opts.snapshot != nullptr; }
rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key,
- const rocksdb::Slice &value) override {
+ const rocksdb::Slice &key, const rocksdb::Slice &value,
+ const bool assume_tracked) override {
++m_write_count;
++m_lock_count;
- if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks)
+ if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) {
return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit);
- return m_rocksdb_tx->Put(column_family, key, value);
+ }
+ return m_rocksdb_tx->Put(column_family, key, value, assume_tracked);
}
rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) override {
+ const rocksdb::Slice &key,
+ const bool assume_tracked) override {
++m_write_count;
++m_lock_count;
- if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks)
+ if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) {
return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit);
- return m_rocksdb_tx->Delete(column_family, key);
+ }
+ return m_rocksdb_tx->Delete(column_family, key, assume_tracked);
}
- rocksdb::Status
- single_delete(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) override {
+ rocksdb::Status single_delete(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, const bool assume_tracked) override {
++m_write_count;
++m_lock_count;
- if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks)
+ if (m_write_count > m_max_row_locks || m_lock_count > m_max_row_locks) {
return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit);
- return m_rocksdb_tx->SingleDelete(column_family, key);
+ }
+ return m_rocksdb_tx->SingleDelete(column_family, key, assume_tracked);
}
bool has_modifications() const override {
@@ -2952,23 +3324,39 @@ public:
return m_rocksdb_tx->Get(m_read_opts, column_family, key, value);
}
- rocksdb::Status
- get_for_update(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
- bool exclusive) override {
- if (++m_lock_count > m_max_row_locks)
+ rocksdb::Status get_for_update(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
+ bool exclusive, const bool do_validate) override {
+ if (++m_lock_count > m_max_row_locks) {
return rocksdb::Status::Aborted(rocksdb::Status::kLockLimit);
+ }
if (value != nullptr) {
value->Reset();
}
- return m_rocksdb_tx->GetForUpdate(m_read_opts, column_family, key, value,
- exclusive);
+ rocksdb::Status s;
+ // If snapshot is null, pass it to GetForUpdate and snapshot is
+ // initialized there. Snapshot validation is skipped in that case.
+ if (m_read_opts.snapshot == nullptr || do_validate) {
+ s = m_rocksdb_tx->GetForUpdate(
+ m_read_opts, column_family, key, value, exclusive,
+ m_read_opts.snapshot ? do_validate : false);
+ } else {
+ // If snapshot is set, and if skipping validation,
+ // call GetForUpdate without validation and set back old snapshot
+ auto saved_snapshot = m_read_opts.snapshot;
+ m_read_opts.snapshot = nullptr;
+ s = m_rocksdb_tx->GetForUpdate(m_read_opts, column_family, key, value,
+ exclusive, false);
+ m_read_opts.snapshot = saved_snapshot;
+ }
+ return s;
}
- rocksdb::Iterator *
- get_iterator(const rocksdb::ReadOptions &options,
- rocksdb::ColumnFamilyHandle *const column_family) override {
+ rocksdb::Iterator *get_iterator(
+ const rocksdb::ReadOptions &options,
+ rocksdb::ColumnFamilyHandle *const column_family) override {
global_stats.queries[QUERIES_RANGE].inc();
return m_rocksdb_tx->GetIterator(options, column_family);
}
@@ -3013,10 +3401,9 @@ public:
m_ddl_transaction = false;
}
- /* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */
- void do_set_savepoint() override {
- m_rocksdb_tx->SetSavePoint();
- }
+ /* Implementations of do_*savepoint based on rocksdb::Transaction savepoints
+ */
+ void do_set_savepoint() override { m_rocksdb_tx->SetSavePoint(); }
void do_rollback_to_savepoint() override {
m_rocksdb_tx->RollbackToSavePoint();
@@ -3048,14 +3435,14 @@ public:
const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot();
if (org_snapshot != cur_snapshot) {
- if (org_snapshot != nullptr)
- m_snapshot_timestamp = 0;
+ if (org_snapshot != nullptr) m_snapshot_timestamp = 0;
m_read_opts.snapshot = cur_snapshot;
- if (cur_snapshot != nullptr)
+ if (cur_snapshot != nullptr) {
rdb->GetEnv()->GetCurrentTime(&m_snapshot_timestamp);
- else
+ } else {
m_is_delayed_snapshot = true;
+ }
}
}
}
@@ -3066,7 +3453,7 @@ public:
m_notifier = std::make_shared<Rdb_snapshot_notifier>(this);
}
- virtual ~Rdb_transaction_impl() {
+ virtual ~Rdb_transaction_impl() override {
rollback();
// Theoretically the notifier could outlive the Rdb_transaction_impl
@@ -3098,7 +3485,7 @@ class Rdb_writebatch_impl : public Rdb_transaction {
m_ddl_transaction = false;
}
-private:
+ private:
bool prepare(const rocksdb::TransactionName &name) override { return true; }
bool commit_no_binlog() override {
@@ -3122,7 +3509,7 @@ private:
res = true;
goto error;
}
-error:
+ error:
reset();
m_write_count = 0;
@@ -3135,16 +3522,12 @@ error:
}
/* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */
- void do_set_savepoint() override {
- m_batch->SetSavePoint();
- }
+ void do_set_savepoint() override { m_batch->SetSavePoint(); }
- void do_rollback_to_savepoint() override {
- m_batch->RollbackToSavePoint();
- }
+ void do_rollback_to_savepoint() override { m_batch->RollbackToSavePoint(); }
-public:
+ public:
bool is_writebatch_trx() const override { return true; }
void set_lock_timeout(int timeout_sec_arg) override {
@@ -3172,8 +3555,7 @@ public:
}
void acquire_snapshot(bool acquire_now) override {
- if (m_read_opts.snapshot == nullptr)
- snapshot_created(rdb->GetSnapshot());
+ if (m_read_opts.snapshot == nullptr) snapshot_created(rdb->GetSnapshot());
}
void release_snapshot() override {
@@ -3184,8 +3566,8 @@ public:
}
rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key,
- const rocksdb::Slice &value) override {
+ const rocksdb::Slice &key, const rocksdb::Slice &value,
+ const bool assume_tracked) override {
++m_write_count;
m_batch->Put(column_family, key, value);
// Note Put/Delete in write batch doesn't return any error code. We simply
@@ -3194,15 +3576,16 @@ public:
}
rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) override {
+ const rocksdb::Slice &key,
+ const bool assume_tracked) override {
++m_write_count;
m_batch->Delete(column_family, key);
return rocksdb::Status::OK();
}
- rocksdb::Status
- single_delete(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key) override {
+ rocksdb::Status single_delete(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, const bool /* assume_tracked */) override {
++m_write_count;
m_batch->SingleDelete(column_family, key);
return rocksdb::Status::OK();
@@ -3227,10 +3610,10 @@ public:
value);
}
- rocksdb::Status
- get_for_update(rocksdb::ColumnFamilyHandle *const column_family,
- const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
- bool exclusive) override {
+ rocksdb::Status get_for_update(
+ rocksdb::ColumnFamilyHandle *const column_family,
+ const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
+ bool /* exclusive */, const bool /* do_validate */) override {
if (value == nullptr) {
rocksdb::PinnableSlice pin_val;
rocksdb::Status s = get(column_family, key, &pin_val);
@@ -3241,9 +3624,9 @@ public:
return get(column_family, key, value);
}
- rocksdb::Iterator *
- get_iterator(const rocksdb::ReadOptions &options,
- rocksdb::ColumnFamilyHandle *const column_family) override {
+ rocksdb::Iterator *get_iterator(
+ const rocksdb::ReadOptions &options,
+ rocksdb::ColumnFamilyHandle *const /* column_family */) override {
const auto it = rdb->NewIterator(options);
return m_batch->NewIteratorWithBase(it);
}
@@ -3264,8 +3647,7 @@ public:
void start_stmt() override {}
void rollback_stmt() override {
- if (m_batch)
- rollback_to_stmt_savepoint();
+ if (m_batch) rollback_to_stmt_savepoint();
}
explicit Rdb_writebatch_impl(THD *const thd)
@@ -3274,7 +3656,7 @@ public:
true);
}
- virtual ~Rdb_writebatch_impl() {
+ virtual ~Rdb_writebatch_impl() override {
rollback();
delete m_batch;
}
@@ -3332,7 +3714,7 @@ class Rdb_perf_context_guard {
}
};
-} // anonymous namespace
+} // anonymous namespace
/*
TODO: maybe, call this in external_lock() and store in ha_rocksdb..
@@ -3347,9 +3729,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
false /* MARIAROCKS_NOT_YET: THDVAR(thd, master_skip_tx_api) && !thd->rgi_slave)*/)
{
tx = new Rdb_writebatch_impl(thd);
- }
- else
- {
+ } else {
tx = new Rdb_transaction_impl(thd);
}
tx->set_params(THDVAR(thd, lock_wait_timeout), THDVAR(thd, max_row_locks));
@@ -3368,12 +3748,14 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
static int rocksdb_close_connection(handlerton *const hton, THD *const thd) {
Rdb_transaction *tx = get_tx_from_thd(thd);
if (tx != nullptr) {
- int rc = tx->finish_bulk_load(false);
- if (rc != 0) {
+ bool is_critical_error;
+ int rc = tx->finish_bulk_load(&is_critical_error, false);
+ if (rc != 0 && is_critical_error) {
// NO_LINT_DEBUG
- sql_print_error("RocksDB: Error %d finalizing last SST file while "
- "disconnecting",
- rc);
+ sql_print_error(
+ "RocksDB: Error %d finalizing last SST file while "
+ "disconnecting",
+ rc);
}
delete tx;
@@ -3514,9 +3896,9 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
}
DEBUG_SYNC(thd, "rocksdb.prepared");
- }
- else
+ } else {
tx->make_stmt_savepoint_permanent();
+ }
return HA_EXIT_SUCCESS;
}
@@ -3557,9 +3939,8 @@ static int rocksdb_commit_by_xid(handlerton *const hton, XID *const xid) {
DBUG_RETURN(HA_EXIT_SUCCESS);
}
-static int
-rocksdb_rollback_by_xid(handlerton *const hton MY_ATTRIBUTE((__unused__)),
- XID *const xid) {
+static int rocksdb_rollback_by_xid(
+ handlerton *const hton MY_ATTRIBUTE((__unused__)), XID *const xid) {
DBUG_ENTER_FUNC();
DBUG_ASSERT(hton != nullptr);
@@ -3605,6 +3986,7 @@ static void rdb_xid_from_string(const std::string &src, XID *const dst) {
DBUG_ASSERT(dst->gtrid_length >= 0 && dst->gtrid_length <= MAXGTRIDSIZE);
DBUG_ASSERT(dst->bqual_length >= 0 && dst->bqual_length <= MAXBQUALSIZE);
+ memset(dst->data, 0, XIDDATASIZE);
src.copy(dst->data, (dst->gtrid_length) + (dst->bqual_length),
RDB_XIDHDR_LEN);
}
@@ -3629,13 +4011,16 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
if (is_binlog_advanced(binlog_file, *binlog_pos, file_buf, pos)) {
memcpy(binlog_file, file_buf, FN_REFLEN + 1);
*binlog_pos = pos;
- fprintf(stderr, "RocksDB: Last binlog file position %llu,"
- " file name %s\n",
+ // NO_LINT_DEBUG
+ fprintf(stderr,
+ "RocksDB: Last binlog file position %llu,"
+ " file name %s\n",
pos, file_buf);
if (*gtid_buf) {
global_sid_lock->rdlock();
binlog_max_gtid->parse(global_sid_map, gtid_buf);
global_sid_lock->unlock();
+ // NO_LINT_DEBUG
fprintf(stderr, "RocksDB: Last MySQL Gtid %s\n", gtid_buf);
}
}
@@ -3733,8 +4118,8 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd));
if (tx != nullptr) {
- if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
- OPTION_BEGIN))) {
+ if (commit_tx || (!my_core::thd_test_options(
+ thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
/*
This will not add anything to commit_latency_stats, and this is correct
right?
@@ -3865,7 +4250,7 @@ static std::string format_string(const char *const format, ...) {
char *buff = static_buff;
std::unique_ptr<char[]> dynamic_buff = nullptr;
- len++; // Add one for null terminator
+ len++; // Add one for null terminator
// for longer output use an allocated buffer
if (static_cast<uint>(len) > sizeof(static_buff)) {
@@ -3890,7 +4275,7 @@ static std::string format_string(const char *const format, ...) {
}
class Rdb_snapshot_status : public Rdb_tx_list_walker {
-private:
+ private:
std::string m_data;
static std::string current_timestamp(void) {
@@ -3924,9 +4309,8 @@ private:
"=========================================\n";
}
- static Rdb_deadlock_info::Rdb_dl_trx_info
- get_dl_txn_info(const rocksdb::DeadlockInfo &txn,
- const GL_INDEX_ID &gl_index_id) {
+ static Rdb_deadlock_info::Rdb_dl_trx_info get_dl_txn_info(
+ const rocksdb::DeadlockInfo &txn, const GL_INDEX_ID &gl_index_id) {
Rdb_deadlock_info::Rdb_dl_trx_info txn_data;
txn_data.trx_id = txn.m_txn_id;
@@ -3953,13 +4337,12 @@ private:
return txn_data;
}
- static Rdb_deadlock_info
- get_dl_path_trx_info(const rocksdb::DeadlockPath &path_entry) {
+ static Rdb_deadlock_info get_dl_path_trx_info(
+ const rocksdb::DeadlockPath &path_entry) {
Rdb_deadlock_info deadlock_info;
- for (auto it = path_entry.path.begin(); it != path_entry.path.end();
- it++) {
- auto txn = *it;
+ for (auto it = path_entry.path.begin(); it != path_entry.path.end(); it++) {
+ const auto &txn = *it;
const GL_INDEX_ID gl_index_id = {
txn.m_cf_id, rdb_netbuf_to_uint32(reinterpret_cast<const uchar *>(
txn.m_waiting_key.c_str()))};
@@ -3968,7 +4351,7 @@ private:
DBUG_ASSERT_IFF(path_entry.limit_exceeded, path_entry.path.empty());
/* print the first txn in the path to display the full deadlock cycle */
if (!path_entry.path.empty() && !path_entry.limit_exceeded) {
- auto deadlocking_txn = *(path_entry.path.end() - 1);
+ const auto &deadlocking_txn = *(path_entry.path.end() - 1);
deadlock_info.victim_trx_id = deadlocking_txn.m_txn_id;
deadlock_info.deadlock_time = path_entry.deadlock_time;
}
@@ -3997,7 +4380,7 @@ private:
#endif
m_data += format_string(
"---SNAPSHOT, ACTIVE %lld sec\n"
- "%s\n"
+ "%s\n"
"lock count %llu, write count %llu\n"
"insert count %llu, update count %llu, delete count %llu\n",
(longlong)(curr_time - snapshot_timestamp), buffer, tx->get_lock_count(),
@@ -4010,19 +4393,21 @@ private:
auto dlock_buffer = rdb->GetDeadlockInfoBuffer();
m_data += "----------LATEST DETECTED DEADLOCKS----------\n";
- for (auto path_entry : dlock_buffer) {
+ for (const auto &path_entry : dlock_buffer) {
std::string path_data;
if (path_entry.limit_exceeded) {
path_data += "\n-------DEADLOCK EXCEEDED MAX DEPTH-------\n";
} else {
- path_data += "\n*** DEADLOCK PATH\n"
- "=========================================\n";
+ path_data +=
+ "\n*** DEADLOCK PATH\n"
+ "=========================================\n";
const auto dl_info = get_dl_path_trx_info(path_entry);
const auto deadlock_time = dl_info.deadlock_time;
for (auto it = dl_info.path.begin(); it != dl_info.path.end(); it++) {
- const auto trx_info = *it;
+ const auto &trx_info = *it;
path_data += format_string(
- "TIMESTAMP: %" PRId64 "\n"
+ "TIMESTAMP: %" PRId64
+ "\n"
"TRANSACTION ID: %u\n"
"COLUMN FAMILY NAME: %s\n"
"WAITING KEY: %s\n"
@@ -4037,9 +4422,9 @@ private:
path_data += "---------------WAITING FOR---------------\n";
}
}
- path_data +=
- format_string("\n--------TRANSACTION ID: %u GOT DEADLOCK---------\n",
- dl_info.victim_trx_id);
+ path_data += format_string(
+ "\n--------TRANSACTION ID: %u GOT DEADLOCK---------\n",
+ dl_info.victim_trx_id);
}
m_data += path_data;
}
@@ -4048,7 +4433,7 @@ private:
std::vector<Rdb_deadlock_info> get_deadlock_info() {
std::vector<Rdb_deadlock_info> deadlock_info;
auto dlock_buffer = rdb->GetDeadlockInfoBuffer();
- for (auto path_entry : dlock_buffer) {
+ for (const auto &path_entry : dlock_buffer) {
if (!path_entry.limit_exceeded) {
deadlock_info.push_back(get_dl_path_trx_info(path_entry));
}
@@ -4063,10 +4448,10 @@ private:
* out relevant information for information_schema.rocksdb_trx
*/
class Rdb_trx_info_aggregator : public Rdb_tx_list_walker {
-private:
+ private:
std::vector<Rdb_trx_info> *m_trx_info;
-public:
+ public:
explicit Rdb_trx_info_aggregator(std::vector<Rdb_trx_info> *const trx_info)
: m_trx_info(trx_info) {}
@@ -4197,9 +4582,10 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd,
// sure that output will look unified.
DBUG_ASSERT(commit_latency_stats != nullptr);
- snprintf(buf, sizeof(buf), "rocksdb.commit_latency statistics "
- "Percentiles :=> 50 : %.2f 95 : %.2f "
- "99 : %.2f 100 : %.2f\n",
+ snprintf(buf, sizeof(buf),
+ "rocksdb.commit_latency statistics "
+ "Percentiles :=> 50 : %.2f 95 : %.2f "
+ "99 : %.2f 100 : %.2f\n",
commit_latency_stats->Percentile(50),
commit_latency_stats->Percentile(95),
commit_latency_stats->Percentile(99),
@@ -4221,7 +4607,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd,
}
if (rdb->GetIntProperty("rocksdb.actual-delayed-write-rate", &v)) {
- snprintf(buf, sizeof(buf), "rocksdb.actual_delayed_write_rate "
+ snprintf(buf, sizeof(buf),
"COUNT : %llu\n",
(ulonglong)v);
str.append(buf);
@@ -4309,6 +4695,7 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd,
rocksdb::Status s = rdb->GetEnv()->GetThreadList(&thread_list);
if (!s.ok()) {
+ // NO_LINT_DEBUG
sql_print_error("RocksDB: Returned error (%s) from GetThreadList.\n",
s.ToString().c_str());
res |= true;
@@ -4325,37 +4712,23 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd,
"\noperation_type: " + it.GetOperationName(it.operation_type) +
"\noperation_stage: " +
it.GetOperationStageName(it.operation_stage) +
- "\nelapsed_time_ms: " +
- it.MicrosToString(it.op_elapsed_micros);
+ "\nelapsed_time_ms: " + it.MicrosToString(it.op_elapsed_micros);
- for (auto &it_props :
- it.InterpretOperationProperties(it.operation_type,
- it.op_properties)) {
+ for (auto &it_props : it.InterpretOperationProperties(
+ it.operation_type, it.op_properties)) {
str += "\n" + it_props.first + ": " + std::to_string(it_props.second);
}
str += "\nstate_type: " + it.GetStateName(it.state_type);
- res |= print_stats(thd, "BG_THREADS", std::to_string(it.thread_id),
- str, stat_print);
+ res |= print_stats(thd, "BG_THREADS", std::to_string(it.thread_id), str,
+ stat_print);
}
}
#ifdef MARIAROCKS_NOT_YET
/* Explicit snapshot information */
- str.clear();
- {
- std::lock_guard<std::mutex> lock(explicit_snapshot_mutex);
- for (const auto &elem : explicit_snapshots) {
- const auto &ss = elem.second.lock();
- DBUG_ASSERT(ss != nullptr);
- const auto &info = ss->ss_info;
- str += "\nSnapshot ID: " + std::to_string(info.snapshot_id) +
- "\nBinlog File: " + info.binlog_file +
- "\nBinlog Pos: " + std::to_string(info.binlog_pos) +
- "\nGtid Executed: " + info.gtid_executed + "\n";
- }
- }
+ str = Rdb_explicit_snapshot::dump_snapshots();
#endif
if (!str.empty()) {
@@ -4390,38 +4763,38 @@ static bool rocksdb_explicit_snapshot(
snapshot_info_st *ss_info) /*!< out: Snapshot information */
{
switch (ss_info->op) {
- case snapshot_operation::SNAPSHOT_CREATE: {
- if (mysql_bin_log_is_open()) {
- mysql_bin_log_lock_commits(ss_info);
+ case snapshot_operation::SNAPSHOT_CREATE: {
+ if (mysql_bin_log_is_open()) {
+ mysql_bin_log_lock_commits(ss_info);
+ }
+ auto s = Rdb_explicit_snapshot::create(ss_info, rdb, rdb->GetSnapshot());
+ if (mysql_bin_log_is_open()) {
+ mysql_bin_log_unlock_commits(ss_info);
+ }
+
+ thd->set_explicit_snapshot(s);
+ return s == nullptr;
}
- auto s = Rdb_explicit_snapshot::create(ss_info, rdb, rdb->GetSnapshot());
- if (mysql_bin_log_is_open()) {
- mysql_bin_log_unlock_commits(ss_info);
+ case snapshot_operation::SNAPSHOT_ATTACH: {
+ auto s = Rdb_explicit_snapshot::get(ss_info->snapshot_id);
+ if (!s) {
+ return true;
+ }
+ *ss_info = s->ss_info;
+ thd->set_explicit_snapshot(s);
+ return false;
}
-
- thd->set_explicit_snapshot(s);
- return s == nullptr;
- }
- case snapshot_operation::SNAPSHOT_ATTACH: {
- auto s = Rdb_explicit_snapshot::get(ss_info->snapshot_id);
- if (!s) {
- return true;
+ case snapshot_operation::SNAPSHOT_RELEASE: {
+ if (!thd->get_explicit_snapshot()) {
+ return true;
+ }
+ *ss_info = thd->get_explicit_snapshot()->ss_info;
+ thd->set_explicit_snapshot(nullptr);
+ return false;
}
- *ss_info = s->ss_info;
- thd->set_explicit_snapshot(s);
- return false;
- }
- case snapshot_operation::SNAPSHOT_RELEASE: {
- if (!thd->get_explicit_snapshot()) {
+ default:
+ DBUG_ASSERT(false);
return true;
- }
- *ss_info = thd->get_explicit_snapshot()->ss_info;
- thd->set_explicit_snapshot(nullptr);
- return false;
- }
- default:
- DBUG_ASSERT(false);
- return true;
}
return true;
}
@@ -4567,7 +4940,7 @@ static int rocksdb_start_tx_with_shared_read_view(
// case: an explicit snapshot was not assigned to this transaction
if (!tx->m_explicit_snapshot) {
tx->m_explicit_snapshot =
- Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot);
+ Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot);
if (!tx->m_explicit_snapshot) {
my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0));
error = HA_EXIT_FAILURE;
@@ -4611,9 +4984,8 @@ static int rocksdb_rollback_to_savepoint(handlerton *const hton, THD *const thd,
return tx->rollback_to_savepoint(savepoint);
}
- static bool
- rocksdb_rollback_to_savepoint_can_release_mdl(handlerton *const hton,
- THD *const thd) {
+ static bool rocksdb_rollback_to_savepoint_can_release_mdl(  // unconditionally returns true
+ handlerton *const /* hton */, THD *const /* thd */) {  // both arguments are unused
return true;
}
@@ -4661,7 +5033,7 @@ static void rocksdb_update_table_stats(
/* Function needs to return void because of the interface and we've
* detected an error which shouldn't happen. There's no way to let
* caller know that something failed.
- */
+ */
SHIP_ASSERT(false);
return;
}
@@ -4741,8 +5113,9 @@ static rocksdb::Status check_rocksdb_options_compatibility(
}
if (loaded_cf_descs.size() != cf_descr.size()) {
- return rocksdb::Status::NotSupported("Mismatched size of column family "
- "descriptors.");
+ return rocksdb::Status::NotSupported(
+ "Mismatched size of column family "
+ "descriptors.");
}
// Please see RocksDB documentation for more context about why we need to set
@@ -4792,17 +5165,22 @@ static int rocksdb_init_func(void *const p) {
}
if (rdb_check_rocksdb_corruption()) {
- sql_print_error("RocksDB: There was a corruption detected in RockDB files. "
- "Check error log emitted earlier for more details.");
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: There was a corruption detected in RockDB files. "
+ "Check error log emitted earlier for more details.");
if (rocksdb_allow_to_start_after_corruption) {
+ // NO_LINT_DEBUG
sql_print_information(
"RocksDB: Remove rocksdb_allow_to_start_after_corruption to prevent "
"server operating if RocksDB corruption is detected.");
} else {
- sql_print_error("RocksDB: The server will exit normally and stop restart "
- "attempts. Remove %s file from data directory and "
- "start mysqld manually.",
- rdb_corruption_marker_file_name().c_str());
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: The server will exit normally and stop restart "
+ "attempts. Remove %s file from data directory and "
+ "start mysqld manually.",
+ rdb_corruption_marker_file_name().c_str());
exit(0);
}
}
@@ -4813,8 +5191,10 @@ static int rocksdb_init_func(void *const p) {
init_rocksdb_psi_keys();
rocksdb_hton = (handlerton *)p;
- mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &rdb_open_tables.m_mutex,
- MY_MUTEX_INIT_FAST);
+
+ rdb_open_tables.init();
+ Ensure_cleanup rdb_open_tables_cleanup([]() { rdb_open_tables.free(); });
+
#ifdef HAVE_PSI_INTERFACE
rdb_bg_thread.init(rdb_signal_bg_psi_mutex_key, rdb_signal_bg_psi_cond_key);
rdb_drop_idx_thread.init(rdb_signal_drop_idx_psi_mutex_key,
@@ -4885,6 +5265,8 @@ static int rocksdb_init_func(void *const p) {
/*
Not needed in MariaDB:
rocksdb_hton->flush_logs = rocksdb_flush_wal;
+ rocksdb_hton->handle_single_table_select = rocksdb_handle_single_table_select;
+
*/
rocksdb_hton->flags = HTON_TEMPORARY_NOT_SUPPORTED |
@@ -4894,16 +5276,23 @@ static int rocksdb_init_func(void *const p) {
DBUG_ASSERT(!mysqld_embedded);
if (rocksdb_db_options->max_open_files > (long)open_files_limit) {
- sql_print_information("RocksDB: rocksdb_max_open_files should not be "
- "greater than the open_files_limit, effective value "
- "of rocksdb_max_open_files is being set to "
- "open_files_limit / 2.");
+ // NO_LINT_DEBUG
+ sql_print_information(
+ "RocksDB: rocksdb_max_open_files should not be "
+ "greater than the open_files_limit, effective value "
+ "of rocksdb_max_open_files is being set to "
+ "open_files_limit / 2.");
rocksdb_db_options->max_open_files = open_files_limit / 2;
} else if (rocksdb_db_options->max_open_files == -2) {
rocksdb_db_options->max_open_files = open_files_limit / 2;
}
+ rdb_read_free_regex_handler.set_patterns(DEFAULT_READ_FREE_RPL_TABLES);
+
rocksdb_stats = rocksdb::CreateDBStatistics();
+ rocksdb_stats->set_stats_level(
+ static_cast<rocksdb::StatsLevel>(rocksdb_stats_level));
+ rocksdb_stats_level = rocksdb_stats->get_stats_level();
rocksdb_db_options->statistics = rocksdb_stats;
if (rocksdb_rate_limiter_bytes_per_sec != 0) {
@@ -4937,13 +5326,15 @@ static int rocksdb_init_func(void *const p) {
rocksdb_db_options->use_direct_reads) {
// allow_mmap_reads implies !use_direct_reads and RocksDB will not open if
// mmap_reads and direct_reads are both on. (NO_LINT_DEBUG)
- sql_print_error("RocksDB: Can't enable both use_direct_reads "
- "and allow_mmap_reads\n");
+ sql_print_error(
+ "RocksDB: Can't enable both use_direct_reads "
+ "and allow_mmap_reads\n");
DBUG_RETURN(HA_EXIT_FAILURE);
}
// Check whether the filesystem backing rocksdb_datadir allows O_DIRECT
- if (rocksdb_db_options->use_direct_reads) {
+ if (rocksdb_db_options->use_direct_reads ||
+ rocksdb_db_options->use_direct_io_for_flush_and_compaction) {
rocksdb::EnvOptions soptions;
rocksdb::Status check_status;
rocksdb::Env *const env = rocksdb_db_options->env;
@@ -4964,9 +5355,11 @@ static int rocksdb_init_func(void *const p) {
}
if (!check_status.ok()) {
- sql_print_error("RocksDB: Unable to use direct io in rocksdb-datadir:"
- "(%s)", check_status.getState());
- rdb_open_tables.free_hash();
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Unable to use direct io in rocksdb-datadir:"
+ "(%s)",
+ check_status.getState());
DBUG_RETURN(HA_EXIT_FAILURE);
}
}
@@ -4974,17 +5367,19 @@ static int rocksdb_init_func(void *const p) {
if (rocksdb_db_options->allow_mmap_writes &&
rocksdb_db_options->use_direct_io_for_flush_and_compaction) {
// See above comment for allow_mmap_reads. (NO_LINT_DEBUG)
- sql_print_error("RocksDB: Can't enable both "
- "use_direct_io_for_flush_and_compaction and "
- "allow_mmap_writes\n");
+ sql_print_error(
+ "RocksDB: Can't enable both "
+ "use_direct_io_for_flush_and_compaction and "
+ "allow_mmap_writes\n");
DBUG_RETURN(HA_EXIT_FAILURE);
}
if (rocksdb_db_options->allow_mmap_writes &&
rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER) {
// NO_LINT_DEBUG
- sql_print_error("RocksDB: rocksdb_flush_log_at_trx_commit needs to be 0 "
- "to use allow_mmap_writes");
+ sql_print_error(
+ "RocksDB: rocksdb_flush_log_at_trx_commit needs to be 0 "
+ "to use allow_mmap_writes");
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -5011,15 +5406,19 @@ static int rocksdb_init_func(void *const p) {
#endif
) {
sql_print_information("RocksDB: Got ENOENT when listing column families");
+
+ // NO_LINT_DEBUG
sql_print_information(
"RocksDB: assuming that we're creating a new database");
} else {
rdb_log_status_error(status, "Error listing column families");
DBUG_RETURN(HA_EXIT_FAILURE);
}
- } else
+ } else {
+ // NO_LINT_DEBUG
sql_print_information("RocksDB: %ld column families found",
cf_names.size());
+ }
std::vector<rocksdb::ColumnFamilyDescriptor> cf_descr;
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
@@ -5028,9 +5427,33 @@ static int rocksdb_init_func(void *const p) {
(rocksdb::BlockBasedTableOptions::IndexType)rocksdb_index_type;
if (!rocksdb_tbl_options->no_block_cache) {
- std::shared_ptr<rocksdb::Cache> block_cache = rocksdb_use_clock_cache
- ? rocksdb::NewClockCache(rocksdb_block_cache_size)
- : rocksdb::NewLRUCache(rocksdb_block_cache_size);
+ std::shared_ptr<rocksdb::MemoryAllocator> memory_allocator;
+ if (!rocksdb_cache_dump) {  // request a "nodump" allocator for the block cache
+ size_t block_size = rocksdb_tbl_options->block_size;
+ rocksdb::JemallocAllocatorOptions alloc_opt;
+ // Limit jemalloc tcache memory usage. The range
+ // [block_size/4, block_size] should be enough to cover most of
+ // block cache allocation sizes.
+ alloc_opt.limit_tcache_size = true;
+ alloc_opt.tcache_size_lower_bound = block_size / 4;
+ alloc_opt.tcache_size_upper_bound = block_size;
+ rocksdb::Status new_alloc_status =
+ rocksdb::NewJemallocNodumpAllocator(alloc_opt, &memory_allocator);
+ if (!new_alloc_status.ok()) {
+ // NOTE(review): no fallback to default malloc/free happens; the error is logged and init fails below.
+ rdb_log_status_error(new_alloc_status,
+ "Error excluding block cache from core dump");
+ memory_allocator = nullptr;
+ DBUG_RETURN(HA_EXIT_FAILURE);
+ }
+ }
+ std::shared_ptr<rocksdb::Cache> block_cache =
+ rocksdb_use_clock_cache
+ ? rocksdb::NewClockCache(rocksdb_block_cache_size)  // NOTE(review): memory_allocator is only passed on the LRU path
+ : rocksdb::NewLRUCache(
+ rocksdb_block_cache_size, -1 /*num_shard_bits*/,
+ false /*strict_capacity_limit*/,
+ rocksdb_cache_high_pri_pool_ratio, memory_allocator);
if (rocksdb_sim_cache_size > 0) {
// Simulated cache enabled
// Wrap block cache inside a simulated cache and pass it to RocksDB
@@ -5065,7 +5488,7 @@ static int rocksdb_init_func(void *const p) {
if (rocksdb_persistent_cache_size_mb > 0) {
std::shared_ptr<rocksdb::PersistentCache> pcache;
- uint64_t cache_size_bytes= rocksdb_persistent_cache_size_mb * 1024 * 1024;
+ uint64_t cache_size_bytes = rocksdb_persistent_cache_size_mb * 1024 * 1024;
status = rocksdb::NewPersistentCache(
rocksdb::Env::Default(), std::string(rocksdb_persistent_cache_path),
cache_size_bytes, myrocks_logger, true, &pcache);
@@ -5077,6 +5500,7 @@ static int rocksdb_init_func(void *const p) {
}
rocksdb_tbl_options->persistent_cache = pcache;
} else if (strlen(rocksdb_persistent_cache_path)) {
+ // NO_LINT_DEBUG
sql_print_error("RocksDB: Must specify rocksdb_persistent_cache_size_mb");
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -5094,17 +5518,23 @@ static int rocksdb_init_func(void *const p) {
If there are no column families, we're creating the new database.
Create one column family named "default".
*/
- if (cf_names.size() == 0)
- cf_names.push_back(DEFAULT_CF_NAME);
+ if (cf_names.size() == 0) cf_names.push_back(DEFAULT_CF_NAME);
std::vector<int> compaction_enabled_cf_indices;
+
+ // NO_LINT_DEBUG
sql_print_information("RocksDB: Column Families at start:");
for (size_t i = 0; i < cf_names.size(); ++i) {
rocksdb::ColumnFamilyOptions opts;
cf_options_map->get_cf_options(cf_names[i], &opts);
+ // NO_LINT_DEBUG
sql_print_information(" cf=%s", cf_names[i].c_str());
+
+ // NO_LINT_DEBUG
sql_print_information(" write_buffer_size=%ld", opts.write_buffer_size);
+
+ // NO_LINT_DEBUG
sql_print_information(" target_file_size_base=%" PRIu64,
opts.target_file_size_base);
@@ -5185,25 +5615,27 @@ static int rocksdb_init_func(void *const p) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
- auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME
-#ifdef HAVE_PSI_INTERFACE
- ,
- rdb_background_psi_thread_key
+#ifndef HAVE_PSI_INTERFACE
+ auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME);
+#else
+ auto err = rdb_bg_thread.create_thread(BG_THREAD_NAME,
+ rdb_background_psi_thread_key);
#endif
- );
if (err != 0) {
+ // NO_LINT_DEBUG
sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)",
err);
DBUG_RETURN(HA_EXIT_FAILURE);
}
- err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME
-#ifdef HAVE_PSI_INTERFACE
- ,
- rdb_drop_idx_psi_thread_key
+#ifndef HAVE_PSI_INTERFACE
+ err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME);
+#else
+ err = rdb_drop_idx_thread.create_thread(INDEX_THREAD_NAME,
+ rdb_drop_idx_psi_thread_key);
#endif
- );
if (err != 0) {
+ // NO_LINT_DEBUG
sql_print_error("RocksDB: Couldn't start the drop index thread: (errno=%d)",
err);
DBUG_RETURN(HA_EXIT_FAILURE);
@@ -5220,7 +5652,6 @@ static int rocksdb_init_func(void *const p) {
sql_print_error(
"RocksDB: Couldn't start the manual compaction thread: (errno=%d)",
err);
- rdb_open_tables.free_hash();
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -5254,7 +5685,6 @@ static int rocksdb_init_func(void *const p) {
if (err != 0) {
// NO_LINT_DEBUG
sql_print_error("RocksDB: Couldn't initialize error messages");
- rdb_open_tables.m_hash.~Rdb_table_set();
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -5277,13 +5707,17 @@ static int rocksdb_init_func(void *const p) {
}
#if !defined(_WIN32) && !defined(__APPLE__)
- io_watchdog = new Rdb_io_watchdog(directories);
+ io_watchdog = new Rdb_io_watchdog(std::move(directories));
io_watchdog->reset_timeout(rocksdb_io_write_timeout_secs);
#endif
// NO_LINT_DEBUG
- sql_print_information("MyRocks storage engine plugin has been successfully "
- "initialized.");
+ sql_print_information(
+ "MyRocks storage engine plugin has been successfully "
+ "initialized.");
+
+ // Skip cleaning up rdb_open_tables as we've succeeded
+ rdb_open_tables_cleanup.skip();
DBUG_RETURN(HA_EXIT_SUCCESS);
}
@@ -5340,18 +5774,18 @@ static int rocksdb_done_func(void *const p) {
"RocksDB: Couldn't stop the manual compaction thread: (errno=%d)", err);
}
- if (rdb_open_tables.m_hash.size()) {
+ if (rdb_open_tables.count()) {
// Looks like we are getting unloaded and yet we have some open tables
// left behind.
error = 1;
}
+ rdb_open_tables.free();
/*
destructors for static objects can be called at _exit(),
but we want to free the memory at dlclose()
*/
- rdb_open_tables.m_hash.~Rdb_table_set();
- mysql_mutex_destroy(&rdb_open_tables.m_mutex);
+ // MARIADB_MERGE_2019: rdb_open_tables.m_hash.~Rdb_table_set();
mysql_mutex_destroy(&rdb_sysvars_mutex);
mysql_mutex_destroy(&rdb_block_cache_resize_mutex);
@@ -5436,7 +5870,7 @@ static inline void rocksdb_smart_next(bool seek_backward,
}
}
-#ifndef NDEBUG
+#ifndef DBUG_OFF
// simulate that RocksDB has reported corrupted data
static void dbug_change_status_to_corrupted(rocksdb::Status *status) {
*status = rocksdb::Status::Corruption();
@@ -5471,39 +5905,39 @@ static inline bool is_valid(rocksdb::Iterator *scan_it) {
they are needed to function.
*/
-Rdb_table_handler *
-Rdb_open_tables_map::get_table_handler(const char *const table_name) {
+Rdb_table_handler *Rdb_open_tables_map::get_table_handler(
+ const char *const table_name) {
+ DBUG_ASSERT(table_name != nullptr);
+
Rdb_table_handler *table_handler;
- uint length;
- char *tmp_name;
- DBUG_ASSERT(table_name != nullptr);
- length = (uint)strlen(table_name);
+ std::string table_name_str(table_name);
// First, look up the table in the hash map.
RDB_MUTEX_LOCK_CHECK(m_mutex);
- if (!m_hash.size() || !(table_handler = m_hash.find(table_name, length))) {
+ const auto it = m_table_map.find(table_name_str);
+ if (it != m_table_map.end()) {
+ // Found it
+ table_handler = it->second;
+ } else {
+ char *tmp_name;
+
// Since we did not find it in the hash map, attempt to create and add it
// to the hash map.
if (!(table_handler = reinterpret_cast<Rdb_table_handler *>(my_multi_malloc(
MYF(MY_WME | MY_ZEROFILL), &table_handler, sizeof(*table_handler),
- &tmp_name, length + 1, NullS)))) {
+ &tmp_name, table_name_str.length() + 1, NullS)))) {
// Allocating a new Rdb_table_handler and a new table name failed.
RDB_MUTEX_UNLOCK_CHECK(m_mutex);
return nullptr;
}
table_handler->m_ref_count = 0;
- table_handler->m_table_name_length = length;
+ table_handler->m_table_name_length = table_name_str.length();
table_handler->m_table_name = tmp_name;
strmov(table_handler->m_table_name, table_name);
- if (m_hash.insert(table_handler)) {
- // Inserting into the hash map failed.
- RDB_MUTEX_UNLOCK_CHECK(m_mutex);
- my_free(table_handler);
- return nullptr;
- }
+ m_table_map.emplace(table_name_str, table_handler);
thr_lock_init(&table_handler->m_thr_lock);
#ifdef MARIAROCKS_NOT_YET
@@ -5524,16 +5958,15 @@ std::vector<std::string> rdb_get_open_table_names(void) {
}
std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const {
- size_t i;
const Rdb_table_handler *table_handler;
std::vector<std::string> names;
RDB_MUTEX_LOCK_CHECK(m_mutex);
- for (i = 0; (table_handler = m_hash.at(i)); i++) {
+ for (const auto &kv : m_table_map) {
+ table_handler = kv.second;
DBUG_ASSERT(table_handler != nullptr);
names.push_back(table_handler->m_table_name);
}
- DBUG_ASSERT(i == m_hash.size());
RDB_MUTEX_UNLOCK_CHECK(m_mutex);
return names;
@@ -5546,44 +5979,44 @@ std::vector<std::string> Rdb_open_tables_map::get_table_names(void) const {
static ulonglong rdb_get_int_col_max_value(const Field *field) {
ulonglong max_value = 0;
switch (field->key_type()) {
- case HA_KEYTYPE_BINARY:
- max_value = 0xFFULL;
- break;
- case HA_KEYTYPE_INT8:
- max_value = 0x7FULL;
- break;
- case HA_KEYTYPE_USHORT_INT:
- max_value = 0xFFFFULL;
- break;
- case HA_KEYTYPE_SHORT_INT:
- max_value = 0x7FFFULL;
- break;
- case HA_KEYTYPE_UINT24:
- max_value = 0xFFFFFFULL;
- break;
- case HA_KEYTYPE_INT24:
- max_value = 0x7FFFFFULL;
- break;
- case HA_KEYTYPE_ULONG_INT:
- max_value = 0xFFFFFFFFULL;
- break;
- case HA_KEYTYPE_LONG_INT:
- max_value = 0x7FFFFFFFULL;
- break;
- case HA_KEYTYPE_ULONGLONG:
- max_value = 0xFFFFFFFFFFFFFFFFULL;
- break;
- case HA_KEYTYPE_LONGLONG:
- max_value = 0x7FFFFFFFFFFFFFFFULL;
- break;
- case HA_KEYTYPE_FLOAT:
- max_value = 0x1000000ULL;
- break;
- case HA_KEYTYPE_DOUBLE:
- max_value = 0x20000000000000ULL;
- break;
- default:
- abort();
+ case HA_KEYTYPE_BINARY:
+ max_value = 0xFFULL;
+ break;
+ case HA_KEYTYPE_INT8:
+ max_value = 0x7FULL;
+ break;
+ case HA_KEYTYPE_USHORT_INT:
+ max_value = 0xFFFFULL;
+ break;
+ case HA_KEYTYPE_SHORT_INT:
+ max_value = 0x7FFFULL;
+ break;
+ case HA_KEYTYPE_UINT24:
+ max_value = 0xFFFFFFULL;
+ break;
+ case HA_KEYTYPE_INT24:
+ max_value = 0x7FFFFFULL;
+ break;
+ case HA_KEYTYPE_ULONG_INT:
+ max_value = 0xFFFFFFFFULL;
+ break;
+ case HA_KEYTYPE_LONG_INT:
+ max_value = 0x7FFFFFFFULL;
+ break;
+ case HA_KEYTYPE_ULONGLONG:
+ max_value = 0xFFFFFFFFFFFFFFFFULL;
+ break;
+ case HA_KEYTYPE_LONGLONG:
+ max_value = 0x7FFFFFFFFFFFFFFFULL;
+ break;
+ case HA_KEYTYPE_FLOAT:
+ max_value = 0x1000000ULL;
+ break;
+ case HA_KEYTYPE_DOUBLE:
+ max_value = 0x20000000000000ULL;
+ break;
+ default:
+ abort();
}
return max_value;
@@ -5592,7 +6025,7 @@ static ulonglong rdb_get_int_col_max_value(const Field *field) {
void ha_rocksdb::load_auto_incr_value() {
ulonglong auto_incr = 0;
bool validate_last = false, use_datadic = true;
-#ifndef NDEBUG
+#ifndef DBUG_OFF
DBUG_EXECUTE_IF("myrocks_autoinc_upgrade", use_datadic = false;);
validate_last = true;
#endif
@@ -5639,7 +6072,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() {
// appropriately and non-index-only lookup will not read the value)
const bool save_keyread_only = m_keyread_only;
m_keyread_only = true;
- m_key_requested = true;
+ m_converter->set_is_key_requested(true);
if (!index_last(table->record[0])) {
Field *field =
@@ -5651,7 +6084,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() {
if (last_val != max_val) {
last_val++;
}
-#ifndef NDEBUG
+#ifndef DBUG_OFF
ulonglong dd_val;
if (last_val <= max_val) {
const auto &gl_index_id = m_tbl_def->get_autoincr_gl_index_id();
@@ -5776,8 +6209,9 @@ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) {
// Get hidden primary key from old key slice
Rdb_string_reader reader(&rowkey_slice);
- if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE)))
+ if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE))) {
return HA_ERR_ROCKSDB_CORRUPT_DATA;
+ }
const int length= 8; /* was Field_longlong::PACK_LENGTH in FB MySQL tree */
const uchar *from = reinterpret_cast<const uchar *>(reader.read(length));
@@ -5805,8 +6239,9 @@ void Rdb_open_tables_map::release_table_handler(
DBUG_ASSERT(table_handler->m_ref_count > 0);
if (!--table_handler->m_ref_count) {
// Last reference was released. Tear down the hash entry.
- const auto ret MY_ATTRIBUTE((__unused__)) = m_hash.remove(table_handler);
- DBUG_ASSERT(!ret); // the hash entry must actually be found and deleted
+ const auto ret MY_ATTRIBUTE((__unused__)) =
+ m_table_map.erase(std::string(table_handler->m_table_name));
+ DBUG_ASSERT(ret == 1); // the hash entry must actually be found and deleted
my_core::thr_lock_delete(&table_handler->m_thr_lock);
my_free(table_handler);
}
@@ -5822,19 +6257,34 @@ static handler *rocksdb_create_handler(my_core::handlerton *const hton,
ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
my_core::TABLE_SHARE *const table_arg)
- : handler(hton, table_arg), m_table_handler(nullptr), m_scan_it(nullptr),
- m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr),
- m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr),
- m_tbl_def(nullptr), m_pk_descr(nullptr), m_key_descr_arr(nullptr),
- m_pk_can_be_decoded(false), m_maybe_unpack_info(false),
- m_pk_tuple(nullptr), m_pk_packed_tuple(nullptr),
- m_sk_packed_tuple(nullptr), m_end_key_packed_tuple(nullptr),
- m_sk_match_prefix(nullptr), m_sk_match_prefix_buf(nullptr),
- m_sk_packed_tuple_old(nullptr), m_dup_sk_packed_tuple(nullptr),
- m_dup_sk_packed_tuple_old(nullptr), m_pack_buffer(nullptr),
- m_lock_rows(RDB_LOCK_NONE), m_keyread_only(FALSE), m_encoder_arr(nullptr),
- m_row_checksums_checked(0), m_in_rpl_delete_rows(false),
- m_in_rpl_update_rows(false), m_force_skip_unique_check(false) {}
+ : handler(hton, table_arg),
+ m_table_handler(nullptr),
+ m_scan_it(nullptr),
+ m_scan_it_skips_bloom(false),
+ m_scan_it_snapshot(nullptr),
+ m_scan_it_lower_bound(nullptr),
+ m_scan_it_upper_bound(nullptr),
+ m_tbl_def(nullptr),
+ m_pk_descr(nullptr),
+ m_key_descr_arr(nullptr),
+ m_pk_can_be_decoded(false),
+ m_pk_tuple(nullptr),
+ m_pk_packed_tuple(nullptr),
+ m_sk_packed_tuple(nullptr),
+ m_end_key_packed_tuple(nullptr),
+ m_sk_match_prefix(nullptr),
+ m_sk_match_prefix_buf(nullptr),
+ m_sk_packed_tuple_old(nullptr),
+ m_dup_sk_packed_tuple(nullptr),
+ m_dup_sk_packed_tuple_old(nullptr),
+ m_pack_buffer(nullptr),
+ m_lock_rows(RDB_LOCK_NONE),
+ m_keyread_only(false),
+ m_insert_with_update(false),
+ m_dup_pk_found(false),
+ m_in_rpl_delete_rows(false),
+ m_in_rpl_update_rows(false),
+ m_force_skip_unique_check(false) {}
const std::string &ha_rocksdb::get_table_basename() const {
@@ -5853,9 +6303,9 @@ bool ha_rocksdb::init_with_fields() {
if (pk != MAX_KEY) {
const uint key_parts = table_share->key_info[pk].user_defined_key_parts;
check_keyread_allowed(pk /*PK*/, key_parts - 1, true);
- } else
+ } else {
m_pk_can_be_decoded = false;
-
+ }
cached_table_flags = table_flags();
DBUG_RETURN(false); /* Ok */
@@ -5912,298 +6362,52 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd,
RDB_MAX_HEXDUMP_LEN);
const GL_INDEX_ID gl_index_id = kd.get_gl_index_id();
// NO_LINT_DEBUG
- sql_print_error("Decoding ttl from PK value failed, "
- "for index (%u,%u), val: %s",
- gl_index_id.cf_id, gl_index_id.index_id, buf.c_str());
+ sql_print_error(
+ "Decoding ttl from PK value failed, "
+ "for index (%u,%u), val: %s",
+ gl_index_id.cf_id, gl_index_id.index_id, buf.c_str());
DBUG_ASSERT(0);
return false;
}
/* Hide record if it has expired before the current snapshot time. */
uint64 read_filter_ts = 0;
-#ifndef NDEBUG
+#ifndef DBUG_OFF
read_filter_ts += rdb_dbug_set_ttl_read_filter_ts();
#endif
bool is_hide_ttl =
ts + kd.m_ttl_duration + read_filter_ts <= static_cast<uint64>(curr_ts);
if (is_hide_ttl) {
update_row_stats(ROWS_FILTERED);
+
+ /* increment examined row count when rows are skipped */
+ THD *thd = ha_thd();
+ thd->inc_examined_row_count(1);
+ DEBUG_SYNC(thd, "rocksdb.ttl_rows_examined");
}
return is_hide_ttl;
}
-void ha_rocksdb::rocksdb_skip_expired_records(const Rdb_key_def &kd,
- rocksdb::Iterator *const iter,
- bool seek_backward) {
+int ha_rocksdb::rocksdb_skip_expired_records(const Rdb_key_def &kd,
+ rocksdb::Iterator *const iter,
+ bool seek_backward) {
if (kd.has_ttl()) {
+ THD *thd = ha_thd();
while (iter->Valid() &&
should_hide_ttl_rec(
kd, iter->value(),
get_or_create_tx(table->in_use)->m_snapshot_timestamp)) {
- rocksdb_smart_next(seek_backward, iter);
- }
- }
-}
-
-/**
- Convert record from table->record[0] form into a form that can be written
- into rocksdb.
-
- @param pk_packed_slice Packed PK tuple. We need it in order to compute
- and store its CRC.
- @param packed_rec OUT Data slice with record data.
-*/
-
-int ha_rocksdb::convert_record_to_storage_format(
- const struct update_row_info &row_info, rocksdb::Slice *const packed_rec) {
- DBUG_ASSERT_IMP(m_maybe_unpack_info, row_info.new_pk_unpack_info);
- DBUG_ASSERT(m_pk_descr != nullptr);
-
- const rocksdb::Slice &pk_packed_slice = row_info.new_pk_slice;
- Rdb_string_writer *const pk_unpack_info = row_info.new_pk_unpack_info;
- bool has_ttl = m_pk_descr->has_ttl();
- bool has_ttl_column = !m_pk_descr->m_ttl_column.empty();
- bool ttl_in_pk = has_ttl_column && (row_info.ttl_pk_offset != UINT_MAX);
-
- m_storage_record.length(0);
-
- if (has_ttl) {
- /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
- m_storage_record.fill(ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_in_rec, 0);
- m_ttl_bytes_updated = false;
-
- /*
- If the TTL is contained within the key, we use the offset to find the
- TTL value and place it in the beginning of the value record.
- */
- if (ttl_in_pk) {
- Rdb_string_reader reader(&pk_packed_slice);
- const char *ts;
- if (!reader.read(row_info.ttl_pk_offset) ||
- !(ts = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) {
- std::string buf;
- buf = rdb_hexdump(pk_packed_slice.data(), pk_packed_slice.size(),
- RDB_MAX_HEXDUMP_LEN);
- const GL_INDEX_ID gl_index_id = m_pk_descr->get_gl_index_id();
- // NO_LINT_DEBUG
- sql_print_error("Decoding ttl from PK failed during insert, "
- "for index (%u,%u), key: %s",
- gl_index_id.cf_id, gl_index_id.index_id, buf.c_str());
- return HA_EXIT_FAILURE;
- }
-
- char *const data = const_cast<char *>(m_storage_record.ptr());
- memcpy(data, ts, ROCKSDB_SIZEOF_TTL_RECORD);
-#ifndef NDEBUG
- // Adjust for test case if needed
- rdb_netbuf_store_uint64(
- reinterpret_cast<uchar *>(data),
- rdb_netbuf_to_uint64(reinterpret_cast<const uchar *>(data)) +
- rdb_dbug_set_ttl_rec_ts());
-#endif
- // Also store in m_ttl_bytes to propagate to update_sk
- memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
- } else if (!has_ttl_column) {
- /*
- For implicitly generated TTL records we need to copy over the old
- TTL value from the old record in the event of an update. It was stored
- in m_ttl_bytes.
-
- Otherwise, generate a timestamp using the current time.
- */
- if (!row_info.old_pk_slice.empty()) {
- char *const data = const_cast<char *>(m_storage_record.ptr());
- memcpy(data, m_ttl_bytes, sizeof(uint64));
- } else {
- uint64 ts = static_cast<uint64>(std::time(nullptr));
-#ifndef NDEBUG
- ts += rdb_dbug_set_ttl_rec_ts();
-#endif
- char *const data = const_cast<char *>(m_storage_record.ptr());
- rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
- // Also store in m_ttl_bytes to propagate to update_sk
- memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
- }
- }
- } else {
- /* All NULL bits are initially 0 */
- m_storage_record.fill(m_null_bytes_in_rec, 0);
- }
-
- // If a primary key may have non-empty unpack_info for certain values,
- // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
- // itself was prepared in Rdb_key_def::pack_record.
- if (m_maybe_unpack_info) {
- m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
- pk_unpack_info->get_current_pos());
- }
-
- for (uint i = 0; i < table->s->fields; i++) {
- /* Don't pack decodable PK key parts */
- if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
- continue;
- }
-
- Field *const field = table->field[i];
- if (m_encoder_arr[i].maybe_null()) {
- char *data = const_cast<char *>(m_storage_record.ptr());
- if (has_ttl) {
- data += ROCKSDB_SIZEOF_TTL_RECORD;
- }
-
- if (field->is_null()) {
- data[m_encoder_arr[i].m_null_offset] |= m_encoder_arr[i].m_null_mask;
- /* Don't write anything for NULL values */
- continue;
- }
- }
-
- if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB) {
- my_core::Field_blob *blob = (my_core::Field_blob *)field;
- /* Get the number of bytes needed to store length*/
- const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
-
- /* Store the length of the value */
- m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
- length_bytes);
-
- /* Store the blob value itself */
- char *data_ptr;
- memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
- m_storage_record.append(data_ptr, blob->get_length());
- } else if (m_encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) {
- Field_varstring *const field_var = (Field_varstring *)field;
- uint data_len;
- /* field_var->length_bytes is 1 or 2 */
- if (field_var->length_bytes == 1) {
- data_len = field_var->ptr[0];
- } else {
- DBUG_ASSERT(field_var->length_bytes == 2);
- data_len = uint2korr(field_var->ptr);
- }
- m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
- field_var->length_bytes + data_len);
- } else {
- /* Copy the field data */
- const uint len = field->pack_length_in_rec();
- m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
-
- /*
- Check if this is the TTL field within the table, if so store the TTL
- in the front of the record as well here.
- */
- if (has_ttl && has_ttl_column &&
- i == m_pk_descr->get_ttl_field_offset()) {
- DBUG_ASSERT(len == ROCKSDB_SIZEOF_TTL_RECORD);
- DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
- DBUG_ASSERT(m_pk_descr->get_ttl_field_offset() != UINT_MAX);
-
- char *const data = const_cast<char *>(m_storage_record.ptr());
- uint64 ts = uint8korr(field->ptr);
-#ifndef NDEBUG
- ts += rdb_dbug_set_ttl_rec_ts();
-#endif
- rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
-
- // If this is an update and the timestamp has been updated, take note
- // so we can avoid updating SKs unnecessarily.
- if (!row_info.old_pk_slice.empty()) {
- m_ttl_bytes_updated =
- memcmp(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
- }
- // Store timestamp in m_ttl_bytes to propagate to update_sk
- memcpy(m_ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
+ DEBUG_SYNC(thd, "rocksdb.check_flags_ser");
+ if (thd && thd->killed) {
+ return HA_ERR_QUERY_INTERRUPTED;
}
+ rocksdb_smart_next(seek_backward, iter);
}
}
-
- if (should_store_row_debug_checksums()) {
- const uint32_t key_crc32 = my_core::crc32(
- 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
- const uint32_t val_crc32 =
- my_core::crc32(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
- m_storage_record.length());
- uchar key_crc_buf[RDB_CHECKSUM_SIZE];
- uchar val_crc_buf[RDB_CHECKSUM_SIZE];
- rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
- rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
- m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
- m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
- m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
- }
-
- *packed_rec =
- rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
-
return HA_EXIT_SUCCESS;
}
-/*
- @brief
- Setup which fields will be unpacked when reading rows
-
- @detail
- Three special cases when we still unpack all fields:
- - When this table is being updated (m_lock_rows==RDB_LOCK_WRITE).
- - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
- read all fields to find whether there is a row checksum at the end. We could
- skip the fields instead of decoding them, but currently we do decoding.)
- - On index merge as bitmap is cleared during that operation
-
- @seealso
- ha_rocksdb::setup_field_converters()
- ha_rocksdb::convert_record_from_storage_format()
-*/
-void ha_rocksdb::setup_read_decoders() {
- m_decoders_vect.clear();
- m_key_requested = false;
-
- int last_useful = 0;
- int skip_size = 0;
-
- for (uint i = 0; i < table->s->fields; i++) {
- // bitmap is cleared on index merge, but it still needs to decode columns
- const bool field_requested =
- m_lock_rows == RDB_LOCK_WRITE || m_verify_row_debug_checksums ||
- bitmap_is_clear_all(table->read_set) ||
- bitmap_is_set(table->read_set, table->field[i]->field_index);
-
- // We only need the decoder if the whole record is stored.
- if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
- // the field potentially needs unpacking
- if (field_requested) {
- // the field is in the read set
- m_key_requested = true;
- }
- continue;
- }
-
- if (field_requested) {
- // We will need to decode this field
- m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
- last_useful = m_decoders_vect.size();
- skip_size = 0;
- } else {
- if (m_encoder_arr[i].uses_variable_len_encoding() ||
- m_encoder_arr[i].maybe_null()) {
- // For variable-length field, we need to read the data and skip it
- m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
- skip_size = 0;
- } else {
- // Fixed-width field can be skipped without looking at it.
- // Add appropriate skip_size to the next field.
- skip_size += m_encoder_arr[i].m_pack_length_in_rec;
- }
- }
- }
-
- // It could be that the last few elements are varchars that just do
- // skipping. Remove them.
- m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
- m_decoders_vect.end());
-}
-
-#ifndef NDEBUG
+#ifndef DBUG_OFF
void dbug_append_garbage_at_end(rocksdb::PinnableSlice *on_disk_rec) {
std::string str(on_disk_rec->data(), on_disk_rec->size());
on_disk_rec->Reset();
@@ -6228,17 +6432,6 @@ void dbug_modify_rec_varchar12(rocksdb::PinnableSlice *on_disk_rec) {
on_disk_rec->PinSelf(rocksdb::Slice(res));
}
-void dbug_modify_key_varchar8(String &on_disk_rec) {
- std::string res;
- // The key starts with index number
- res.append(on_disk_rec.ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
-
- // Then, a mem-comparable form of a varchar(8) value.
- res.append("ABCDE\0\0\0\xFC", 9);
- on_disk_rec.length(0);
- on_disk_rec.append(res.data(), res.size());
-}
-
void dbug_create_err_inplace_alter() {
my_printf_error(ER_UNKNOWN_ERROR,
"Intentional failure in inplace alter occurred.", MYF(0));
@@ -6247,7 +6440,6 @@ void dbug_create_err_inplace_alter() {
int ha_rocksdb::convert_record_from_storage_format(
const rocksdb::Slice *const key, uchar *const buf) {
-
DBUG_EXECUTE_IF("myrocks_simulate_bad_row_read1",
dbug_append_garbage_at_end(&m_retrieved_record););
DBUG_EXECUTE_IF("myrocks_simulate_bad_row_read2",
@@ -6258,91 +6450,6 @@ int ha_rocksdb::convert_record_from_storage_format(
return convert_record_from_storage_format(key, &m_retrieved_record, buf);
}
-int ha_rocksdb::convert_blob_from_storage_format(
- my_core::Field_blob *const blob,
- Rdb_string_reader *const reader,
- bool decode)
-{
- /* Get the number of bytes needed to store length*/
- const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
-
- const char *data_len_str;
- if (!(data_len_str = reader->read(length_bytes))) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- memcpy(blob->ptr, data_len_str, length_bytes);
-
- const uint32 data_len = blob->get_length(
- reinterpret_cast<const uchar*>(data_len_str), length_bytes);
- const char *blob_ptr;
- if (!(blob_ptr = reader->read(data_len))) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- if (decode) {
- // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
- // platforms)
- memset(blob->ptr + length_bytes, 0, 8);
- memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
- }
-
- return HA_EXIT_SUCCESS;
-}
-
-int ha_rocksdb::convert_varchar_from_storage_format(
- my_core::Field_varstring *const field_var,
- Rdb_string_reader *const reader,
- bool decode)
-{
- const char *data_len_str;
- if (!(data_len_str = reader->read(field_var->length_bytes)))
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
-
- uint data_len;
- /* field_var->length_bytes is 1 or 2 */
- if (field_var->length_bytes == 1) {
- data_len = (uchar)data_len_str[0];
- } else {
- DBUG_ASSERT(field_var->length_bytes == 2);
- data_len = uint2korr(data_len_str);
- }
-
- if (data_len > field_var->field_length) {
- /* The data on disk is longer than table DDL allows? */
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- if (!reader->read(data_len)) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- if (decode) {
- memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
- }
-
- return HA_EXIT_SUCCESS;
-}
-
-int ha_rocksdb::convert_field_from_storage_format(
- my_core::Field *const field,
- Rdb_string_reader *const reader,
- bool decode,
- uint len)
-{
- const char *data_bytes;
- if (len > 0) {
- if ((data_bytes = reader->read(len)) == nullptr) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- if (decode)
- memcpy(field->ptr, data_bytes, len);
- }
-
- return HA_EXIT_SUCCESS;
-}
-
/*
@brief
Unpack the record in this->m_retrieved_record and this->m_last_rowkey from
@@ -6359,8 +6466,8 @@ int ha_rocksdb::convert_field_from_storage_format(
m_retrieved_record).
@seealso
- ha_rocksdb::setup_read_decoders() Sets up data structures which tell which
- columns to decode.
+ rdb_converter::setup_read_decoders() Sets up data structures which tell
+ which columns to decode.
@return
0 OK
@@ -6370,241 +6477,7 @@ int ha_rocksdb::convert_field_from_storage_format(
int ha_rocksdb::convert_record_from_storage_format(
const rocksdb::Slice *const key, const rocksdb::Slice *const value,
uchar *const buf) {
- Rdb_string_reader reader(value);
-
- /*
- Decode PK fields from the key
- */
- DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
- dbug_modify_key_varchar8(m_last_rowkey););
-
- const rocksdb::Slice rowkey_slice(m_last_rowkey.ptr(),
- m_last_rowkey.length());
- const char *unpack_info = nullptr;
- uint16 unpack_info_len = 0;
- rocksdb::Slice unpack_slice;
-
- /* If it's a TTL record, skip the 8 byte TTL value */
- const char *ttl_bytes;
- if (m_pk_descr->has_ttl()) {
- if ((ttl_bytes = reader.read(ROCKSDB_SIZEOF_TTL_RECORD))) {
- memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
- } else {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
- }
-
- /* Other fields are decoded from the value */
- const char *null_bytes = nullptr;
- if (m_null_bytes_in_rec && !(null_bytes = reader.read(m_null_bytes_in_rec))) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- if (m_maybe_unpack_info) {
- unpack_info = reader.get_current_ptr();
- if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
- !reader.read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- unpack_info_len =
- rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
- unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
-
- reader.read(unpack_info_len -
- Rdb_key_def::get_unpack_header_size(unpack_info[0]));
- }
-
- int err = HA_EXIT_SUCCESS;
- if (m_key_requested) {
- err = m_pk_descr->unpack_record(table, buf, &rowkey_slice,
- unpack_info ? &unpack_slice : nullptr,
- false /* verify_checksum */);
- }
-
- if (err != HA_EXIT_SUCCESS) {
- return err;
- }
-
- for (auto it = m_decoders_vect.begin(); it != m_decoders_vect.end(); it++) {
- const Rdb_field_encoder *const field_dec = it->m_field_enc;
- const bool decode = it->m_decode;
- const bool isNull =
- field_dec->maybe_null() &&
- ((null_bytes[field_dec->m_null_offset] & field_dec->m_null_mask) != 0);
-
- Field *const field = table->field[field_dec->m_field_index];
-
- /* Skip the bytes we need to skip */
- if (it->m_skip && !reader.read(it->m_skip)) {
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- uint field_offset = field->ptr - table->record[0];
- uint null_offset = field->null_offset();
- bool maybe_null = field->real_maybe_null();
- field->move_field(buf + field_offset,
- maybe_null ? buf + null_offset : nullptr,
- field->null_bit);
- // WARNING! - Don't return before restoring field->ptr and field->null_ptr!
-
- if (isNull) {
- if (decode) {
- /* This sets the NULL-bit of this record */
- field->set_null();
- /*
- Besides that, set the field value to default value. CHECKSUM TABLE
- depends on this.
- */
- memcpy(field->ptr, table->s->default_values + field_offset,
- field->pack_length());
- }
- } else {
- if (decode) {
- field->set_notnull();
- }
-
- if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
- err = convert_blob_from_storage_format(
- (my_core::Field_blob *) field, &reader, decode);
- } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
- err = convert_varchar_from_storage_format(
- (my_core::Field_varstring *) field, &reader, decode);
- } else {
- err = convert_field_from_storage_format(
- field, &reader, decode, field_dec->m_pack_length_in_rec);
- }
- }
-
- // Restore field->ptr and field->null_ptr
- field->move_field(table->record[0] + field_offset,
- maybe_null ? table->record[0] + null_offset : nullptr,
- field->null_bit);
-
- if (err != HA_EXIT_SUCCESS) {
- return err;
- }
- }
-
- if (m_verify_row_debug_checksums) {
- if (reader.remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
- reader.read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
- uint32_t stored_key_chksum =
- rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE));
- uint32_t stored_val_chksum =
- rdb_netbuf_to_uint32((const uchar *)reader.read(RDB_CHECKSUM_SIZE));
-
- const uint32_t computed_key_chksum =
- my_core::crc32(0, rdb_slice_to_uchar_ptr(key), key->size());
- const uint32_t computed_val_chksum =
- my_core::crc32(0, rdb_slice_to_uchar_ptr(value),
- value->size() - RDB_CHECKSUM_CHUNK_SIZE);
-
- DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1",
- stored_key_chksum++;);
-
- if (stored_key_chksum != computed_key_chksum) {
- m_pk_descr->report_checksum_mismatch(true, key->data(), key->size());
- return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
- }
-
- DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2",
- stored_val_chksum++;);
- if (stored_val_chksum != computed_val_chksum) {
- m_pk_descr->report_checksum_mismatch(false, value->data(),
- value->size());
- return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
- }
-
- m_row_checksums_checked++;
- }
- if (reader.remaining_bytes())
- return HA_ERR_ROCKSDB_CORRUPT_DATA;
- }
-
- return HA_EXIT_SUCCESS;
-}
-
-void ha_rocksdb::get_storage_type(Rdb_field_encoder *const encoder,
- const uint &kp) {
- // STORE_SOME uses unpack_info.
- if (m_pk_descr->has_unpack_info(kp)) {
- DBUG_ASSERT(m_pk_descr->can_unpack(kp));
- encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
- m_maybe_unpack_info = true;
- } else if (m_pk_descr->can_unpack(kp)) {
- encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
- }
-}
-
-/*
- Setup data needed to convert table->record[] to and from record storage
- format.
-
- @seealso
- ha_rocksdb::convert_record_to_storage_format,
- ha_rocksdb::convert_record_from_storage_format
-*/
-
-void ha_rocksdb::setup_field_converters() {
- uint i;
- uint null_bytes = 0;
- uchar cur_null_mask = 0x1;
-
- DBUG_ASSERT(m_encoder_arr == nullptr);
- m_encoder_arr = static_cast<Rdb_field_encoder *>(
- my_malloc(table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
- if (m_encoder_arr == nullptr) {
- return;
- }
-
- for (i = 0; i < table->s->fields; i++) {
- Field *const field = table->field[i];
- m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
-
- /*
- Check if this field is
- - a part of primary key, and
- - it can be decoded back from its key image.
- If both hold, we don't need to store this field in the value part of
- RocksDB's key-value pair.
-
- If hidden pk exists, we skip this check since the field will never be
- part of the hidden pk.
- */
- if (!has_hidden_pk(table)) {
- KEY *const pk_info = &table->key_info[table->s->primary_key];
- for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
- /* key_part->fieldnr is counted from 1 */
- if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
- get_storage_type(&m_encoder_arr[i], kp);
- break;
- }
- }
- }
-
- m_encoder_arr[i].m_field_type = field->real_type();
- m_encoder_arr[i].m_field_index = i;
- m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec();
-
- if (field->real_maybe_null()) {
- m_encoder_arr[i].m_null_mask = cur_null_mask;
- m_encoder_arr[i].m_null_offset = null_bytes;
- if (cur_null_mask == 0x80) {
- cur_null_mask = 0x1;
- null_bytes++;
- } else
- cur_null_mask = cur_null_mask << 1;
- } else {
- m_encoder_arr[i].m_null_mask = 0;
- }
- }
-
- /* Count the last, unfinished NULL-bits byte */
- if (cur_null_mask != 0x1)
- null_bytes++;
-
- m_null_bytes_in_rec = null_bytes;
+ return m_converter->decode(m_pk_descr, buf, key, value);
}
int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
@@ -6641,8 +6514,8 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
/* Sometimes, we may use m_sk_packed_tuple for storing packed PK */
max_packed_sk_len = pack_key_len;
for (uint i = 0; i < table_arg->s->keys; i++) {
- if (i == table_arg->s->primary_key) /* Primary key was processed above */
- continue;
+ /* Primary key was processed above */
+ if (i == table_arg->s->primary_key) continue;
// TODO: move this into get_table_handler() ??
kd_arr[i]->setup(table_arg, tbl_def_arg);
@@ -6731,25 +6604,10 @@ void ha_rocksdb::free_key_buffers() {
}
#ifdef MARIAROCKS_NOT_YET
-void ha_rocksdb::set_use_read_free_rpl(const char *const whitelist) {
- DBUG_ASSERT(whitelist != nullptr);
-
-#if defined(HAVE_PSI_INTERFACE)
- Regex_list_handler regex_handler(key_rwlock_read_free_rpl_tables);
-#else
- Regex_list_handler regex_handler;
-#endif
-
- if (!regex_handler.set_patterns(whitelist)) {
- warn_about_bad_patterns(&regex_handler, "read_free_rpl_tables");
- }
-
- m_use_read_free_rpl = regex_handler.matches(m_tbl_def->base_tablename());
-}
#endif
-
void ha_rocksdb::set_skip_unique_check_tables(const char *const whitelist) {
- DBUG_ASSERT(whitelist != nullptr);
+ const char *const wl =
+ whitelist ? whitelist : DEFAULT_SKIP_UNIQUE_CHECK_TABLES;
#if defined(HAVE_PSI_INTERFACE)
Regex_list_handler regex_handler(key_rwlock_skip_unique_check_tables);
@@ -6757,7 +6615,7 @@ void ha_rocksdb::set_skip_unique_check_tables(const char *const whitelist) {
Regex_list_handler regex_handler;
#endif
- if (!regex_handler.set_patterns(whitelist)) {
+ if (!regex_handler.set_patterns(wl)) {
warn_about_bad_patterns(&regex_handler, "skip_unique_check_tables");
}
@@ -6804,8 +6662,8 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) {
"dictionary");
DBUG_RETURN(HA_ERR_ROCKSDB_INVALID_TABLE);
}
- m_lock_rows = RDB_LOCK_NONE;
+ m_lock_rows = RDB_LOCK_NONE;
m_key_descr_arr = m_tbl_def->m_key_descr_arr;
/*
@@ -6834,7 +6692,15 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) {
*/
init_with_fields();
- setup_field_converters();
+ /* Initialize decoder */
+ m_converter = std::make_shared<Rdb_converter>(ha_thd(), m_tbl_def, table);
+
+ /*
+ Update m_ttl_bytes address to same as Rdb_converter's m_ttl_bytes.
+ Remove this code after moving convert_record_to_storage_format() into
+ Rdb_converter class.
+ */
+ m_ttl_bytes = m_converter->get_ttl_bytes_buffer();
/*
MariaDB: adjust field->part_of_key for PK columns. We can only do it here
@@ -6877,11 +6743,10 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) {
The following load_XXX code calls row decode functions, and they do
that without having done ::external_lock() or index_init()/rnd_init().
(Note: this also means we're doing a read when there was no
- setup_field_converters() call)
+ rdb_converter::setup_field_encoders() call)
Initialize the necessary variables for them:
*/
- m_verify_row_debug_checksums = false;
/* Load auto_increment value only once on first use. */
if (table->found_next_number_field && m_tbl_def->m_auto_incr_val == 0) {
@@ -6899,10 +6764,7 @@ int ha_rocksdb::open(const char *const name, int mode, uint test_if_locked) {
stats.block_size = rocksdb_tbl_options->block_size;
#ifdef MARIAROCKS_NOT_YET // MDEV-10976
- /* Determine at open whether we can use Read Free Replication or not */
- set_use_read_free_rpl(THDVAR(ha_thd(), read_free_rpl_tables));
#endif
-
/* Determine at open whether we should skip unique checks for this table */
set_skip_unique_check_tables(THDVAR(ha_thd(), skip_unique_check_tables));
@@ -6914,19 +6776,15 @@ int ha_rocksdb::close(void) {
m_pk_descr = nullptr;
m_key_descr_arr = nullptr;
-
+ m_converter = nullptr;
free_key_buffers();
- my_free(m_encoder_arr);
- m_encoder_arr = nullptr;
-
if (m_table_handler != nullptr) {
rdb_open_tables.release_table_handler(m_table_handler);
m_table_handler = nullptr;
}
// These are needed to suppress valgrind errors in rocksdb.partition
- m_storage_record.free();
m_last_rowkey.free();
m_sk_tails.free();
m_sk_tails_old.free();
@@ -7017,61 +6875,72 @@ int ha_rocksdb::rdb_error_to_mysql(const rocksdb::Status &s,
int err;
switch (s.code()) {
- case rocksdb::Status::Code::kOk:
- err = HA_EXIT_SUCCESS;
- break;
- case rocksdb::Status::Code::kNotFound:
- err = HA_ERR_ROCKSDB_STATUS_NOT_FOUND;
- break;
- case rocksdb::Status::Code::kCorruption:
- err = HA_ERR_ROCKSDB_STATUS_CORRUPTION;
- break;
- case rocksdb::Status::Code::kNotSupported:
- err = HA_ERR_ROCKSDB_STATUS_NOT_SUPPORTED;
- break;
- case rocksdb::Status::Code::kInvalidArgument:
- err = HA_ERR_ROCKSDB_STATUS_INVALID_ARGUMENT;
- break;
- case rocksdb::Status::Code::kIOError:
- err = (s.IsNoSpace()) ? HA_ERR_ROCKSDB_STATUS_NO_SPACE
- : HA_ERR_ROCKSDB_STATUS_IO_ERROR;
- break;
- case rocksdb::Status::Code::kMergeInProgress:
- err = HA_ERR_ROCKSDB_STATUS_MERGE_IN_PROGRESS;
- break;
- case rocksdb::Status::Code::kIncomplete:
- err = HA_ERR_ROCKSDB_STATUS_INCOMPLETE;
- break;
- case rocksdb::Status::Code::kShutdownInProgress:
- err = HA_ERR_ROCKSDB_STATUS_SHUTDOWN_IN_PROGRESS;
- break;
- case rocksdb::Status::Code::kTimedOut:
- err = HA_ERR_ROCKSDB_STATUS_TIMED_OUT;
- break;
- case rocksdb::Status::Code::kAborted:
- err = (s.IsLockLimit()) ? HA_ERR_ROCKSDB_STATUS_LOCK_LIMIT
- : HA_ERR_ROCKSDB_STATUS_ABORTED;
- break;
- case rocksdb::Status::Code::kBusy:
- err = (s.IsDeadlock()) ? HA_ERR_ROCKSDB_STATUS_DEADLOCK
- : HA_ERR_ROCKSDB_STATUS_BUSY;
- break;
- case rocksdb::Status::Code::kExpired:
- err = HA_ERR_ROCKSDB_STATUS_EXPIRED;
- break;
- case rocksdb::Status::Code::kTryAgain:
- err = HA_ERR_ROCKSDB_STATUS_TRY_AGAIN;
- break;
- default:
- DBUG_ASSERT(0);
- return -1;
+ case rocksdb::Status::Code::kOk:
+ err = HA_EXIT_SUCCESS;
+ break;
+ case rocksdb::Status::Code::kNotFound:
+ err = HA_ERR_ROCKSDB_STATUS_NOT_FOUND;
+ break;
+ case rocksdb::Status::Code::kCorruption:
+ err = HA_ERR_ROCKSDB_STATUS_CORRUPTION;
+ break;
+ case rocksdb::Status::Code::kNotSupported:
+ err = HA_ERR_ROCKSDB_STATUS_NOT_SUPPORTED;
+ break;
+ case rocksdb::Status::Code::kInvalidArgument:
+ err = HA_ERR_ROCKSDB_STATUS_INVALID_ARGUMENT;
+ break;
+ case rocksdb::Status::Code::kIOError:
+ err = (s.IsNoSpace()) ? HA_ERR_ROCKSDB_STATUS_NO_SPACE
+ : HA_ERR_ROCKSDB_STATUS_IO_ERROR;
+ break;
+ case rocksdb::Status::Code::kMergeInProgress:
+ err = HA_ERR_ROCKSDB_STATUS_MERGE_IN_PROGRESS;
+ break;
+ case rocksdb::Status::Code::kIncomplete:
+ err = HA_ERR_ROCKSDB_STATUS_INCOMPLETE;
+ break;
+ case rocksdb::Status::Code::kShutdownInProgress:
+ err = HA_ERR_ROCKSDB_STATUS_SHUTDOWN_IN_PROGRESS;
+ break;
+ case rocksdb::Status::Code::kTimedOut:
+ err = HA_ERR_ROCKSDB_STATUS_TIMED_OUT;
+ break;
+ case rocksdb::Status::Code::kAborted:
+ err = (s.IsLockLimit()) ? HA_ERR_ROCKSDB_STATUS_LOCK_LIMIT
+ : HA_ERR_ROCKSDB_STATUS_ABORTED;
+ break;
+ case rocksdb::Status::Code::kBusy:
+ err = (s.IsDeadlock()) ? HA_ERR_ROCKSDB_STATUS_DEADLOCK
+ : HA_ERR_ROCKSDB_STATUS_BUSY;
+ break;
+ case rocksdb::Status::Code::kExpired:
+ err = HA_ERR_ROCKSDB_STATUS_EXPIRED;
+ break;
+ case rocksdb::Status::Code::kTryAgain:
+ err = HA_ERR_ROCKSDB_STATUS_TRY_AGAIN;
+ break;
+ default:
+ DBUG_ASSERT(0);
+ return -1;
+ }
+
+ std::string errMsg;
+ if (s.IsLockLimit()) {
+ errMsg =
+ "Operation aborted: Failed to acquire lock due to "
+ "rocksdb_max_row_locks limit";
+ } else {
+ errMsg = s.ToString();
}
if (opt_msg) {
- std::string concatenated_error = s.ToString() + " (" + std::string(opt_msg) + ")";
- my_error(ER_GET_ERRMSG, MYF(0), s.code(), concatenated_error.c_str(), rocksdb_hton_name);
+ std::string concatenated_error = errMsg + " (" + std::string(opt_msg) + ")";
+ my_error(ER_GET_ERRMSG, MYF(0), s.code(), concatenated_error.c_str(),
+ rocksdb_hton_name);
} else {
- my_error(ER_GET_ERRMSG, MYF(0), s.code(), s.ToString().c_str(), rocksdb_hton_name);
+ my_error(ER_GET_ERRMSG, MYF(0), s.code(), errMsg.c_str(),
+ rocksdb_hton_name);
}
return err;
@@ -7081,8 +6950,8 @@ int ha_rocksdb::rdb_error_to_mysql(const rocksdb::Status &s,
static const std::set<uint> RDB_INDEX_COLLATIONS = {
COLLATION_BINARY, COLLATION_UTF8_BIN, COLLATION_LATIN1_BIN};
-static bool
-rdb_is_index_collation_supported(const my_core::Field *const field) {
+static bool rdb_is_index_collation_supported(
+ const my_core::Field *const field) {
const my_core::enum_field_types type = field->real_type();
/* Handle [VAR](CHAR|BINARY) or TEXT|BLOB */
if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING ||
@@ -7142,8 +7011,6 @@ int ha_rocksdb::create_key_defs(
DBUG_ASSERT(table_arg->s != nullptr);
- uint i;
-
/*
These need to be one greater than MAX_INDEXES since the user can create
MAX_INDEXES secondary keys and no primary key which would cause us
@@ -7160,6 +7027,36 @@ int ha_rocksdb::create_key_defs(
DBUG_RETURN(HA_EXIT_FAILURE);
}
+ uint64 ttl_duration = 0;
+ std::string ttl_column;
+ uint ttl_field_offset;
+
+ uint err;
+ if ((err = Rdb_key_def::extract_ttl_duration(table_arg, tbl_def_arg,
+ &ttl_duration))) {
+ DBUG_RETURN(err);
+ }
+
+ if ((err = Rdb_key_def::extract_ttl_col(table_arg, tbl_def_arg, &ttl_column,
+ &ttl_field_offset))) {
+ DBUG_RETURN(err);
+ }
+
+ /* We don't currently support TTL on tables with hidden primary keys. */
+ if (ttl_duration > 0 && has_hidden_pk(table_arg)) {
+ my_error(ER_RDB_TTL_UNSUPPORTED, MYF(0));
+ DBUG_RETURN(HA_EXIT_FAILURE);
+ }
+
+ /*
+ If TTL duration is not specified but TTL column was specified, throw an
+ error because TTL column requires duration.
+ */
+ if (ttl_duration == 0 && !ttl_column.empty()) {
+ my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_column.c_str());
+ DBUG_RETURN(HA_EXIT_FAILURE);
+ }
+
if (!old_tbl_def_arg) {
/*
old_tbl_def doesn't exist. this means we are in the process of creating
@@ -7168,9 +7065,9 @@ int ha_rocksdb::create_key_defs(
Get the index numbers (this will update the next_index_number)
and create Rdb_key_def structures.
*/
- for (i = 0; i < tbl_def_arg->m_key_count; i++) {
- if (create_key_def(table_arg, i, tbl_def_arg, &m_key_descr_arr[i],
- cfs[i])) {
+ for (uint i = 0; i < tbl_def_arg->m_key_count; i++) {
+ if (create_key_def(table_arg, i, tbl_def_arg, &m_key_descr_arr[i], cfs[i],
+ ttl_duration, ttl_column)) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
}
@@ -7181,7 +7078,8 @@ int ha_rocksdb::create_key_defs(
generate the necessary new key definitions if any.
*/
if (create_inplace_key_defs(table_arg, tbl_def_arg, old_table_arg,
- old_tbl_def_arg, cfs)) {
+ old_tbl_def_arg, cfs, ttl_duration,
+ ttl_column)) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
}
@@ -7267,8 +7165,8 @@ int ha_rocksdb::create_cfs(
// Generate the name for the column family to use.
bool per_part_match_found = false;
- std::string cf_name = generate_cf_name(i, table_arg, tbl_def_arg,
- &per_part_match_found);
+ std::string cf_name =
+ generate_cf_name(i, table_arg, tbl_def_arg, &per_part_match_found);
// Prevent create from using the system column family.
if (cf_name == DEFAULT_SYSTEM_CF_NAME) {
@@ -7313,7 +7211,8 @@ int ha_rocksdb::create_cfs(
int ha_rocksdb::create_inplace_key_defs(
const TABLE *const table_arg, Rdb_tbl_def *const tbl_def_arg,
const TABLE *const old_table_arg, const Rdb_tbl_def *const old_tbl_def_arg,
- const std::array<key_def_cf_info, MAX_INDEXES + 1> &cfs) const {
+ const std::array<key_def_cf_info, MAX_INDEXES + 1> &cfs,
+ uint64 ttl_duration, const std::string &ttl_column) const {
DBUG_ENTER_FUNC();
std::shared_ptr<Rdb_key_def> *const old_key_descr =
@@ -7339,10 +7238,11 @@ int ha_rocksdb::create_inplace_key_defs(
struct Rdb_index_info index_info;
if (!dict_manager.get_index_info(gl_index_id, &index_info)) {
// NO_LINT_DEBUG
- sql_print_error("RocksDB: Could not get index information "
- "for Index Number (%u,%u), table %s",
- gl_index_id.cf_id, gl_index_id.index_id,
- old_tbl_def_arg->full_tablename().c_str());
+ sql_print_error(
+ "RocksDB: Could not get index information "
+ "for Index Number (%u,%u), table %s",
+ gl_index_id.cf_id, gl_index_id.index_id,
+ old_tbl_def_arg->full_tablename().c_str());
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -7366,7 +7266,7 @@ int ha_rocksdb::create_inplace_key_defs(
dict_manager.get_stats(gl_index_id), index_info.m_index_flags,
ttl_rec_offset, index_info.m_ttl_duration);
} else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i],
- cfs[i])) {
+ cfs[i], ttl_duration, ttl_column)) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
@@ -7516,44 +7416,16 @@ int ha_rocksdb::compare_key_parts(const KEY *const old_key,
0 - Ok
other - error, either given table ddl is not supported by rocksdb or OOM.
*/
-int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i,
+int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint i,
const Rdb_tbl_def *const tbl_def_arg,
std::shared_ptr<Rdb_key_def> *const new_key_def,
- const struct key_def_cf_info &cf_info) const {
+ const struct key_def_cf_info &cf_info,
+ uint64 ttl_duration,
+ const std::string &ttl_column) const {
DBUG_ENTER_FUNC();
DBUG_ASSERT(*new_key_def == nullptr);
- uint64 ttl_duration = 0;
- std::string ttl_column;
- uint ttl_field_offset;
-
- uint err;
- if ((err = Rdb_key_def::extract_ttl_duration(table_arg, tbl_def_arg,
- &ttl_duration))) {
- DBUG_RETURN(err);
- }
-
- if ((err = Rdb_key_def::extract_ttl_col(table_arg, tbl_def_arg, &ttl_column,
- &ttl_field_offset))) {
- DBUG_RETURN(err);
- }
-
- /* We don't currently support TTL on tables with hidden primary keys. */
- if (ttl_duration > 0 && is_hidden_pk(i, table_arg, tbl_def_arg)) {
- my_error(ER_RDB_TTL_UNSUPPORTED, MYF(0));
- DBUG_RETURN(HA_EXIT_FAILURE);
- }
-
- /*
- If TTL duration is not specified but TTL column was specified, throw an
- error because TTL column requires duration.
- */
- if (ttl_duration == 0 && !ttl_column.empty()) {
- my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_column.c_str());
- DBUG_RETURN(HA_EXIT_FAILURE);
- }
-
const uint index_id = ddl_manager.get_and_update_next_number(&dict_manager);
const uint16_t index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
uchar index_type;
@@ -7602,7 +7474,8 @@ int ha_rocksdb::create_key_def(const TABLE *const table_arg, const uint &i,
if (!ttl_column.empty()) {
(*new_key_def)->m_ttl_column = ttl_column;
}
-
+ // initialize key_def
+ (*new_key_def)->setup(table_arg, tbl_def_arg);
DBUG_RETURN(HA_EXIT_SUCCESS);
}
@@ -7610,7 +7483,7 @@ int rdb_normalize_tablename(const std::string &tablename,
std::string *const strbuf) {
if (tablename.size() < 2 || tablename[0] != '.' ||
(tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) {
- DBUG_ASSERT(0); // We were not passed table name?
+ DBUG_ASSERT(0); // We were not passed table name?
return HA_ERR_ROCKSDB_INVALID_TABLE;
}
@@ -7620,7 +7493,7 @@ int rdb_normalize_tablename(const std::string &tablename,
}
if (pos == std::string::npos) {
- DBUG_ASSERT(0); // We were not passed table name?
+ DBUG_ASSERT(0); // We were not passed table name?
return HA_ERR_ROCKSDB_INVALID_TABLE;
}
@@ -7734,95 +7607,34 @@ int rdb_split_normalized_tablename(const std::string &fullname,
return HA_EXIT_SUCCESS;
}
-/**
- @brief
- create() is called to create a database. The variable name will have the name
- of the table.
-
- @details
- When create() is called you do not need to worry about
- opening the table. Also, the .frm file will have already been
- created so adjusting create_info is not necessary. You can overwrite
- the .frm file at this point if you wish to change the table
- definition, but there are no methods currently provided for doing
- so.
+/*
+ Create a table's Rdb_tbl_def and its Rdb_key_defs and store table information
+ into MyRocks Data Dictionary
+ The method is called during create table/partition, truncate table/partition
- Called from handle.cc by ha_create_table().
+ @param table_name IN table's name formatted as
+ 'dbname.tablename'
+ @param table_arg IN sql table
+ @param auto_increment_value IN specified table's auto increment value
@return
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
-
- @see
- ha_create_table() in handle.cc
*/
-
-int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
- HA_CREATE_INFO *const create_info) {
+int ha_rocksdb::create_table(const std::string &table_name,
+ const TABLE *table_arg,
+ ulonglong auto_increment_value) {
DBUG_ENTER_FUNC();
- DBUG_ASSERT(table_arg != nullptr);
- DBUG_ASSERT(create_info != nullptr);
- if (create_info->data_file_name) {
- // DATA DIRECTORY is used to create tables under a specific location
- // outside the MySQL data directory. We don't support this for MyRocks.
- // The `rocksdb_datadir` setting should be used to configure RocksDB data
- // directory.
- print_error(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED,
- MYF(ME_JUST_WARNING));
- DBUG_RETURN(HA_WRONG_CREATE_OPTION);
- }
- if (create_info->index_file_name) {
- // Similar check for INDEX DIRECTORY as well.
- print_error(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED,
- MYF(ME_JUST_WARNING));
- DBUG_RETURN(HA_WRONG_CREATE_OPTION);
- }
-
- std::string str;
int err;
- THD *const thd = my_core::thd_get_current_thd();
-
- if (get_table_if_exists(name)) {
- if (thd->lex->sql_command == SQLCOM_TRUNCATE) {
- err = delete_table(name);
- if (err != HA_EXIT_SUCCESS) {
- DBUG_RETURN(err);
- }
- } else {
- err = rdb_normalize_tablename(name, &str);
- if (err != HA_EXIT_SUCCESS) {
- DBUG_RETURN(err);
- }
- my_error(ER_METADATA_INCONSISTENCY, MYF(0), str.c_str(), name);
- DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA);
- }
- }
-
- /*
- Construct dbname.tablename ourselves, because parititioning
- passes strings like "./test/t14#P#p0" for individual partitions,
- while table_arg->s->table_name has none of that.
- */
- err = rdb_normalize_tablename(name, &str);
- if (err != HA_EXIT_SUCCESS) {
- DBUG_RETURN(err);
- }
-
- if (contains_foreign_key(thd)) {
- my_error(ER_NOT_SUPPORTED_YET, MYF(0),
- "FOREIGN KEY for the RocksDB storage engine");
- DBUG_RETURN(HA_ERR_UNSUPPORTED);
- }
-
const std::unique_ptr<rocksdb::WriteBatch> wb = dict_manager.begin();
rocksdb::WriteBatch *const batch = wb.get();
/* Create table/key descriptions and put them into the data dictionary */
- m_tbl_def = new Rdb_tbl_def(str);
+ m_tbl_def = new Rdb_tbl_def(table_name);
uint n_keys = table_arg->s->keys;
@@ -7832,6 +7644,9 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
*/
if (has_hidden_pk(table_arg)) {
n_keys += 1;
+ // reset hidden pk id
+ // the starting valid value for hidden pk is 1
+ m_tbl_def->m_hidden_pk_val = 1;
}
m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[n_keys];
@@ -7845,9 +7660,9 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
m_pk_descr = m_key_descr_arr[pk_index(table_arg, m_tbl_def)];
- if (create_info->auto_increment_value) {
+ if (auto_increment_value) {
bool autoinc_upgrade_test = false;
- m_tbl_def->m_auto_incr_val = create_info->auto_increment_value;
+ m_tbl_def->m_auto_incr_val = auto_increment_value;
DBUG_EXECUTE_IF("myrocks_autoinc_upgrade", autoinc_upgrade_test = true;);
if (!autoinc_upgrade_test) {
auto s = dict_manager.put_auto_incr_val(
@@ -7886,6 +7701,86 @@ error:
}
/**
+ @brief
+ create() is called to create a table. The variable name will have the name
+ of the table.
+
+ @details
+ When create() is called you do not need to worry about
+ opening the table. Also, the .frm file will have already been
+ created so adjusting create_info is not necessary. You can overwrite
+ the .frm file at this point if you wish to change the table
+ definition, but there are no methods currently provided for doing
+ so.
+
+ Called from handle.cc by ha_create_table().
+
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+
+ @see
+ ha_create_table() in handle.cc
+*/
+
+int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
+ HA_CREATE_INFO *const create_info) {
+ DBUG_ENTER_FUNC();
+
+ DBUG_ASSERT(table_arg != nullptr);
+ DBUG_ASSERT(create_info != nullptr);
+
+ if (create_info->data_file_name) {
+ // DATA DIRECTORY is used to create tables under a specific location
+ // outside the MySQL data directory. We don't support this for MyRocks.
+ // The `rocksdb_datadir` setting should be used to configure RocksDB data
+ // directory.
+ DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_DATA_DIRECTORY_NOT_SUPPORTED);
+ }
+
+ if (create_info->index_file_name) {
+ // Similar check for INDEX DIRECTORY as well.
+ DBUG_RETURN(HA_ERR_ROCKSDB_TABLE_INDEX_DIRECTORY_NOT_SUPPORTED);
+ }
+
+ int err;
+ /*
+ Construct dbname.tablename ourselves, because partitioning
+ passes strings like "./test/t14#P#p0" for individual partitions,
+ while table_arg->s->table_name has none of that.
+ */
+ std::string str;
+ err = rdb_normalize_tablename(name, &str);
+ if (err != HA_EXIT_SUCCESS) {
+ DBUG_RETURN(err);
+ }
+
+ // FOREIGN KEY isn't supported yet
+ THD *const thd = my_core::thd_get_current_thd();
+ if (contains_foreign_key(thd)) {
+ my_error(ER_NOT_SUPPORTED_YET, MYF(0),
+ "FOREIGN KEY for the RocksDB storage engine");
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+ }
+
+ // Check whether the Data Dictionary already contains information for this table
+ Rdb_tbl_def *tbl = ddl_manager.find(str);
+ if (tbl != nullptr) {
+ if (thd->lex->sql_command == SQLCOM_TRUNCATE) {
+ err = delete_table(tbl);
+ if (err != HA_EXIT_SUCCESS) {
+ DBUG_RETURN(err);
+ }
+ } else {
+ my_error(ER_METADATA_INCONSISTENCY, MYF(0), str.c_str(), name);
+ DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA);
+ }
+ }
+
+ DBUG_RETURN(create_table(str, table_arg, create_info->auto_increment_value));
+}
+
+/**
@note
This function is used only when the table has not yet been opened, and
keyread_allowed bitmap doesn't have the correct values yet.
@@ -7928,9 +7823,10 @@ bool ha_rocksdb::check_keyread_allowed(uint inx, uint part,
int ha_rocksdb::read_key_exact(const Rdb_key_def &kd,
rocksdb::Iterator *const iter,
- const bool &full_key_match,
+ const bool /* unused */,
const rocksdb::Slice &key_slice,
const int64_t ttl_filter_ts) {
+ THD *thd = ha_thd();
/*
We are looking for the first record such that
index_tuple= lookup_tuple.
@@ -7939,6 +7835,9 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd,
rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice);
while (iter->Valid() && kd.value_matches_prefix(iter->key(), key_slice)) {
+ if (thd && thd->killed) {
+ return HA_ERR_QUERY_INTERRUPTED;
+ }
/*
If TTL is enabled we need to check if the given key has already expired
from the POV of the current transaction. If it has, try going to the next
@@ -7960,9 +7859,10 @@ int ha_rocksdb::read_key_exact(const Rdb_key_def &kd,
}
int ha_rocksdb::read_before_key(const Rdb_key_def &kd,
- const bool &full_key_match,
+ const bool full_key_match,
const rocksdb::Slice &key_slice,
const int64_t ttl_filter_ts) {
+ THD *thd = ha_thd();
/*
We are looking for record with the biggest t.key such that
t.key < lookup_tuple.
@@ -7970,6 +7870,9 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd,
rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice);
while (is_valid(m_scan_it)) {
+ if (thd && thd->killed) {
+ return HA_ERR_QUERY_INTERRUPTED;
+ }
/*
We are using full key and we've hit an exact match, or...
@@ -7994,6 +7897,7 @@ int ha_rocksdb::read_before_key(const Rdb_key_def &kd,
int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
const rocksdb::Slice &key_slice,
const int64_t ttl_filter_ts) {
+ THD *thd = ha_thd();
/*
We are looking for the first record such that
@@ -8011,6 +7915,9 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
*/
while (is_valid(m_scan_it) && kd.has_ttl() &&
should_hide_ttl_rec(kd, m_scan_it->value(), ttl_filter_ts)) {
+ if (thd && thd->killed) {
+ return HA_ERR_QUERY_INTERRUPTED;
+ }
rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it);
}
@@ -8019,7 +7926,7 @@ int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
int ha_rocksdb::position_to_correct_key(
const Rdb_key_def &kd, const enum ha_rkey_function &find_flag,
- const bool &full_key_match, const uchar *const key,
+ const bool full_key_match, const uchar *const key,
const key_part_map &keypart_map, const rocksdb::Slice &key_slice,
bool *const move_forward, const int64_t ttl_filter_ts) {
int rc = 0;
@@ -8027,65 +7934,66 @@ int ha_rocksdb::position_to_correct_key(
*move_forward = true;
switch (find_flag) {
- case HA_READ_KEY_EXACT:
- rc =
- read_key_exact(kd, m_scan_it, full_key_match, key_slice, ttl_filter_ts);
- break;
- case HA_READ_BEFORE_KEY:
- *move_forward = false;
- rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts);
- if (rc == 0 && !kd.covers_key(m_scan_it->key())) {
- /* The record we've got is not from this index */
- rc = HA_ERR_KEY_NOT_FOUND;
- }
- break;
- case HA_READ_AFTER_KEY:
- case HA_READ_KEY_OR_NEXT:
- rc = read_after_key(kd, key_slice, ttl_filter_ts);
- if (rc == 0 && !kd.covers_key(m_scan_it->key())) {
- /* The record we've got is not from this index */
- rc = HA_ERR_KEY_NOT_FOUND;
- }
- break;
- case HA_READ_KEY_OR_PREV:
- case HA_READ_PREFIX:
- /* This flag is not used by the SQL layer, so we don't support it yet. */
- rc = HA_ERR_UNSUPPORTED;
- break;
- case HA_READ_PREFIX_LAST:
- case HA_READ_PREFIX_LAST_OR_PREV:
- *move_forward = false;
- /*
- Find the last record with the specified index prefix lookup.
- - HA_READ_PREFIX_LAST requires that the record has the
- prefix=lookup (if there are no such records,
- HA_ERR_KEY_NOT_FOUND should be returned).
- - HA_READ_PREFIX_LAST_OR_PREV has no such requirement. If there are no
- records with prefix=lookup, we should return the last record
- before that.
- */
- rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts);
- if (rc == 0) {
- const rocksdb::Slice &rkey = m_scan_it->key();
- if (!kd.covers_key(rkey)) {
+ case HA_READ_KEY_EXACT:
+ rc = read_key_exact(kd, m_scan_it, full_key_match, key_slice,
+ ttl_filter_ts);
+ break;
+ case HA_READ_BEFORE_KEY:
+ *move_forward = false;
+ rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts);
+ if (rc == 0 && !kd.covers_key(m_scan_it->key())) {
/* The record we've got is not from this index */
rc = HA_ERR_KEY_NOT_FOUND;
- } else if (find_flag == HA_READ_PREFIX_LAST) {
- uint size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
- key, keypart_map);
- rocksdb::Slice lookup_tuple(reinterpret_cast<char *>(m_sk_packed_tuple),
- size);
-
- // We need to compare the key we've got with the original search prefix.
- if (!kd.value_matches_prefix(rkey, lookup_tuple)) {
+ }
+ break;
+ case HA_READ_AFTER_KEY:
+ case HA_READ_KEY_OR_NEXT:
+ rc = read_after_key(kd, key_slice, ttl_filter_ts);
+ if (rc == 0 && !kd.covers_key(m_scan_it->key())) {
+ /* The record we've got is not from this index */
+ rc = HA_ERR_KEY_NOT_FOUND;
+ }
+ break;
+ case HA_READ_KEY_OR_PREV:
+ case HA_READ_PREFIX:
+ /* This flag is not used by the SQL layer, so we don't support it yet. */
+ rc = HA_ERR_UNSUPPORTED;
+ break;
+ case HA_READ_PREFIX_LAST:
+ case HA_READ_PREFIX_LAST_OR_PREV:
+ *move_forward = false;
+ /*
+ Find the last record with the specified index prefix lookup.
+ - HA_READ_PREFIX_LAST requires that the record has the
+ prefix=lookup (if there are no such records,
+ HA_ERR_KEY_NOT_FOUND should be returned).
+ - HA_READ_PREFIX_LAST_OR_PREV has no such requirement. If there are no
+ records with prefix=lookup, we should return the last record
+ before that.
+ */
+ rc = read_before_key(kd, full_key_match, key_slice, ttl_filter_ts);
+ if (rc == 0) {
+ const rocksdb::Slice &rkey = m_scan_it->key();
+ if (!kd.covers_key(rkey)) {
+ /* The record we've got is not from this index */
rc = HA_ERR_KEY_NOT_FOUND;
+ } else if (find_flag == HA_READ_PREFIX_LAST) {
+ uint size = kd.pack_index_tuple(table, m_pack_buffer,
+ m_sk_packed_tuple, key, keypart_map);
+ rocksdb::Slice lookup_tuple(
+ reinterpret_cast<char *>(m_sk_packed_tuple), size);
+
+ // We need to compare the key we've got with the original search
+ // prefix.
+ if (!kd.value_matches_prefix(rkey, lookup_tuple)) {
+ rc = HA_ERR_KEY_NOT_FOUND;
+ }
}
}
- }
- break;
- default:
- DBUG_ASSERT(0);
- break;
+ break;
+ default:
+ DBUG_ASSERT(0);
+ break;
}
return rc;
@@ -8094,11 +8002,10 @@ int ha_rocksdb::position_to_correct_key(
int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd,
const enum ha_rkey_function &find_flag,
const rocksdb::Slice &slice,
- const int &bytes_changed_by_succ,
+ const int bytes_changed_by_succ,
const key_range *const end_key,
uint *const end_key_packed_size) {
- if (find_flag == HA_READ_KEY_EXACT)
- return slice.size();
+ if (find_flag == HA_READ_KEY_EXACT) return slice.size();
if (find_flag == HA_READ_PREFIX_LAST) {
/*
@@ -8168,8 +8075,18 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf,
const rocksdb::Slice &rkey = m_scan_it->key();
const rocksdb::Slice &value = m_scan_it->value();
- bool covered_lookup =
- m_keyread_only || kd.covers_lookup(table, &value, &m_lookup_bitmap);
+#ifndef DBUG_OFF
+ bool save_keyread_only = m_keyread_only;
+#endif
+ DBUG_EXECUTE_IF("dbug.rocksdb.HA_EXTRA_KEYREAD", { m_keyread_only = true; });
+
+ bool covered_lookup = (m_keyread_only && kd.can_cover_lookup()) ||
+ kd.covers_lookup(&value, &m_lookup_bitmap);
+
+#ifndef DBUG_OFF
+ m_keyread_only = save_keyread_only;
+#endif
+
if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) {
pk_size =
kd.get_primary_key_tuple(table, *m_pk_descr, &rkey, m_pk_packed_tuple);
@@ -8177,12 +8094,11 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf,
rc = HA_ERR_ROCKSDB_CORRUPT_DATA;
} else {
rc = kd.unpack_record(table, buf, &rkey, &value,
- m_verify_row_debug_checksums);
+ m_converter->get_verify_row_debug_checksums());
global_stats.covered_secondary_key_lookups.inc();
}
} else {
- if (kd.m_is_reverse_cf)
- move_forward = !move_forward;
+ if (kd.m_is_reverse_cf) move_forward = !move_forward;
rc = find_icp_matching_index_rec(move_forward, buf);
if (!rc) {
@@ -8229,11 +8145,12 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf,
ulong ha_rocksdb::index_flags(uint inx, uint part, bool all_parts) const {
DBUG_ENTER_FUNC();
- ulong base_flags = HA_READ_NEXT | // doesn't seem to be used
+ ulong base_flags = HA_READ_NEXT | // doesn't seem to be used
HA_READ_ORDER | HA_READ_RANGE | HA_READ_PREV;
- if (check_keyread_allowed(inx, part, all_parts))
+ if (check_keyread_allowed(inx, part, all_parts)) {
base_flags |= HA_KEYREAD_ONLY;
+ }
if (inx == table_share->primary_key) {
/*
@@ -8289,13 +8206,15 @@ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) {
rocksdb::Slice value = m_scan_it->value();
bool covered_lookup =
- m_keyread_only || m_key_descr_arr[keyno]->covers_lookup(
- table, &value, &m_lookup_bitmap);
+ (m_keyread_only && m_key_descr_arr[keyno]->can_cover_lookup()) ||
+ m_key_descr_arr[keyno]->covers_lookup(&value, &m_lookup_bitmap);
if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) {
rc = m_key_descr_arr[keyno]->unpack_record(
- table, buf, &key, &value, m_verify_row_debug_checksums);
+ table, buf, &key, &value,
+ m_converter->get_verify_row_debug_checksums());
global_stats.covered_secondary_key_lookups.inc();
} else {
+ DEBUG_SYNC(ha_thd(), "rocksdb_concurrent_delete_sk");
rc = get_row_by_rowid(buf, m_pk_packed_tuple, size);
}
@@ -8345,9 +8264,10 @@ int ha_rocksdb::read_range_first(const key_range *const start_key,
range_key_part = table->key_info[active_index].key_part;
- if (!start_key) // Read first record
+ if (!start_key) {
+ // Read first record
result = ha_index_first(table->record[0]);
- else {
+ } else {
#ifdef MARIAROCKS_NOT_YET
if (is_using_prohibited_gap_locks(
is_using_full_unique_key(active_index, start_key->keypart_map,
@@ -8363,8 +8283,9 @@ int ha_rocksdb::read_range_first(const key_range *const start_key,
start_key->keypart_map, start_key->flag, end_key);
})
}
- if (result)
+ if (result) {
DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND) ? HA_ERR_END_OF_FILE : result);
+ }
if (compare_key(end_range) <= 0) {
DBUG_RETURN(HA_EXIT_SUCCESS);
@@ -8441,14 +8362,23 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
const key_range *end_key) {
DBUG_ENTER_FUNC();
+ DBUG_EXECUTE_IF("myrocks_busy_loop_on_row_read", int debug_i = 0;
+ while (1) { debug_i++; });
+
int rc = 0;
+ THD *thd = ha_thd();
+ DEBUG_SYNC(thd, "rocksdb.check_flags_rmi");
+ if (thd && thd->killed) {
+ rc = HA_ERR_QUERY_INTERRUPTED;
+ DBUG_RETURN(rc);
+ }
+
const Rdb_key_def &kd = *m_key_descr_arr[active_index];
const uint actual_key_parts = kd.get_key_parts();
bool using_full_key = is_using_full_key(keypart_map, actual_key_parts);
- if (!end_key)
- end_key = end_range;
+ if (!end_key) end_key = end_range;
/* By default, we don't need the retrieved records to match the prefix */
m_sk_match_prefix = nullptr;
@@ -8465,8 +8395,7 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
m_pk_packed_tuple, key, keypart_map);
bool skip_lookup = is_blind_delete_enabled();
- rc = get_row_by_rowid(buf, m_pk_packed_tuple, size,
- skip_lookup, false);
+ rc = get_row_by_rowid(buf, m_pk_packed_tuple, size, skip_lookup, false);
if (!rc && !skip_lookup) {
#ifdef MARIAROCKS_NOT_YET
@@ -8491,8 +8420,9 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
key, tmp_map);
if (table->key_info[active_index].user_defined_key_parts !=
- kd.get_key_parts())
+ kd.get_key_parts()) {
using_full_key = false;
+ }
} else {
packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
key, keypart_map);
@@ -8546,14 +8476,20 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
bool use_all_keys = false;
if (find_flag == HA_READ_KEY_EXACT &&
- my_count_bits(keypart_map) == kd.get_key_parts())
+ my_count_bits(keypart_map) == kd.get_key_parts()) {
use_all_keys = true;
+ }
Rdb_transaction *const tx = get_or_create_tx(table->in_use);
const bool is_new_snapshot = !tx->has_snapshot();
// Loop as long as we get a deadlock error AND we end up creating the
// snapshot here (i.e. it did not exist prior to this)
for (;;) {
+ DEBUG_SYNC(thd, "rocksdb.check_flags_rmi_scan");
+ if (thd && thd->killed) {
+ rc = HA_ERR_QUERY_INTERRUPTED;
+ break;
+ }
/*
This will open the iterator and position it at a record that's equal or
greater than the lookup tuple.
@@ -8570,9 +8506,7 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
slice, &move_forward, tx->m_snapshot_timestamp);
if (rc) {
- /* This status is returned on any error */
- table->status = STATUS_NOT_FOUND;
- DBUG_RETURN(rc);
+ break;
}
m_skip_scan_it_next_call = false;
@@ -8582,13 +8516,15 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
then we have all the rows we need. For a secondary key we now need to
lookup the primary key.
*/
- if (active_index == table->s->primary_key)
+ if (active_index == table->s->primary_key) {
rc = read_row_from_primary_key(buf);
- else
+ } else {
rc = read_row_from_secondary_key(buf, kd, move_forward);
+ }
- if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot)
+ if (!should_recreate_snapshot(rc, is_new_snapshot)) {
break; /* Exit the loop */
+ }
// release the snapshot and iterator so they will be regenerated
tx->release_snapshot();
@@ -8596,7 +8532,10 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
}
if (rc) {
- /* the only possible error condition is record-not-found */
+ /*
+ This status is set on any error, not only on record-not-found
+ (e.g. HA_ERR_QUERY_INTERRUPTED when the query was killed)
+ */
table->status = STATUS_NOT_FOUND;
} else {
table->status = 0;
@@ -8630,13 +8569,21 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
other - error code
*/
-int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward,
+int ha_rocksdb::find_icp_matching_index_rec(const bool move_forward,
uchar *const buf) {
if (pushed_idx_cond && pushed_idx_cond_keyno == active_index) {
const Rdb_key_def &kd = *m_key_descr_arr[active_index];
+ THD *thd = ha_thd();
while (1) {
- rocksdb_skip_expired_records(kd, m_scan_it, !move_forward);
+ int rc = rocksdb_skip_expired_records(kd, m_scan_it, !move_forward);
+ if (rc != HA_EXIT_SUCCESS) {
+ return rc;
+ }
+
+ if (thd && thd->killed) {
+ return HA_ERR_QUERY_INTERRUPTED;
+ }
if (!is_valid(m_scan_it)) {
table->status = STATUS_NOT_FOUND;
@@ -8660,7 +8607,7 @@ int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward,
const rocksdb::Slice value = m_scan_it->value();
int err = kd.unpack_record(table, buf, &rkey, &value,
- m_verify_row_debug_checksums);
+ m_converter->get_verify_row_debug_checksums());
if (err != HA_EXIT_SUCCESS) {
return err;
}
@@ -8681,14 +8628,6 @@ int ha_rocksdb::find_icp_matching_index_rec(const bool &move_forward,
* row. */
break;
}
- /*
- TODO: should we have this here, or RockDB handles this internally?
- if (my_core::thd_killed(current_thd))
- {
- rc= HA_ERR_INTERNAL_ERROR; // doesn't matter
- break;
- }
- */
}
}
return HA_EXIT_SUCCESS;
@@ -8720,14 +8659,14 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
// Rdb_transaction *tx= get_or_create_tx(table->in_use);
// tx->snapshot= nullptr;
- const bool save_verify_row_debug_checksums = m_verify_row_debug_checksums;
- m_verify_row_debug_checksums = true;
-
+ bool save_verify_row_debug_checksums =
+ m_converter->get_verify_row_debug_checksums();
+ m_converter->set_verify_row_debug_checksums(true);
/* For each secondary index, check that we can get a PK value from it */
// NO_LINT_DEBUG
sql_print_verbose_info("CHECKTABLE %s: Checking table %s", table_name,
table_name);
- ha_rows UNINIT_VAR(row_checksums_at_start); // set/used iff first_index==true
+ ha_rows UNINIT_VAR(row_checksums_at_start); // set/used iff first_index==true
ha_rows row_checksums = ha_rows(-1);
bool first_index = true;
@@ -8737,20 +8676,21 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
ha_index_init(keyno, true);
ha_rows rows = 0;
ha_rows checksums = 0;
- if (first_index)
- row_checksums_at_start = m_row_checksums_checked;
+ if (first_index) {
+ row_checksums_at_start = m_converter->get_row_checksums_checked();
+ }
int res;
// NO_LINT_DEBUG
sql_print_verbose_info("CHECKTABLE %s: Checking index %s", table_name,
table->key_info[keyno].name);
while (1) {
- if (!rows)
+ if (!rows) {
res = index_first(table->record[0]);
- else
+ } else {
res = index_next(table->record[0]);
+ }
- if (res == HA_ERR_END_OF_FILE)
- break;
+ if (res == HA_ERR_END_OF_FILE) break;
if (res) {
// error
// NO_LINT_DEBUG
@@ -8771,16 +8711,18 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
if ((res = get_row_by_rowid(table->record[0], rowkey_copy.ptr(),
rowkey_copy.length()))) {
// NO_LINT_DEBUG
- sql_print_error("CHECKTABLE %s: .. row %lld: "
- "failed to fetch row by rowid",
- table_name, rows);
+ sql_print_error(
+ "CHECKTABLE %s: .. row %lld: "
+ "failed to fetch row by rowid",
+ table_name, rows);
goto error;
}
longlong hidden_pk_id = 0;
if (has_hidden_pk(table) &&
- read_hidden_pk_id_from_rowkey(&hidden_pk_id))
+ read_hidden_pk_id_from_rowkey(&hidden_pk_id)) {
goto error;
+ }
/* Check if we get the same PK value */
uint packed_size = m_pk_descr->pack_record(
@@ -8801,9 +8743,10 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
if (packed_size != sec_key_copy.length() ||
memcmp(m_sk_packed_tuple, sec_key_copy.ptr(), packed_size)) {
// NO_LINT_DEBUG
- sql_print_error("CHECKTABLE %s: .. row %lld: "
- "secondary index value mismatch",
- table_name, rows);
+ sql_print_error(
+ "CHECKTABLE %s: .. row %lld: "
+ "secondary index value mismatch",
+ table_name, rows);
goto print_and_error;
}
rows++;
@@ -8831,11 +8774,13 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
}
// NO_LINT_DEBUG
sql_print_verbose_info("CHECKTABLE %s: ... %lld index entries checked "
+ "CHECKTABLE %s: ... %lld index entries checked "
"(%lld had checksums)",
table_name, rows, checksums);
if (first_index) {
- row_checksums = m_row_checksums_checked - row_checksums_at_start;
+ row_checksums =
+ m_converter->get_row_checksums_checked() - row_checksums_at_start;
first_index = false;
}
ha_index_end();
@@ -8848,16 +8793,16 @@ int ha_rocksdb::check(THD *const thd, HA_CHECK_OPT *const check_opt) {
}
extra(HA_EXTRA_NO_KEYREAD);
- m_verify_row_debug_checksums = save_verify_row_debug_checksums;
+ m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums);
/*
- TODO: we should check also for PK records that are missing in the secondary
- indexes.
+ TODO(sergiy): we should check also for PK records that are missing in
+ the secondary indexes.
For that, need to walk through the PK and check that every PK record has a
proper counterpart in each secondary index.
*/
DBUG_RETURN(HA_ADMIN_OK);
error:
- m_verify_row_debug_checksums = save_verify_row_debug_checksums;
+ m_converter->set_verify_row_debug_checksums(save_verify_row_debug_checksums);
ha_index_or_rnd_end();
extra(HA_EXTRA_NO_KEYREAD);
@@ -8867,10 +8812,11 @@ error:
static void dbug_dump_str(FILE *const out, const char *const str, int len) {
fprintf(out, "\"");
for (int i = 0; i < len; i++) {
- if (str[i] > 32)
+ if (str[i] > 32) {
fprintf(out, "%c", str[i]);
- else
+ } else {
fprintf(out, "\\%d", str[i]);
+ }
}
fprintf(out, "\"");
}
@@ -8883,8 +8829,7 @@ static void dbug_dump_str(FILE *const out, const char *const str, int len) {
void dbug_dump_database(rocksdb::DB *const db) {
FILE *const out = fopen("/tmp/rocksdb.dump", "wt");
- if (!out)
- return;
+ if (!out) return;
rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -8906,30 +8851,23 @@ rocksdb::Status ha_rocksdb::get_for_update(
DBUG_ASSERT(m_lock_rows != RDB_LOCK_NONE);
const bool exclusive = m_lock_rows != RDB_LOCK_READ;
- rocksdb::Status s = tx->get_for_update(column_family, key, value, exclusive);
-
- // If we have a lock conflict and we are running in READ COMMITTTED mode
- // release and reacquire the snapshot and then retry the get_for_update().
- if (s.IsBusy() && !s.IsDeadlock() &&
- my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED) {
- tx->release_snapshot();
- tx->acquire_snapshot(false);
-
- s = tx->get_for_update(column_family, key, value, exclusive);
- }
+ const bool do_validate =
+ my_core::thd_tx_isolation(ha_thd()) > ISO_READ_COMMITTED;
+ rocksdb::Status s =
+ tx->get_for_update(column_family, key, value, exclusive, do_validate);
+#ifndef DBUG_OFF
+ ++rocksdb_num_get_for_update_calls;
+#endif
return s;
}
-bool ha_rocksdb::is_blind_delete_enabled()
-{
+bool ha_rocksdb::is_blind_delete_enabled() {
THD *thd = ha_thd();
return (THDVAR(thd, blind_delete_primary_key) &&
thd->lex->sql_command == SQLCOM_DELETE &&
- thd->lex->table_count == 1 &&
- table->s->keys == 1 &&
- !has_hidden_pk(table) &&
- !thd->rgi_slave);
+ thd->lex->table_count == 1 && table->s->keys == 1 &&
+ !has_hidden_pk(table) && !thd->rgi_slave);
}
/*
@@ -8957,8 +8895,9 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
DEBUG_SYNC(ha_thd(), "rocksdb.get_row_by_rowid");
DBUG_EXECUTE_IF("dbug.rocksdb.get_row_by_rowid", {
THD *thd = ha_thd();
- const char act[] = "now signal Reached "
- "wait_for signal.rocksdb.get_row_by_rowid_let_running";
+ const char act[] =
+ "now signal Reached "
+ "wait_for signal.rocksdb.get_row_by_rowid_let_running";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
};);
@@ -8967,8 +8906,7 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
rocksdb::Status s;
/* Pretend row found without looking up */
- if (skip_lookup)
- {
+ if (skip_lookup) {
#ifdef MARIAROCKS_NOT_YET
stats.rows_deleted_blind++;
#endif
@@ -8981,6 +8919,17 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
if (m_lock_rows == RDB_LOCK_NONE) {
tx->acquire_snapshot(true);
s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record);
+ } else if (m_insert_with_update && m_dup_pk_found) {
+ DBUG_ASSERT(m_pk_descr->get_keyno() == m_dupp_errkey);
+ DBUG_ASSERT(m_dup_pk_retrieved_record.length() ==
+ m_retrieved_record.size());
+ DBUG_ASSERT(memcmp(m_dup_pk_retrieved_record.ptr(),
+ m_retrieved_record.data(),
+ m_retrieved_record.size()) == 0);
+
+ // do nothing - we already have the result in m_retrieved_record and
+ // have already taken the lock
+ s = rocksdb::Status::OK();
} else {
s = get_for_update(tx, m_pk_descr->get_cf(), key_slice,
&m_retrieved_record);
@@ -9035,8 +8984,7 @@ int ha_rocksdb::index_next(uchar *const buf) {
}
int rc = index_next_with_direction(buf, moves_forward);
- if (rc == HA_ERR_KEY_NOT_FOUND)
- rc = HA_ERR_END_OF_FILE;
+ if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE;
DBUG_RETURN(rc);
}
@@ -9055,8 +9003,7 @@ int ha_rocksdb::index_prev(uchar *const buf) {
}
int rc = index_next_with_direction(buf, moves_forward);
- if (rc == HA_ERR_KEY_NOT_FOUND)
- rc = HA_ERR_END_OF_FILE;
+ if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE;
DBUG_RETURN(rc);
}
@@ -9069,19 +9016,33 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) {
if (active_index == pk_index(table, m_tbl_def)) {
rc = rnd_next_with_direction(buf, move_forward);
} else {
- if (m_skip_scan_it_next_call) {
- m_skip_scan_it_next_call = false;
- } else {
- if (move_forward)
- m_scan_it->Next(); /* this call cannot fail */
- else
- m_scan_it->Prev();
+ THD *thd = ha_thd();
+ for (;;) {
+ DEBUG_SYNC(thd, "rocksdb.check_flags_inwd");
+ if (thd && thd->killed) {
+ rc = HA_ERR_QUERY_INTERRUPTED;
+ break;
+ }
+ if (m_skip_scan_it_next_call) {
+ m_skip_scan_it_next_call = false;
+ } else {
+ if (move_forward) {
+ m_scan_it->Next(); /* this call cannot fail */
+ } else {
+ m_scan_it->Prev();
+ }
+ }
+ rc = rocksdb_skip_expired_records(*m_key_descr_arr[active_index],
+ m_scan_it, !move_forward);
+ if (rc != HA_EXIT_SUCCESS) {
+ break;
+ }
+ rc = find_icp_matching_index_rec(move_forward, buf);
+ if (!rc) rc = secondary_index_read(active_index, buf);
+ if (!should_skip_invalidated_record(rc)) {
+ break;
+ }
}
- rocksdb_skip_expired_records(*m_key_descr_arr[active_index], m_scan_it,
- !move_forward);
- rc = find_icp_matching_index_rec(move_forward, buf);
- if (!rc)
- rc = secondary_index_read(active_index, buf);
}
DBUG_RETURN(rc);
@@ -9099,8 +9060,7 @@ int ha_rocksdb::index_first(uchar *const buf) {
int rc = m_key_descr_arr[active_index]->m_is_reverse_cf
? index_last_intern(buf)
: index_first_intern(buf);
- if (rc == HA_ERR_KEY_NOT_FOUND)
- rc = HA_ERR_END_OF_FILE;
+ if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE;
DBUG_RETURN(rc);
}
@@ -9117,8 +9077,7 @@ int ha_rocksdb::index_last(uchar *const buf) {
int rc = m_key_descr_arr[active_index]->m_is_reverse_cf
? index_first_intern(buf)
: index_last_intern(buf);
- if (rc == HA_ERR_KEY_NOT_FOUND)
- rc = HA_ERR_END_OF_FILE;
+ if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE;
DBUG_RETURN(rc);
}
@@ -9186,8 +9145,9 @@ int ha_rocksdb::index_first_intern(uchar *const buf) {
m_skip_scan_it_next_call = true;
rc = index_next_with_direction(buf, true);
- if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot)
- break; // exit the loop
+ if (!should_recreate_snapshot(rc, is_new_snapshot)) {
+ break; /* exit the loop */
+ }
// release the snapshot and iterator so they will be regenerated
tx->release_snapshot();
@@ -9261,11 +9221,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) {
DBUG_ASSERT(key != nullptr);
const Rdb_key_def &kd = *m_key_descr_arr[active_index];
- if (kd.m_is_reverse_cf) {
- kd.get_infimum_key(key, &key_size);
- } else {
- kd.get_supremum_key(key, &key_size);
- }
+ int key_end_matching_bytes = kd.get_last_key(key, &key_size);
rocksdb::Slice index_key((const char *)key, key_size);
@@ -9276,7 +9232,7 @@ int ha_rocksdb::index_last_intern(uchar *const buf) {
// Loop as long as we get a deadlock error AND we end up creating the
// snapshot here (i.e. it did not exist prior to this)
for (;;) {
- setup_scan_iterator(kd, &index_key, false, Rdb_key_def::INDEX_NUMBER_SIZE);
+ setup_scan_iterator(kd, &index_key, false, key_end_matching_bytes);
m_scan_it->SeekForPrev(index_key);
m_skip_scan_it_next_call = false;
@@ -9285,12 +9241,12 @@ int ha_rocksdb::index_last_intern(uchar *const buf) {
rc = rnd_next_with_direction(buf, false);
} else {
rc = find_icp_matching_index_rec(false /*move_forward*/, buf);
- if (!rc)
- rc = secondary_index_read(active_index, buf);
+ if (!rc) rc = secondary_index_read(active_index, buf);
}
- if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !is_new_snapshot)
+ if (!should_recreate_snapshot(rc, is_new_snapshot)) {
break; /* exit the loop */
+ }
// release the snapshot and iterator so they will be regenerated
tx->release_snapshot();
@@ -9329,7 +9285,7 @@ void ha_rocksdb::unlock_row() {
- If the index is PRIMARY KEY, and if all of the columns of the table
are covered by the PRIMARY KEY, SingleDelete can be used.
*/
-bool ha_rocksdb::can_use_single_delete(const uint &index) const {
+bool ha_rocksdb::can_use_single_delete(const uint index) const {
return (index != pk_index(table, m_tbl_def) ||
(!has_hidden_pk(table) &&
table->key_info[index].ext_key_parts == table->s->fields));
@@ -9345,12 +9301,18 @@ bool ha_rocksdb::skip_unique_check() const {
3) the user set unique_checks option to 0, and the table does not have
any indexes. If the table has secondary keys, then those might becomes
inconsisted/corrupted
+ 4) We're using read-free replication
*/
return THDVAR(table->in_use, bulk_load) ||
(m_force_skip_unique_check && m_skip_unique_check) ||
(my_core::thd_test_options(table->in_use,
OPTION_RELAXED_UNIQUE_CHECKS) &&
- m_tbl_def->m_key_count == 1);
+ m_tbl_def->m_key_count == 1) ||
+#ifdef MARIAROCKS_NOT_YET
+ use_read_free_rpl();
+#else
+ FALSE;
+#endif
}
#ifdef MARIAROCKS_NOT_YET // MDEV-10975
@@ -9450,10 +9412,9 @@ const char *ha_rocksdb::get_key_comment(const uint index,
return table_arg->key_info[index].comment.str;
}
-const std::string ha_rocksdb::generate_cf_name(const uint index,
- const TABLE *const table_arg,
- const Rdb_tbl_def *const tbl_def_arg,
- bool *per_part_match_found) {
+const std::string ha_rocksdb::generate_cf_name(
+ const uint index, const TABLE *const table_arg,
+ const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found) {
DBUG_ASSERT(table_arg != nullptr);
DBUG_ASSERT(tbl_def_arg != nullptr);
DBUG_ASSERT(per_part_match_found != nullptr);
@@ -9498,6 +9459,9 @@ const std::string ha_rocksdb::get_table_comment(const TABLE *const table_arg) {
}
/**
+ Write a new row
+
+ @param[in] buf new row data to write
@return
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
@@ -9520,6 +9484,11 @@ int ha_rocksdb::write_row(uchar *const buf) {
}
}
+ // clear cache at beginning of write for INSERT ON DUPLICATE
+ // we may get multiple write->fail->read->update if there are multiple
+ // values from INSERT
+ m_dup_pk_found = false;
+
const int rv = update_write_row(nullptr, buf, skip_unique_check());
if (rv == 0) {
@@ -9552,6 +9521,15 @@ void ha_rocksdb::set_last_rowkey(const uchar *const old_data) {
#endif
}
+/**
+ Collect update data for primary key
+
+ @param[in, out] row_info hold all data for update row, such as
+ new row data/old row data
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+*/
int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) {
int size;
@@ -9564,10 +9542,9 @@ int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) {
row_info->new_pk_unpack_info = &m_pk_unpack_info;
- size =
- m_pk_descr->pack_record(table, m_pack_buffer, row_info->new_data,
- m_pk_packed_tuple, row_info->new_pk_unpack_info,
- false, 0, 0, nullptr, &row_info->ttl_pk_offset);
+ size = m_pk_descr->pack_record(
+ table, m_pack_buffer, row_info->new_data, m_pk_packed_tuple,
+ row_info->new_pk_unpack_info, false, 0, 0, nullptr);
} else if (row_info->old_data == nullptr) {
row_info->hidden_pk_id = update_hidden_pk_val();
size =
@@ -9591,30 +9568,32 @@ int ha_rocksdb::get_pk_for_update(struct update_row_info *const row_info) {
return HA_EXIT_SUCCESS;
}
-int ha_rocksdb::check_and_lock_unique_pk(const uint &key_id,
+/**
+ Check the specified primary key value is unique and also lock the row
+
+ @param[in] key_id key index
+ @param[in] row_info hold all data for update row, such as old row
+ data and new row data
+ @param[out] found whether the primary key exists before.
+ @param[out] pk_changed whether primary key is changed
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int ha_rocksdb::check_and_lock_unique_pk(const uint key_id,
const struct update_row_info &row_info,
- bool *const found,
- bool *const pk_changed) {
+ bool *const found) {
DBUG_ASSERT(found != nullptr);
- DBUG_ASSERT(pk_changed != nullptr);
- *pk_changed = false;
-
- /*
- For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
- always require locking.
- */
- if (row_info.old_pk_slice.size() > 0) {
- /*
- If the keys are the same, then no lock is needed
- */
- if (!row_info.new_pk_slice.compare(row_info.old_pk_slice)) {
- *found = false;
- return HA_EXIT_SUCCESS;
- }
+ DBUG_ASSERT(row_info.old_pk_slice.size() == 0 ||
+ row_info.new_pk_slice.compare(row_info.old_pk_slice) != 0);
- *pk_changed = true;
- }
+ /* Ignore PK violations if this is an optimized 'replace into' */
+#ifdef MARIAROCKS_NOT_YET
+ const bool ignore_pk_unique_check = ha_thd()->lex->blind_replace_into;
+#else
+ const bool ignore_pk_unique_check= false;
+#endif
/*
Perform a read to determine if a duplicate entry exists. For primary
@@ -9637,17 +9616,56 @@ int ha_rocksdb::check_and_lock_unique_pk(const uint &key_id,
*/
const rocksdb::Status s =
get_for_update(row_info.tx, m_pk_descr->get_cf(), row_info.new_pk_slice,
- &m_retrieved_record);
+ ignore_pk_unique_check ? nullptr : &m_retrieved_record);
if (!s.ok() && !s.IsNotFound()) {
return row_info.tx->set_status_error(
table->in_use, s, *m_key_descr_arr[key_id], m_tbl_def, m_table_handler);
}
- *found = !s.IsNotFound();
+ bool key_found = ignore_pk_unique_check ? false : !s.IsNotFound();
+
+ /*
+ If the pk key has ttl, we may need to pretend the row wasn't
+ found if it is already expired.
+ */
+ if (key_found && m_pk_descr->has_ttl() &&
+ should_hide_ttl_rec(*m_pk_descr, m_retrieved_record,
+ (row_info.tx->m_snapshot_timestamp
+ ? row_info.tx->m_snapshot_timestamp
+ : static_cast<int64_t>(std::time(nullptr))))) {
+ key_found = false;
+ }
+
+ if (key_found && row_info.old_data == nullptr && m_insert_with_update) {
+ // In INSERT ON DUPLICATE KEY UPDATE ... case, if the insert failed
+ // due to a duplicate key, remember the last key and skip the check
+ // next time
+ m_dup_pk_found = true;
+
+#ifndef DBUG_OFF
+ // save it for sanity checking later
+ m_dup_pk_retrieved_record.copy(m_retrieved_record.data(),
+ m_retrieved_record.size(), &my_charset_bin);
+#endif
+ }
+
+ *found = key_found;
+
return HA_EXIT_SUCCESS;
}
-int ha_rocksdb::check_and_lock_sk(const uint &key_id,
+/**
+ Check the specified secondary key value is unique and also lock the row
+
+ @param[in] key_id key index
+ @param[in] row_info hold all data for update row, such as old row
+ data and new row data
+ @param[out] found whether specified key value exists before.
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+*/
+int ha_rocksdb::check_and_lock_sk(const uint key_id,
const struct update_row_info &row_info,
bool *const found) {
DBUG_ASSERT(found != nullptr);
@@ -9777,8 +9795,18 @@ int ha_rocksdb::check_and_lock_sk(const uint &key_id,
return HA_EXIT_SUCCESS;
}
+/**
+ Enumerate all keys to check their uniqueness and also lock them
+
+ @param[in] row_info hold all data for update row, such as old row
+ data and new row data
+ @param[in] pk_changed whether primary key is changed
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+*/
int ha_rocksdb::check_uniqueness_and_lock(
- const struct update_row_info &row_info, bool *const pk_changed) {
+ const struct update_row_info &row_info, bool pk_changed) {
/*
Go through each index and determine if the index has uniqueness
requirements. If it does, then try to obtain a row lock on the new values.
@@ -9790,7 +9818,12 @@ int ha_rocksdb::check_uniqueness_and_lock(
int rc;
if (is_pk(key_id, table, m_tbl_def)) {
- rc = check_and_lock_unique_pk(key_id, row_info, &found, pk_changed);
+ if (row_info.old_pk_slice.size() > 0 && !pk_changed) {
+ found = false;
+ rc = HA_EXIT_SUCCESS;
+ } else {
+ rc = check_and_lock_unique_pk(key_id, row_info, &found);
+ }
} else {
rc = check_and_lock_sk(key_id, row_info, &found);
}
@@ -9799,23 +9832,11 @@ int ha_rocksdb::check_uniqueness_and_lock(
return rc;
}
- /*
- If the pk key has ttl, we may need to pretend the row wasn't
- found if it is already expired. The pk record is read into
- m_retrieved_record by check_and_lock_unique_pk().
- */
- if (is_pk(key_id, table, m_tbl_def) && found && m_pk_descr->has_ttl() &&
- should_hide_ttl_rec(*m_pk_descr, m_retrieved_record,
- (row_info.tx->m_snapshot_timestamp
- ? row_info.tx->m_snapshot_timestamp
- : static_cast<int64_t>(std::time(nullptr))))) {
- found = false;
- }
-
if (found) {
/* There is a row with this key already, so error out. */
errkey = key_id;
m_dupp_errkey = errkey;
+
return HA_ERR_FOUND_DUPP_KEY;
}
}
@@ -9823,19 +9844,31 @@ int ha_rocksdb::check_uniqueness_and_lock(
return HA_EXIT_SUCCESS;
}
+/**
+ Check whether secondary key value is duplicate or not
+
+ @param[in] table_arg the table currently working on
+ @param[in] key_def the key_def being checked
+ @param[in] key secondary key storage data
+ @param[out] sk_info holds secondary key memcmp data (new/old)
+ @return
+ HA_EXIT_SUCCESS OK
+ other HA_ERR error code (can be SE-specific)
+*/
+
int ha_rocksdb::check_duplicate_sk(const TABLE *table_arg,
- const Rdb_key_def &index,
+ const Rdb_key_def &key_def,
const rocksdb::Slice *key,
struct unique_sk_buf_info *sk_info) {
uint n_null_fields = 0;
- const rocksdb::Comparator *index_comp = index.get_cf()->GetComparator();
+ const rocksdb::Comparator *index_comp = key_def.get_cf()->GetComparator();
/* Get proper SK buffer. */
uchar *sk_buf = sk_info->swap_and_get_sk_buf();
/* Get memcmp form of sk without extended pk tail */
uint sk_memcmp_size =
- index.get_memcmp_sk_parts(table_arg, *key, sk_buf, &n_null_fields);
+ key_def.get_memcmp_sk_parts(table_arg, *key, sk_buf, &n_null_fields);
sk_info->sk_memcmp_key =
rocksdb::Slice(reinterpret_cast<char *>(sk_buf), sk_memcmp_size);
@@ -9855,12 +9888,17 @@ int ha_rocksdb::bulk_load_key(Rdb_transaction *const tx, const Rdb_key_def &kd,
const rocksdb::Slice &value, bool sort) {
DBUG_ENTER_FUNC();
int res;
+ THD *thd = ha_thd();
+ if (thd && thd->killed) {
+ DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
+ }
+
rocksdb::ColumnFamilyHandle *cf = kd.get_cf();
// In the case of unsorted inserts, m_sst_info allocated here is not
// used to store the keys. It is still used to indicate when tables
// are switched.
- if (m_sst_info == nullptr || m_sst_info->is_committed()) {
+ if (m_sst_info == nullptr || m_sst_info->is_done()) {
m_sst_info.reset(new Rdb_sst_info(rdb, m_table_handler->m_table_name,
kd.get_name(), cf, *rocksdb_db_options,
THDVAR(ha_thd(), trace_sst_api)));
@@ -9893,17 +9931,61 @@ int ha_rocksdb::finalize_bulk_load(bool print_client_error) {
/* Skip if there are no possible ongoing bulk loads */
if (m_sst_info) {
- res = m_sst_info->commit(print_client_error);
+ if (m_sst_info->is_done()) {
+ m_sst_info.reset();
+ DBUG_RETURN(res);
+ }
+
+ Rdb_sst_info::Rdb_sst_commit_info commit_info;
+
+ // Wrap up the current work in m_sst_info and get ready to commit
+ // This transfers the responsibility of commit over to commit_info
+ res = m_sst_info->finish(&commit_info, print_client_error);
+ if (res == 0) {
+ // Make sure we have work to do - under race condition we could lose
+ // to another thread and end up with no work
+ if (commit_info.has_work()) {
+ rocksdb::IngestExternalFileOptions opts;
+ opts.move_files = true;
+ opts.snapshot_consistency = false;
+ opts.allow_global_seqno = false;
+ opts.allow_blocking_flush = false;
+
+ const rocksdb::Status s = rdb->IngestExternalFile(
+ commit_info.get_cf(), commit_info.get_committed_files(), opts);
+ if (!s.ok()) {
+ if (print_client_error) {
+ Rdb_sst_info::report_error_msg(s, nullptr);
+ }
+ res = HA_ERR_ROCKSDB_BULK_LOAD;
+ } else {
+ // Mark the list of SST files as committed, otherwise they'll get
+ // cleaned up when commit_info destructs
+ commit_info.commit();
+ }
+ }
+ }
m_sst_info.reset();
}
DBUG_RETURN(res);
}
-int ha_rocksdb::update_pk(const Rdb_key_def &kd,
- const struct update_row_info &row_info,
- const bool &pk_changed) {
- const uint key_id = kd.get_keyno();
- const bool hidden_pk = is_hidden_pk(key_id, table, m_tbl_def);
+/**
+ Update an existing primary key record or write a new primary key record
+
+ @param[in] kd the primary key being updated/written
+ @param[in] row_info holds all row data, such as old row data and
+ new row data
+ @param[in] pk_changed whether primary key is changed
+ @return
+ HA_EXIT_SUCCESS OK
+ Other HA_ERR error code (can be SE-specific)
+ */
+int ha_rocksdb::update_write_pk(const Rdb_key_def &kd,
+ const struct update_row_info &row_info,
+ bool pk_changed) {
+ uint key_id = kd.get_keyno();
+ bool hidden_pk = is_hidden_pk(key_id, table, m_tbl_def);
ulonglong bytes_written = 0;
/*
@@ -9931,7 +10013,10 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd,
int rc = HA_EXIT_SUCCESS;
rocksdb::Slice value_slice;
/* Prepare the new record to be written into RocksDB */
- if ((rc = convert_record_to_storage_format(row_info, &value_slice))) {
+ if ((rc = m_converter->encode_value_slice(
+ m_pk_descr, row_info.new_pk_slice, row_info.new_pk_unpack_info,
+ !row_info.old_pk_slice.empty(), should_store_row_debug_checksums(),
+ m_ttl_bytes, &m_ttl_bytes_updated, &value_slice))) {
return rc;
}
@@ -9951,7 +10036,9 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd,
row_info.tx->get_indexed_write_batch()->Put(cf, row_info.new_pk_slice,
value_slice);
} else {
- const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice);
+ const bool assume_tracked = can_assume_tracked(ha_thd());
+ const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice,
+ assume_tracked);
if (!s.ok()) {
if (s.IsBusy()) {
errkey = table->s->primary_key;
@@ -9971,9 +10058,22 @@ int ha_rocksdb::update_pk(const Rdb_key_def &kd,
return rc;
}
-int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd,
- const struct update_row_info &row_info,
- const bool bulk_load_sk) {
+/**
+ update an existing secondary key record or write a new secondary key record
+
+ @param[in] table_arg Table we're working on
+ @param[in] kd The secondary key being updated/written
+ @param[in] row_info data structure contains old row data and new row data
+ @param[in] bulk_load_sk whether bulk load is supported. Currently it is only
+ supported for writes
+ @return
+ HA_EXIT_SUCCESS OK
+ Other HA_ERR error code (can be SE-specific)
+ */
+int ha_rocksdb::update_write_sk(const TABLE *const table_arg,
+ const Rdb_key_def &kd,
+ const struct update_row_info &row_info,
+ const bool bulk_load_sk) {
int new_packed_size;
int old_packed_size;
int rc = HA_EXIT_SUCCESS;
@@ -9995,19 +10095,18 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd,
return HA_EXIT_SUCCESS;
}
- const bool store_row_debug_checksums = should_store_row_debug_checksums();
-
+ bool store_row_debug_checksums = should_store_row_debug_checksums();
new_packed_size =
kd.pack_record(table_arg, m_pack_buffer, row_info.new_data,
m_sk_packed_tuple, &m_sk_tails, store_row_debug_checksums,
- row_info.hidden_pk_id, 0, nullptr, nullptr, m_ttl_bytes);
+ row_info.hidden_pk_id, 0, nullptr, m_ttl_bytes);
if (row_info.old_data != nullptr) {
// The old value
old_packed_size = kd.pack_record(
table_arg, m_pack_buffer, row_info.old_data, m_sk_packed_tuple_old,
&m_sk_tails_old, store_row_debug_checksums, row_info.hidden_pk_id, 0,
- nullptr, nullptr, m_ttl_bytes);
+ nullptr, m_ttl_bytes);
/*
Check if we are going to write the same value. This can happen when
@@ -10067,13 +10166,22 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd,
return rc;
}
-int ha_rocksdb::update_indexes(const struct update_row_info &row_info,
- const bool &pk_changed) {
+/**
+ Update existing indexes(PK/SKs) or write new indexes(PK/SKs)
+
+ @param[in] row_info hold all row data, such as old key/new key
+ @param[in] pk_changed whether primary key is changed
+ @return
+ HA_EXIT_SUCCESS OK
+ Other HA_ERR error code (can be SE-specific)
+ */
+int ha_rocksdb::update_write_indexes(const struct update_row_info &row_info,
+ const bool pk_changed) {
int rc;
bool bulk_load_sk;
// The PK must be updated first to pull out the TTL value.
- rc = update_pk(*m_pk_descr, row_info, pk_changed);
+ rc = update_write_pk(*m_pk_descr, row_info, pk_changed);
if (rc != HA_EXIT_SUCCESS) {
return rc;
}
@@ -10088,7 +10196,8 @@ int ha_rocksdb::update_indexes(const struct update_row_info &row_info,
continue;
}
- rc = update_sk(table, *m_key_descr_arr[key_id], row_info, bulk_load_sk);
+ rc = update_write_sk(table, *m_key_descr_arr[key_id], row_info,
+ bulk_load_sk);
if (rc != HA_EXIT_SUCCESS) {
return rc;
}
@@ -10097,11 +10206,26 @@ int ha_rocksdb::update_indexes(const struct update_row_info &row_info,
return HA_EXIT_SUCCESS;
}
+/**
+ Update an existing row or write a new row
+
+ @param[in] old_data nullptr for write, non-null for update
+ @param[in] new_data non-null for write/update
+ @param[in] skip_unique_check whether to check uniqueness
+ @return
+ HA_EXIT_SUCCESS OK
+ Other HA_ERR error code (can be SE-specific)
+ */
int ha_rocksdb::update_write_row(const uchar *const old_data,
const uchar *const new_data,
const bool skip_unique_check) {
DBUG_ENTER_FUNC();
+ THD *thd = ha_thd();
+ if (thd && thd->killed) {
+ DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
+ }
+
bool pk_changed = false;
struct update_row_info row_info;
@@ -10109,7 +10233,6 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
row_info.new_data = new_data;
row_info.skip_unique_check = skip_unique_check;
row_info.new_pk_unpack_info = nullptr;
-
set_last_rowkey(old_data);
row_info.tx = get_or_create_tx(table->in_use);
@@ -10130,12 +10253,20 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
DBUG_RETURN(rc);
}
+ /*
+ For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
+ always require locking.
+ */
+ if (row_info.old_pk_slice.size() > 0) {
+ pk_changed = row_info.new_pk_slice.compare(row_info.old_pk_slice) != 0;
+ }
+
if (!skip_unique_check) {
/*
Check to see if we are going to have failures because of unique
keys. Also lock the appropriate key values.
*/
- rc = check_uniqueness_and_lock(row_info, &pk_changed);
+ rc = check_uniqueness_and_lock(row_info, pk_changed);
if (rc != HA_EXIT_SUCCESS) {
DBUG_RETURN(rc);
}
@@ -10149,7 +10280,7 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
here because updates to the transaction will be made and those updates
cannot be easily removed without rolling back the entire transaction.
*/
- rc = update_indexes(row_info, pk_changed);
+ rc = update_write_indexes(row_info, pk_changed);
if (rc != HA_EXIT_SUCCESS) {
DBUG_RETURN(rc);
}
@@ -10190,18 +10321,29 @@ void ha_rocksdb::setup_iterator_bounds(
const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len,
uchar *const lower_bound, uchar *const upper_bound,
rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice) {
- uint min_len = std::min(eq_cond.size(), bound_len);
- memcpy(upper_bound, eq_cond.data(), min_len);
- kd.successor(upper_bound, min_len);
- memcpy(lower_bound, eq_cond.data(), min_len);
- kd.predecessor(lower_bound, min_len);
+ // If eq_cond is shorter than Rdb_key_def::INDEX_NUMBER_SIZE, we should be
+ // able to get better bounds just by using index id directly.
+ if (eq_cond.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
+ DBUG_ASSERT(bound_len == Rdb_key_def::INDEX_NUMBER_SIZE);
+ uint size;
+ kd.get_infimum_key(lower_bound, &size);
+ DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE);
+ kd.get_supremum_key(upper_bound, &size);
+ DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE);
+ } else {
+ DBUG_ASSERT(bound_len <= eq_cond.size());
+ memcpy(upper_bound, eq_cond.data(), bound_len);
+ kd.successor(upper_bound, bound_len);
+ memcpy(lower_bound, eq_cond.data(), bound_len);
+ kd.predecessor(lower_bound, bound_len);
+ }
if (kd.m_is_reverse_cf) {
- *upper_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len);
- *lower_bound_slice = rocksdb::Slice((const char *)upper_bound, min_len);
+ *upper_bound_slice = rocksdb::Slice((const char *)lower_bound, bound_len);
+ *lower_bound_slice = rocksdb::Slice((const char *)upper_bound, bound_len);
} else {
- *upper_bound_slice = rocksdb::Slice((const char *)upper_bound, min_len);
- *lower_bound_slice = rocksdb::Slice((const char *)lower_bound, min_len);
+ *upper_bound_slice = rocksdb::Slice((const char *)upper_bound, bound_len);
+ *lower_bound_slice = rocksdb::Slice((const char *)lower_bound, bound_len);
}
}
@@ -10220,8 +10362,17 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd,
bool skip_bloom = true;
const rocksdb::Slice eq_cond(slice->data(), eq_cond_len);
+ // The size of m_scan_it_lower_bound (and upper) is technically
+ // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers. Rather
+ // than recalculating that number, we pass in the max of eq_cond_len and
+ // Rdb_key_def::INDEX_NUMBER_SIZE which is guaranteed to be smaller than
+ // max_packed_sk_len, hence ensuring no buffer overrun.
+ //
+ // See ha_rocksdb::setup_iterator_bounds on how the bound_len parameter is
+ // used.
if (check_bloom_and_set_bounds(
- ha_thd(), kd, eq_cond, use_all_keys, eq_cond_len,
+ ha_thd(), kd, eq_cond, use_all_keys,
+ std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_NUMBER_SIZE),
m_scan_it_lower_bound, m_scan_it_upper_bound,
&m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) {
skip_bloom = false;
@@ -10259,7 +10410,8 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd,
m_scan_it_snapshot = rdb->GetSnapshot();
auto read_opts = rocksdb::ReadOptions();
- read_opts.total_order_seek = true; // TODO: set based on WHERE conditions
+ // TODO(mung): set based on WHERE conditions
+ read_opts.total_order_seek = true;
read_opts.snapshot = m_scan_it_snapshot;
m_scan_it = rdb->NewIterator(read_opts, kd.get_cf());
} else {
@@ -10284,12 +10436,12 @@ void ha_rocksdb::release_scan_iterator() {
void ha_rocksdb::setup_iterator_for_rnd_scan() {
uint key_size;
- int key_start_matching_bytes = m_pk_descr->get_first_key(m_pk_packed_tuple, &key_size);
+ int key_start_matching_bytes =
+ m_pk_descr->get_first_key(m_pk_packed_tuple, &key_size);
rocksdb::Slice table_key((const char *)m_pk_packed_tuple, key_size);
- setup_scan_iterator(*m_pk_descr, &table_key, false,
- key_start_matching_bytes);
+ setup_scan_iterator(*m_pk_descr, &table_key, false, key_start_matching_bytes);
m_scan_it->Seek(table_key);
m_skip_scan_it_next_call = true;
}
@@ -10302,8 +10454,16 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() {
int ha_rocksdb::rnd_init(bool scan) {
DBUG_ENTER_FUNC();
+ THD *thd = ha_thd();
+ if (thd && thd->killed) {
+ DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
+ }
+
Rdb_transaction *const tx = get_or_create_tx(table->in_use);
- setup_read_decoders();
+
+ // when this table is being updated, decode all fields
+ m_converter->setup_field_decoders(table->read_set,
+ m_lock_rows == RDB_LOCK_WRITE);
if (scan) {
m_rnd_scan_is_new_snapshot = !tx->has_snapshot();
@@ -10331,9 +10491,9 @@ int ha_rocksdb::rnd_next(uchar *const buf) {
int rc;
for (;;) {
rc = rnd_next_with_direction(buf, true);
- if (rc != HA_ERR_ROCKSDB_STATUS_BUSY || !m_rnd_scan_is_new_snapshot)
+ if (!should_recreate_snapshot(rc, m_rnd_scan_is_new_snapshot)) {
break; /* exit the loop */
-
+ }
// release the snapshot and iterator and then regenerate them
Rdb_transaction *tx = get_or_create_tx(table->in_use);
tx->release_snapshot();
@@ -10343,8 +10503,7 @@ int ha_rocksdb::rnd_next(uchar *const buf) {
m_rnd_scan_is_new_snapshot = false;
- if (rc == HA_ERR_KEY_NOT_FOUND)
- rc = HA_ERR_END_OF_FILE;
+ if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE;
DBUG_RETURN(rc);
}
@@ -10356,6 +10515,7 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) {
DBUG_ENTER_FUNC();
int rc;
+ THD *thd = ha_thd();
table->status = STATUS_NOT_FOUND;
#ifdef MARIAROCKS_NOT_YET
@@ -10374,13 +10534,20 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) {
}
for (;;) {
+ DEBUG_SYNC(thd, "rocksdb.check_flags_rnwd");
+ if (thd && thd->killed) {
+ rc = HA_ERR_QUERY_INTERRUPTED;
+ break;
+ }
+
if (m_skip_scan_it_next_call) {
m_skip_scan_it_next_call = false;
} else {
- if (move_forward)
+ if (move_forward) {
m_scan_it->Next(); /* this call cannot fail */
- else
+ } else {
m_scan_it->Prev(); /* this call cannot fail */
+ }
}
if (!is_valid(m_scan_it)) {
@@ -10414,11 +10581,7 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) {
const rocksdb::Status s =
get_for_update(tx, m_pk_descr->get_cf(), key, &m_retrieved_record);
if (s.IsNotFound() &&
- my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED) {
- // This occurs if we accessed a row, tried to lock it, failed,
- // released and reacquired the snapshot (because of READ COMMITTED
- // mode) and the row was deleted by someone else in the meantime.
- // If so, we just want to move on to the next row.
+ should_skip_invalidated_record(HA_ERR_KEY_NOT_FOUND)) {
continue;
}
@@ -10478,11 +10641,18 @@ int ha_rocksdb::rnd_end() {
int ha_rocksdb::index_init(uint idx, bool sorted) {
DBUG_ENTER_FUNC();
+ THD *thd = ha_thd();
+ if (thd && thd->killed) {
+ DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
+ }
+
Rdb_transaction *const tx = get_or_create_tx(table->in_use);
DBUG_ASSERT(tx != nullptr);
- setup_read_decoders();
- m_start_range= NULL;
+ m_start_range= NULL;
+ // when this table is being updated, decode all fields
+ m_converter->setup_field_decoders(table->read_set,
+ m_lock_rows == RDB_LOCK_WRITE);
if (!m_keyread_only) {
m_key_descr_arr[idx]->get_lookup_bitmap(table, &m_lookup_bitmap);
@@ -10526,10 +10696,20 @@ int ha_rocksdb::truncate() {
DBUG_ASSERT(m_tbl_def != nullptr);
- m_tbl_def->m_auto_incr_val = 1;
- m_tbl_def->m_hidden_pk_val = 1;
+ // Save table name to use later
+ std::string table_name = m_tbl_def->full_tablename();
+
+ // Delete current table
+ int err = delete_table(m_tbl_def);
+ if (err != HA_EXIT_SUCCESS) {
+ DBUG_RETURN(err);
+ }
- DBUG_RETURN(remove_rows(m_tbl_def));
+ // Reset auto_increment_value to 1 if auto-increment feature is enabled
+ // By default, the starting valid value for auto_increment_value is 1
+ DBUG_RETURN(create_table(
+ table_name, table,
+ table->found_next_number_field ? 1 : 0 /* auto_increment_value */));
}
/*
@@ -10607,9 +10787,11 @@ rocksdb::Status ha_rocksdb::delete_or_singledelete(
uint index, Rdb_transaction *const tx,
rocksdb::ColumnFamilyHandle *const column_family,
const rocksdb::Slice &key) {
- if (can_use_single_delete(index))
- return tx->single_delete(column_family, key);
- return tx->delete_key(column_family, key);
+ const bool assume_tracked = can_assume_tracked(ha_thd());
+ if (can_use_single_delete(index)) {
+ return tx->single_delete(column_family, key, assume_tracked);
+ }
+ return tx->delete_key(column_family, key, assume_tracked);
}
void ha_rocksdb::update_stats(void) {
@@ -10640,8 +10822,9 @@ void ha_rocksdb::update_stats(void) {
int ha_rocksdb::info(uint flag) {
DBUG_ENTER_FUNC();
- if (!table)
+ if (!table) {
DBUG_RETURN(HA_EXIT_FAILURE);
+ }
if (flag & HA_STATUS_VARIABLE) {
/*
@@ -10669,10 +10852,8 @@ int ha_rocksdb::info(uint flag) {
// if number of records is hardcoded, we do not want to force computation
// of memtable cardinalities
- if (stats.records == 0 ||
- (rocksdb_force_compute_memtable_stats &&
- rocksdb_debug_optimizer_n_rows == 0))
- {
+ if (stats.records == 0 || (rocksdb_force_compute_memtable_stats &&
+ rocksdb_debug_optimizer_n_rows == 0)) {
// First, compute SST files stats
uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2];
auto r = get_range(pk_index(table, m_tbl_def), buf);
@@ -10682,8 +10863,8 @@ int ha_rocksdb::info(uint flag) {
if (stats.records == 0) {
rdb->GetApproximateSizes(m_pk_descr->get_cf(), &r, 1, &sz,
include_flags);
- stats.records+= sz/ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE;
- stats.data_file_length+= sz;
+ stats.records += sz / ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE;
+ stats.data_file_length += sz;
}
// Second, compute memtable stats. This call is expensive, so cache
// values computed for some time.
@@ -10720,14 +10901,17 @@ int ha_rocksdb::info(uint flag) {
stats.records += m_table_handler->m_mtcache_count;
stats.data_file_length += m_table_handler->m_mtcache_size;
}
+ }
- if (rocksdb_debug_optimizer_n_rows > 0)
- stats.records = rocksdb_debug_optimizer_n_rows;
+ if (rocksdb_debug_optimizer_n_rows > 0) {
+ stats.records = rocksdb_debug_optimizer_n_rows;
}
- if (stats.records != 0)
+ if (stats.records != 0) {
stats.mean_rec_length = stats.data_file_length / stats.records;
+ }
}
+
if (flag & HA_STATUS_CONST) {
ref_length = m_pk_descr->max_storage_fmt_length();
@@ -10753,8 +10937,7 @@ int ha_rocksdb::info(uint flag) {
} else {
x = 0;
}
- if (x > stats.records)
- x = stats.records;
+ if (x > stats.records) x = stats.records;
if ((x == 0 && rocksdb_debug_optimizer_no_zero_cardinality) ||
rocksdb_debug_optimizer_n_rows > 0) {
// Fake cardinality implementation. For example, (idx1, idx2, idx3)
@@ -10789,7 +10972,7 @@ int ha_rocksdb::info(uint flag) {
uniqueness violation.
*/
errkey = m_dupp_errkey;
- dup_ref = m_pk_tuple; // TODO(?): this should store packed PK.
+ dup_ref = m_pk_tuple; // TODO(?): this should store packed PK.
}
if (flag & HA_STATUS_AUTO) {
@@ -10803,8 +10986,9 @@ void ha_rocksdb::position(const uchar *const record) {
DBUG_ENTER_FUNC();
longlong hidden_pk_id = 0;
- if (has_hidden_pk(table) && read_hidden_pk_id_from_rowkey(&hidden_pk_id))
- DBUG_ASSERT(false); // should never reach here
+ if (has_hidden_pk(table) && read_hidden_pk_id_from_rowkey(&hidden_pk_id)) {
+ DBUG_ASSERT(false); // should never reach here
+ }
/*
Get packed primary key value from the record.
@@ -10826,8 +11010,9 @@ void ha_rocksdb::position(const uchar *const record) {
It could be that mem-comparable form of PK occupies less than ref_length
bytes. Fill the remainder with zeros.
*/
- if (ref_length > packed_size)
+ if (ref_length > packed_size) {
memset(ref + packed_size, 0, ref_length - packed_size);
+ }
DBUG_VOID_RETURN;
}
@@ -10885,8 +11070,7 @@ void ha_rocksdb::calc_updated_indexes() {
/* Walk over all key parts, including the "extended key" suffix */
const uint key_parts = kd.get_key_parts();
for (uint kp = 0; kp < key_parts; kp++) {
- if (has_hidden_pk(table) && kp + 1 == key_parts)
- break;
+ if (has_hidden_pk(table) && kp + 1 == key_parts) break;
Field *const field = kd.get_table_field_for_part_no(table, kp);
if (bitmap_is_set(table->write_set, field->field_index)) {
@@ -10899,6 +11083,9 @@ void ha_rocksdb::calc_updated_indexes() {
}
/**
+ Update an existing row
+ @param[in] old_data nullptr for write, non-null for update
+ @param[in] new_data non-null for write/update
@return
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
@@ -10916,7 +11103,7 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) {
*/
DBUG_ASSERT(new_data == table->record[0]);
- const int rv = update_write_row(old_data, new_data, false);
+ const int rv = update_write_row(old_data, new_data, skip_unique_check());
if (rv == 0) {
#ifdef MARIAROCKS_NOT_YET
@@ -11021,7 +11208,8 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *const thd, THR_LOCK_DATA **to,
void ha_rocksdb::read_thd_vars(THD *const thd) {
m_store_row_debug_checksums = THDVAR(thd, store_row_debug_checksums);
- m_verify_row_debug_checksums = THDVAR(thd, verify_row_debug_checksums);
+ m_converter->set_verify_row_debug_checksums(
+ THDVAR(thd, verify_row_debug_checksums));
m_checksums_pct = THDVAR(thd, checksums_pct);
}
@@ -11112,7 +11300,8 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
tx->io_perf_end_and_record(&m_io_perf);
tx->m_n_mysql_tables_in_use--;
if (tx->m_n_mysql_tables_in_use == 0 &&
- !my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
+ !my_core::thd_test_options(thd,
+ OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/*
Do like InnoDB: when we get here, it's time to commit a
single-statement transaction.
@@ -11244,9 +11433,8 @@ rocksdb::Range get_range(const Rdb_key_def &kd,
}
}
-rocksdb::Range
-ha_rocksdb::get_range(const int &i,
- uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]) const {
+rocksdb::Range ha_rocksdb::get_range(
+ const int i, uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2]) const {
return myrocks::get_range(*m_key_descr_arr[i], buf);
}
@@ -11257,11 +11445,10 @@ ha_rocksdb::get_range(const int &i,
but in drop_index_thread's case, it means index is marked as removed,
so no further seek will happen for the index id.
*/
-static bool is_myrocks_index_empty(
- rocksdb::ColumnFamilyHandle *cfh, const bool is_reverse_cf,
- const rocksdb::ReadOptions &read_opts,
- const uint index_id)
-{
+static bool is_myrocks_index_empty(rocksdb::ColumnFamilyHandle *cfh,
+ const bool is_reverse_cf,
+ const rocksdb::ReadOptions &read_opts,
+ const uint index_id) {
bool index_removed = false;
uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE] = {0};
rdb_netbuf_store_uint32(key_buf, index_id);
@@ -11272,8 +11459,7 @@ static bool is_myrocks_index_empty(
if (!it->Valid()) {
index_removed = true;
} else {
- if (memcmp(it->key().data(), key_buf,
- Rdb_key_def::INDEX_NUMBER_SIZE)) {
+ if (memcmp(it->key().data(), key_buf, Rdb_key_def::INDEX_NUMBER_SIZE)) {
// Key does not have same prefix
index_removed = true;
}
@@ -11300,8 +11486,8 @@ void Rdb_drop_index_thread::run() {
timespec ts;
int sec= dict_manager.is_drop_index_empty()
- ? 24 * 60 * 60 // no filtering
- : 60; // filtering
+ ? 24 * 60 * 60 // no filtering
+ : 60; // filtering
set_timespec(ts,sec);
const auto ret MY_ATTRIBUTE((__unused__)) =
@@ -11318,26 +11504,23 @@ void Rdb_drop_index_thread::run() {
if (!indices.empty()) {
std::unordered_set<GL_INDEX_ID> finished;
rocksdb::ReadOptions read_opts;
- read_opts.total_order_seek = true; // disable bloom filter
+ read_opts.total_order_seek = true; // disable bloom filter
for (const auto d : indices) {
uint32 cf_flags = 0;
if (!dict_manager.get_cf_flags(d.cf_id, &cf_flags)) {
- sql_print_error("RocksDB: Failed to get column family flags "
- "from cf id %u. MyRocks data dictionary may "
- "get corrupted.",
- d.cf_id);
+ // NO_LINT_DEBUG
+ sql_print_error(
+ "RocksDB: Failed to get column family flags "
+ "from cf id %u. MyRocks data dictionary may "
+ "get corrupted.",
+ d.cf_id);
abort();
}
rocksdb::ColumnFamilyHandle *cfh = cf_manager.get_cf(d.cf_id);
DBUG_ASSERT(cfh);
const bool is_reverse_cf = cf_flags & Rdb_key_def::REVERSE_CF_FLAG;
- if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id))
- {
- finished.insert(d);
- continue;
- }
uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2];
rocksdb::Range range = get_range(d.index_id, buf, is_reverse_cf ? 1 : 0,
is_reverse_cf ? 0 : 1);
@@ -11357,8 +11540,7 @@ void Rdb_drop_index_thread::run() {
}
rdb_handle_io_error(status, RDB_IO_ERROR_BG_THREAD);
}
- if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id))
- {
+ if (is_myrocks_index_empty(cfh, is_reverse_cf, read_opts, d.index_id)) {
finished.insert(d);
}
}
@@ -11385,28 +11567,24 @@ Rdb_tbl_def *ha_rocksdb::get_table_if_exists(const char *const tablename) {
}
/*
- Note: the following function is called when the table is not open. That is,
- this->table==nullptr, pk_key_descr==nullptr, etc.
+ Overload of delete_table(): removes the table's metadata from the data
+ dictionary immediately; actual data is deleted later by a background thread.
- tablename points to line in form "./dbname/tablename".
+ @param tbl IN MyRocks table definition
@return
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
*/
-
-int ha_rocksdb::delete_table(const char *const tablename) {
+int ha_rocksdb::delete_table(Rdb_tbl_def *const tbl) {
DBUG_ENTER_FUNC();
- DBUG_ASSERT(tablename != nullptr);
+ DBUG_ASSERT(tbl != nullptr);
+ DBUG_ASSERT(m_tbl_def == nullptr || m_tbl_def == tbl);
const std::unique_ptr<rocksdb::WriteBatch> wb = dict_manager.begin();
rocksdb::WriteBatch *const batch = wb.get();
- /* Find the table in the hash */
- Rdb_tbl_def *const tbl = get_table_if_exists(tablename);
- if (!tbl)
- DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
dict_manager.add_drop_table(tbl->m_key_descr_arr, tbl->m_key_count, batch);
/*
@@ -11421,10 +11599,36 @@ int ha_rocksdb::delete_table(const char *const tablename) {
}
rdb_drop_idx_thread.signal();
-
+ // avoid dangling pointer
+ m_tbl_def = nullptr;
DBUG_RETURN(HA_EXIT_SUCCESS);
}
+/*
+ Drops the table named by tablename: looks up its Rdb_tbl_def and passes it
+ to the delete_table(Rdb_tbl_def *) overload.
+
+ Note: this function is called when the table is not open. That is,
+ this->table==nullptr, pk_key_descr==nullptr, etc.
+
+ @param tablename IN string in the form "./dbname/tablename"
+
+ @return
+ HA_EXIT_SUCCESS OK
+ HA_ERR_NO_SUCH_TABLE get_table_if_exists() found no such table
+ other HA_ERR error code (can be SE-specific)
+*/
+
+int ha_rocksdb::delete_table(const char *const tablename) {
+ DBUG_ENTER_FUNC();
+
+ DBUG_ASSERT(tablename != nullptr);
+
+ /* Find the table in the hash; an unknown name is reported to the caller */
+ Rdb_tbl_def *const tbl = get_table_if_exists(tablename);
+ if (!tbl) {
+ DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
+ }
+
+ DBUG_RETURN(delete_table(tbl));  // the overload performs the actual drop
+}
+
int ha_rocksdb::remove_rows(Rdb_tbl_def *const tbl) {
const rocksdb::WriteOptions wo =
rdb_get_rocksdb_write_options(handler::ha_thd());
@@ -11575,21 +11779,33 @@ int ha_rocksdb::extra(enum ha_extra_function operation) {
DBUG_ENTER_FUNC();
switch (operation) {
- case HA_EXTRA_KEYREAD:
- m_keyread_only = true;
- break;
- case HA_EXTRA_NO_KEYREAD:
- m_keyread_only = false;
- break;
- case HA_EXTRA_FLUSH:
- /*
- If the table has blobs, then they are part of m_retrieved_record.
- This call invalidates them.
- */
- m_retrieved_record.Reset();
- break;
- default:
- break;
+ case HA_EXTRA_KEYREAD:
+ m_keyread_only = true;
+ break;
+ case HA_EXTRA_NO_KEYREAD:
+ m_keyread_only = false;
+ break;
+ case HA_EXTRA_FLUSH:
+ /*
+ If the table has blobs, then they are part of m_retrieved_record.
+ This call invalidates them.
+ */
+ m_retrieved_record.Reset();
+ break;
+ case HA_EXTRA_INSERT_WITH_UPDATE:
+ // INSERT ON DUPLICATE KEY UPDATE
+ if (rocksdb_enable_insert_with_update_caching) {
+ m_insert_with_update = true;
+ }
+ break;
+ case HA_EXTRA_NO_IGNORE_DUP_KEY:
+ // Paired with HA_EXTRA_INSERT_WITH_UPDATE or HA_EXTRA_WRITE_CAN_REPLACE;
+ // it indicates the end of REPLACE / INSERT ON DUPLICATE KEY UPDATE.
+ m_insert_with_update = false;
+ break;
+
+ default:
+ break;
}
DBUG_RETURN(HA_EXIT_SUCCESS);
@@ -11660,8 +11876,7 @@ ha_rows ha_rocksdb::records_in_range(uint inx, key_range *const min_key,
uint64_t sz = 0;
auto disk_size = kd.m_stats.m_actual_disk_size;
- if (disk_size == 0)
- disk_size = kd.m_stats.m_data_size;
+ if (disk_size == 0) disk_size = kd.m_stats.m_data_size;
auto rows = kd.m_stats.m_rows;
if (rows == 0 || disk_size == 0) {
rows = 1;
@@ -11772,7 +11987,7 @@ static int calculate_stats(
// get RocksDB table properties for these ranges
rocksdb::TablePropertiesCollection props;
- for (auto it : ranges) {
+ for (const auto &it : ranges) {
const auto old_size MY_ATTRIBUTE((__unused__)) = props.size();
const auto status = rdb->GetPropertiesOfTablesInRange(
it.first, &it.second[0], it.second.size(), &props);
@@ -11962,73 +12177,77 @@ void ha_rocksdb::get_auto_increment(ulonglong off, ulonglong inc,
// The next value can be more complicated if either 'inc' or 'off' is not 1
ulonglong last_val = auto_incr;
- // Loop until we can correctly update the atomic value
- do {
- DBUG_ASSERT(last_val > 0);
- // Calculate the next value in the auto increment series: offset
- // + N * increment where N is 0, 1, 2, ...
- //
- // For further information please visit:
- // http://dev.mysql.com/doc/refman/5.7/en/replication-options-master.html
- //
- // The following is confusing so here is an explanation:
- // To get the next number in the sequence above you subtract out the
- // offset, calculate the next sequence (N * increment) and then add the
- // offset back in.
- //
- // The additions are rearranged to avoid overflow. The following is
- // equivalent to (last_val - 1 + inc - off) / inc. This uses the fact
- // that (a+b)/c = a/c + b/c + (a%c + b%c)/c. To show why:
- //
- // (a+b)/c
- // = (a - a%c + a%c + b - b%c + b%c) / c
- // = (a - a%c) / c + (b - b%c) / c + (a%c + b%c) / c
- // = a/c + b/c + (a%c + b%c) / c
- //
- // Now, substitute a = last_val - 1, b = inc - off, c = inc to get the
- // following statement.
- ulonglong n =
- (last_val - 1) / inc + ((last_val - 1) % inc + inc - off) / inc;
-
- // Check if n * inc + off will overflow. This can only happen if we have
- // an UNSIGNED BIGINT field.
- if (n > (std::numeric_limits<ulonglong>::max() - off) / inc) {
- DBUG_ASSERT(max_val == std::numeric_limits<ulonglong>::max());
- // The 'last_val' value is already equal to or larger than the largest
- // value in the sequence. Continuing would wrap around (technically
- // the behavior would be undefined). What should we do?
- // We could:
- // 1) set the new value to the last possible number in our sequence
- // as described above. The problem with this is that this
- // number could be smaller than a value in an existing row.
- // 2) set the new value to the largest possible number. This number
- // may not be in our sequence, but it is guaranteed to be equal
- // to or larger than any other value already inserted.
+ if (last_val > max_val) {
+ new_val = std::numeric_limits<ulonglong>::max();
+ } else {
+ // Loop until we can correctly update the atomic value
+ do {
+ DBUG_ASSERT(last_val > 0);
+ // Calculate the next value in the auto increment series: offset
+ // + N * increment where N is 0, 1, 2, ...
//
- // For now I'm going to take option 2.
+ // For further information please visit:
+ // http://dev.mysql.com/doc/refman/5.7/en/replication-options-master.html
//
- // Returning ULLONG_MAX from get_auto_increment will cause the SQL
- // layer to fail with ER_AUTOINC_READ_FAILED. This means that due to
- // the SE API for get_auto_increment, inserts will fail with
- // ER_AUTOINC_READ_FAILED if the column is UNSIGNED BIGINT, but
- // inserts will fail with ER_DUP_ENTRY for other types (or no failure
- // if the column is in a non-unique SK).
- new_val = std::numeric_limits<ulonglong>::max();
- auto_incr = new_val; // Store the largest value into auto_incr
- break;
- }
+ // The following is confusing so here is an explanation:
+ // To get the next number in the sequence above you subtract out the
+ // offset, calculate the next sequence (N * increment) and then add the
+ // offset back in.
+ //
+ // The additions are rearranged to avoid overflow. The following is
+ // equivalent to (last_val - 1 + inc - off) / inc. This uses the fact
+ // that (a+b)/c = a/c + b/c + (a%c + b%c)/c. To show why:
+ //
+ // (a+b)/c
+ // = (a - a%c + a%c + b - b%c + b%c) / c
+ // = (a - a%c) / c + (b - b%c) / c + (a%c + b%c) / c
+ // = a/c + b/c + (a%c + b%c) / c
+ //
+ // Now, substitute a = last_val - 1, b = inc - off, c = inc to get the
+ // following statement.
+ ulonglong n =
+ (last_val - 1) / inc + ((last_val - 1) % inc + inc - off) / inc;
+
+ // Check if n * inc + off will overflow. This can only happen if we have
+ // an UNSIGNED BIGINT field.
+ if (n > (std::numeric_limits<ulonglong>::max() - off) / inc) {
+ DBUG_ASSERT(max_val == std::numeric_limits<ulonglong>::max());
+ // The 'last_val' value is already equal to or larger than the largest
+ // value in the sequence. Continuing would wrap around (technically
+ // the behavior would be undefined). What should we do?
+ // We could:
+ // 1) set the new value to the last possible number in our sequence
+ // as described above. The problem with this is that this
+ // number could be smaller than a value in an existing row.
+ // 2) set the new value to the largest possible number. This number
+ // may not be in our sequence, but it is guaranteed to be equal
+ // to or larger than any other value already inserted.
+ //
+ // For now I'm going to take option 2.
+ //
+ // Returning ULLONG_MAX from get_auto_increment will cause the SQL
+ // layer to fail with ER_AUTOINC_READ_FAILED. This means that due to
+ // the SE API for get_auto_increment, inserts will fail with
+ // ER_AUTOINC_READ_FAILED if the column is UNSIGNED BIGINT, but
+ // inserts will fail with ER_DUP_ENTRY for other types (or no failure
+ // if the column is in a non-unique SK).
+ new_val = std::numeric_limits<ulonglong>::max();
+ auto_incr = new_val; // Store the largest value into auto_incr
+ break;
+ }
- new_val = n * inc + off;
+ new_val = n * inc + off;
- // Attempt to store the new value (plus 1 since m_auto_incr_val contains
- // the next available value) into the atomic value. If the current
- // value no longer matches what we have in 'last_val' this will fail and
- // we will repeat the loop (`last_val` will automatically get updated
- // with the current value).
- //
- // See above explanation for inc == 1 for why we use std::min.
- } while (!auto_incr.compare_exchange_weak(last_val,
- std::min(new_val + 1, max_val)));
+ // Attempt to store the new value (plus 1 since m_auto_incr_val contains
+ // the next available value) into the atomic value. If the current
+ // value no longer matches what we have in 'last_val' this will fail and
+ // we will repeat the loop (`last_val` will automatically get updated
+ // with the current value).
+ //
+ // See above explanation for inc == 1 for why we use std::min.
+ } while (!auto_incr.compare_exchange_weak(
+ last_val, std::min(new_val + 1, max_val)));
+ }
}
*first_value = new_val;
@@ -12044,13 +12263,13 @@ const char *dbug_print_item(Item *const item) {
char *const buf = dbug_item_print_buf;
String str(buf, sizeof(dbug_item_print_buf), &my_charset_bin);
str.length(0);
- if (!item)
- return "(Item*)nullptr";
+ if (!item) return "(Item*)nullptr";
item->print(&str, QT_ORDINARY);
- if (str.c_ptr() == buf)
+ if (str.c_ptr() == buf) {
return buf;
- else
+ } else {
return "Couldn't fit into buffer";
+ }
}
#endif /*DBUG_OFF*/
@@ -12179,7 +12398,6 @@ bool ha_rocksdb::prepare_inplace_alter_table(
my_core::Alter_inplace_info::DROP_UNIQUE_INDEX |
my_core::Alter_inplace_info::ADD_INDEX |
my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) {
-
if (has_hidden_pk(altered_table)) {
new_n_keys += 1;
}
@@ -12466,7 +12684,7 @@ int ha_rocksdb::inplace_populate_sk(
const int new_packed_size = index->pack_record(
new_table_arg, m_pack_buffer, table->record[0], m_sk_packed_tuple,
&m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0,
- nullptr, nullptr, m_ttl_bytes);
+ nullptr, m_ttl_bytes);
const rocksdb::Slice key = rocksdb::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);
@@ -12514,9 +12732,9 @@ int ha_rocksdb::inplace_populate_sk(
is used inside print_keydup_error so that the error message shows
the duplicate record.
*/
- if (index->unpack_record(new_table_arg, new_table_arg->record[0],
- &merge_key, nullptr,
- m_verify_row_debug_checksums)) {
+ if (index->unpack_record(
+ new_table_arg, new_table_arg->record[0], &merge_key,
+ &merge_val, m_converter->get_verify_row_debug_checksums())) {
/* Should never reach here */
DBUG_ASSERT(0);
}
@@ -12546,7 +12764,9 @@ int ha_rocksdb::inplace_populate_sk(
DBUG_RETURN(res);
}
- if ((res = tx->finish_bulk_load())) {
+ bool is_critical_error;
+ res = tx->finish_bulk_load(&is_critical_error);
+ if (res && is_critical_error) {
// NO_LINT_DEBUG
sql_print_error("Error finishing bulk load.");
DBUG_RETURN(res);
@@ -12756,22 +12976,22 @@ bool ha_rocksdb::commit_inplace_alter_table(
#define SHOW_FNAME(name) rocksdb_show_##name
-#define DEF_SHOW_FUNC(name, key) \
- static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR * var, char *buff) { \
- rocksdb_status_counters.name = \
- rocksdb_stats->getTickerCount(rocksdb::key); \
- var->type = SHOW_LONGLONG; \
- var->value = (char *)&rocksdb_status_counters.name; \
- return HA_EXIT_SUCCESS; \
+#define DEF_SHOW_FUNC(name, key) \
+ static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR * var, char *buff) { \
+ rocksdb_status_counters.name = \
+ rocksdb_stats->getTickerCount(rocksdb::key); \
+ var->type = SHOW_LONGLONG; \
+ var->value = reinterpret_cast<char *>(&rocksdb_status_counters.name); \
+ return HA_EXIT_SUCCESS; \
}
-#define DEF_STATUS_VAR(name) \
+#define DEF_STATUS_VAR(name) \
{ "rocksdb_" #name, (char *)&SHOW_FNAME(name), SHOW_FUNC }
-#define DEF_STATUS_VAR_PTR(name, ptr, option) \
+#define DEF_STATUS_VAR_PTR(name, ptr, option) \
{ "rocksdb_" name, (char *)ptr, option }
-#define DEF_STATUS_VAR_FUNC(name, ptr, option) \
+#define DEF_STATUS_VAR_FUNC(name, ptr, option) \
{ name, reinterpret_cast<char *>(ptr), option }
struct rocksdb_status_counters_t {
@@ -13001,9 +13221,8 @@ static void show_myrocks_vars(THD *thd, SHOW_VAR *var, char *buff) {
var->value = reinterpret_cast<char *>(&myrocks_status_variables);
}
-static ulonglong
-io_stall_prop_value(const std::map<std::string, std::string> &props,
- const std::string &key) {
+static ulonglong io_stall_prop_value(
+ const std::map<std::string, std::string> &props, const std::string &key) {
std::map<std::string, std::string>::const_iterator iter =
props.find("io_stalls." + key);
if (iter != props.end()) {
@@ -13181,6 +13400,10 @@ static SHOW_VAR rocksdb_status_vars[] = {
SHOW_LONGLONG),
DEF_STATUS_VAR_PTR("number_sst_entry_other", &rocksdb_num_sst_entry_other,
SHOW_LONGLONG),
+#ifndef DBUG_OFF
+ DEF_STATUS_VAR_PTR("num_get_for_update_calls",
+ &rocksdb_num_get_for_update_calls, SHOW_LONGLONG),
+#endif
// the variables generated by SHOW_FUNC are sorted only by prefix (first
// arg in the tuple below), so make sure it is unique to make sorting
// deterministic as quick sort is not stable
@@ -13422,6 +13645,51 @@ bool Rdb_manual_compaction_thread::is_manual_compaction_finished(int mc_id) {
return finished;
}
+/**
+ * Locking read + Not Found + Read Committed occurs if we accessed
+ * a row by Seek, tried to lock it, failed, released and reacquired the
+ * snapshot (because of READ COMMITTED mode) and the row was deleted by
+ * someone else in the meantime.
+ * If so, we either just skip the row, or re-create a snapshot
+ * and seek again. In both cases, Read Committed constraint is not broken.
+ *
+ * @param rc IN result code to classify (compared with HA_ERR_KEY_NOT_FOUND)
+ * @return true only if rows are being locked (m_lock_rows != RDB_LOCK_NONE),
+ *         rc is HA_ERR_KEY_NOT_FOUND and the isolation level of ha_thd()
+ *         is ISO_READ_COMMITTED; false otherwise
+ */
+bool ha_rocksdb::should_skip_invalidated_record(const int rc) {
+ if ((m_lock_rows != RDB_LOCK_NONE && rc == HA_ERR_KEY_NOT_FOUND &&
+ my_core::thd_tx_isolation(ha_thd()) == ISO_READ_COMMITTED)) {
+ return true;
+ }
+ return false;
+}
+/**
+ * Indicates that the snapshot needs to be re-created and the seek retried,
+ * instead of returning errors or an empty set. This is normally applicable
+ * when hitting kBusy when locking the first row of the transaction,
+ * with Repeatable Read isolation level.
+ *
+ * @param rc IN result code to classify
+ * @param is_new_snapshot IN only consulted when rc is
+ *        HA_ERR_ROCKSDB_STATUS_BUSY
+ * @return true if should_skip_invalidated_record(rc) holds, or if rc is
+ *         HA_ERR_ROCKSDB_STATUS_BUSY and is_new_snapshot is set;
+ *         false otherwise
+ */
+bool ha_rocksdb::should_recreate_snapshot(const int rc,
+ const bool is_new_snapshot) {
+ if (should_skip_invalidated_record(rc) ||
+ (rc == HA_ERR_ROCKSDB_STATUS_BUSY && is_new_snapshot)) {
+ return true;
+ }
+ return false;
+}
+
+/**
+ * If calling put/delete/singledelete without locking the row,
+ * it is necessary to pass assume_tracked=false to RocksDB TX API.
+ * Read Free Replication and Blind Deletes are the cases when
+ * using TX API and skipping row locking.
+ *
+ * NOTE: the check for those two cases is guarded by MARIAROCKS_NOT_YET;
+ * when that macro is not defined this function unconditionally returns
+ * true and thd is unused.
+ *
+ * @param thd IN session whose blind_delete_primary_key variable is read
+ * @return false if use_read_free_rpl() or the session's
+ *         blind_delete_primary_key is set (MARIAROCKS_NOT_YET builds only);
+ *         true otherwise
+ */
+bool ha_rocksdb::can_assume_tracked(THD *thd) {
+#ifdef MARIAROCKS_NOT_YET
+ if (use_read_free_rpl() || (THDVAR(thd, blind_delete_primary_key))) {
+ return false;
+ }
+#endif
+ return true;
+}
+
bool ha_rocksdb::check_bloom_and_set_bounds(
THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond,
const bool use_all_keys, size_t bound_len, uchar *const lower_bound,
@@ -13482,20 +13750,22 @@ bool ha_rocksdb::can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
shorter require all parts of the key to be available
for the short key match.
*/
- if ((use_all_keys && prefix_extractor->InRange(eq_cond))
- || prefix_extractor->SameResultWhenAppended(eq_cond))
+ if ((use_all_keys && prefix_extractor->InRange(eq_cond)) ||
+ prefix_extractor->SameResultWhenAppended(eq_cond)) {
can_use = true;
- else
+ } else {
can_use = false;
+ }
} else {
/*
if prefix extractor is not defined, all key parts have to be
used by eq_cond.
*/
- if (use_all_keys)
+ if (use_all_keys) {
can_use = true;
- else
+ } else {
can_use = false;
+ }
}
return can_use;
@@ -13514,7 +13784,7 @@ bool rdb_is_ttl_enabled() { return rocksdb_enable_ttl; }
bool rdb_is_ttl_read_filtering_enabled() {
return rocksdb_enable_ttl_read_filtering;
}
-#ifndef NDEBUG
+#ifndef DBUG_OFF
int rdb_dbug_set_ttl_rec_ts() { return rocksdb_debug_ttl_rec_ts; }
int rdb_dbug_set_ttl_snapshot_ts() { return rocksdb_debug_ttl_snapshot_ts; }
int rdb_dbug_set_ttl_read_filter_ts() {
@@ -13561,17 +13831,17 @@ const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type) {
static_assert(RDB_IO_ERROR_LAST == 4, "Please handle all the error types.");
switch (err_type) {
- case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_TX_COMMIT:
- return "RDB_IO_ERROR_TX_COMMIT";
- case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_DICT_COMMIT:
- return "RDB_IO_ERROR_DICT_COMMIT";
- case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_BG_THREAD:
- return "RDB_IO_ERROR_BG_THREAD";
- case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_GENERAL:
- return "RDB_IO_ERROR_GENERAL";
- default:
- DBUG_ASSERT(false);
- return "(unknown)";
+ case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_TX_COMMIT:
+ return "RDB_IO_ERROR_TX_COMMIT";
+ case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_DICT_COMMIT:
+ return "RDB_IO_ERROR_DICT_COMMIT";
+ case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_BG_THREAD:
+ return "RDB_IO_ERROR_BG_THREAD";
+ case RDB_IO_ERROR_TYPE::RDB_IO_ERROR_GENERAL:
+ return "RDB_IO_ERROR_GENERAL";
+ default:
+ DBUG_ASSERT(false);
+ return "(unknown)";
}
}
@@ -13583,32 +13853,38 @@ const char *get_rdb_io_error_string(const RDB_IO_ERROR_TYPE err_type) {
void rdb_handle_io_error(const rocksdb::Status status,
const RDB_IO_ERROR_TYPE err_type) {
if (status.IsIOError()) {
- switch (err_type) {
- case RDB_IO_ERROR_TX_COMMIT:
- case RDB_IO_ERROR_DICT_COMMIT: {
- rdb_log_status_error(status, "failed to write to WAL");
- /* NO_LINT_DEBUG */
- sql_print_error("MyRocks: aborting on WAL write error.");
- abort();
- break;
- }
- case RDB_IO_ERROR_BG_THREAD: {
- rdb_log_status_error(status, "BG thread failed to write to RocksDB");
- /* NO_LINT_DEBUG */
- sql_print_error("MyRocks: aborting on BG write error.");
- abort();
- break;
- }
- case RDB_IO_ERROR_GENERAL: {
- rdb_log_status_error(status, "failed on I/O");
- /* NO_LINT_DEBUG */
- sql_print_error("MyRocks: aborting on I/O error.");
- abort();
- break;
+ /* skip dumping core if write failed and we are allowed to do so */
+#ifdef MARIAROCKS_NOT_YET
+ if (skip_core_dump_on_error) {
+ opt_core_file = false;
}
- default:
- DBUG_ASSERT(0);
- break;
+#endif
+ switch (err_type) {
+ case RDB_IO_ERROR_TX_COMMIT:
+ case RDB_IO_ERROR_DICT_COMMIT: {
+ rdb_log_status_error(status, "failed to write to WAL");
+ /* NO_LINT_DEBUG */
+ sql_print_error("MyRocks: aborting on WAL write error.");
+ abort();
+ break;
+ }
+ case RDB_IO_ERROR_BG_THREAD: {
+ rdb_log_status_error(status, "BG thread failed to write to RocksDB");
+ /* NO_LINT_DEBUG */
+ sql_print_error("MyRocks: aborting on BG write error.");
+ abort();
+ break;
+ }
+ case RDB_IO_ERROR_GENERAL: {
+ rdb_log_status_error(status, "failed on I/O");
+ /* NO_LINT_DEBUG */
+ sql_print_error("MyRocks: aborting on I/O error.");
+ abort();
+ break;
+ }
+ default:
+ DBUG_ASSERT(0);
+ break;
}
} else if (status.IsCorruption()) {
rdb_log_status_error(status, "data corruption detected!");
@@ -13618,16 +13894,16 @@ void rdb_handle_io_error(const rocksdb::Status status,
abort();
} else if (!status.ok()) {
switch (err_type) {
- case RDB_IO_ERROR_DICT_COMMIT: {
- rdb_log_status_error(status, "Failed to write to WAL (dictionary)");
- /* NO_LINT_DEBUG */
- sql_print_error("MyRocks: aborting on WAL write error.");
- abort();
- break;
- }
- default:
- rdb_log_status_error(status, "Failed to read/write in RocksDB");
- break;
+ case RDB_IO_ERROR_DICT_COMMIT: {
+ rdb_log_status_error(status, "Failed to write to WAL (dictionary)");
+ /* NO_LINT_DEBUG */
+ sql_print_error("MyRocks: aborting on WAL write error.");
+ abort();
+ break;
+ }
+ default:
+ rdb_log_status_error(status, "Failed to read/write in RocksDB");
+ break;
}
}
}
@@ -13735,9 +14011,10 @@ void rocksdb_set_delayed_write_rate(THD *thd, struct st_mysql_sys_var *var,
if (!s.ok()) {
/* NO_LINT_DEBUG */
- sql_print_warning("MyRocks: failed to update delayed_write_rate. "
- "status code = %d, status = %s",
- s.code(), s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to update delayed_write_rate. "
+ "status code = %d, status = %s",
+ s.code(), s.ToString().c_str());
}
}
RDB_MUTEX_UNLOCK_CHECK(rdb_sysvars_mutex);
@@ -13795,8 +14072,7 @@ int mysql_value_to_bool(struct st_mysql_value *value, my_bool *return_value) {
} else if (new_value_type == MYSQL_VALUE_TYPE_INT) {
long long intbuf;
value->val_int(value, &intbuf);
- if (intbuf > 1)
- return 1;
+ if (intbuf > 1) return 1;
*return_value = intbuf > 0 ? TRUE : FALSE;
} else {
return 1;
@@ -13815,12 +14091,14 @@ int rocksdb_check_bulk_load(
Rdb_transaction *tx = get_tx_from_thd(thd);
if (tx != nullptr) {
- const int rc = tx->finish_bulk_load();
- if (rc != 0) {
+ bool is_critical_error;
+ const int rc = tx->finish_bulk_load(&is_critical_error);
+ if (rc != 0 && is_critical_error) {
// NO_LINT_DEBUG
- sql_print_error("RocksDB: Error %d finalizing last SST file while "
- "setting bulk loading variable",
- rc);
+ sql_print_error(
+ "RocksDB: Error %d finalizing last SST file while "
+ "setting bulk loading variable",
+ rc);
THDVAR(thd, bulk_load) = 0;
return 1;
}
@@ -13868,9 +14146,10 @@ static void rocksdb_set_max_background_jobs(THD *thd,
if (!s.ok()) {
/* NO_LINT_DEBUG */
- sql_print_warning("MyRocks: failed to update max_background_jobs. "
- "Status code = %d, status = %s.",
- s.code(), s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to update max_background_jobs. "
+ "Status code = %d, status = %s.",
+ s.code(), s.ToString().c_str());
}
}
@@ -13896,9 +14175,10 @@ static void rocksdb_set_bytes_per_sync(
if (!s.ok()) {
/* NO_LINT_DEBUG */
- sql_print_warning("MyRocks: failed to update max_background_jobs. "
- "Status code = %d, status = %s.",
- s.code(), s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to update max_background_jobs. "
+ "Status code = %d, status = %s.",
+ s.code(), s.ToString().c_str());
}
}
@@ -13924,9 +14204,10 @@ static void rocksdb_set_wal_bytes_per_sync(
if (!s.ok()) {
/* NO_LINT_DEBUG */
- sql_print_warning("MyRocks: failed to update max_background_jobs. "
- "Status code = %d, status = %s.",
- s.code(), s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to update max_background_jobs. "
+ "Status code = %d, status = %s.",
+ s.code(), s.ToString().c_str());
}
}
@@ -13953,7 +14234,7 @@ static int rocksdb_validate_set_block_cache_size(
}
if (new_value < RDB_MIN_BLOCK_CACHE_SIZE ||
- (uint64_t)new_value > (uint64_t)LONGLONG_MAX) {
+ (uint64_t)new_value > (uint64_t)LLONG_MAX) {
return HA_EXIT_FAILURE;
}
@@ -13969,17 +14250,19 @@ static int rocksdb_validate_set_block_cache_size(
return HA_EXIT_SUCCESS;
}
-static int
-rocksdb_validate_update_cf_options(THD * /* unused */,
- struct st_mysql_sys_var * /*unused*/,
- void *save, struct st_mysql_value *value) {
-
+static int rocksdb_validate_update_cf_options(
+ THD * /* unused */, struct st_mysql_sys_var * /*unused*/, void *save,
+ struct st_mysql_value *value) {
char buff[STRING_BUFFER_USUAL_SIZE];
const char *str;
int length;
length = sizeof(buff);
str = value->val_str(value, buff, &length);
- *(const char **)save = str;
+ // In some cases, str can point to buff on the stack.
+ // This can cause invalid memory access after validation has finished.
+ // To avoid this, always duplicate str when it is not
+ // nullptr.
+ *(const char **)save = (str == nullptr) ? nullptr : my_strdup(str, MYF(0));
if (str == nullptr) {
return HA_EXIT_SUCCESS;
@@ -13993,13 +14276,17 @@ rocksdb_validate_update_cf_options(THD * /* unused */,
my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "rocksdb_update_cf_options", str);
return HA_EXIT_FAILURE;
}
+ // Loop through option_map and create missing column families
+ for (Rdb_cf_options::Name_to_config_t::iterator it = option_map.begin();
+ it != option_map.end(); ++it) {
+ cf_manager.get_or_create_cf(rdb, it->first);
+ }
return HA_EXIT_SUCCESS;
}
-static void
-rocksdb_set_update_cf_options(THD *const /* unused */,
- struct st_mysql_sys_var *const /* unused */,
- void *const var_ptr, const void *const save) {
+static void rocksdb_set_update_cf_options(
+ THD *const /* unused */, struct st_mysql_sys_var *const /* unused */,
+ void *const var_ptr, const void *const save) {
const char *const val = *static_cast<const char *const *>(save);
RDB_MUTEX_LOCK_CHECK(rdb_sysvars_mutex);
@@ -14017,7 +14304,7 @@ rocksdb_set_update_cf_options(THD *const /* unused */,
// Reset the pointers regardless of how much success we had with updating
// the CF options. This will results in consistent behavior and avoids
// dealing with cases when only a subset of CF-s was successfully updated.
- *reinterpret_cast<char **>(var_ptr) = my_strdup(val, MYF(0));
+ *reinterpret_cast<const char **>(var_ptr) = val;
// Do the real work of applying the changes.
Rdb_cf_options::Name_to_config_t option_map;
@@ -14045,9 +14332,10 @@ rocksdb_set_update_cf_options(THD *const /* unused */,
if (s != rocksdb::Status::OK()) {
// NO_LINT_DEBUG
- sql_print_warning("MyRocks: failed to convert the options for column "
- "family '%s' to a map. %s", cf_name.c_str(),
- s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to convert the options for column "
+ "family '%s' to a map. %s",
+ cf_name.c_str(), s.ToString().c_str());
} else {
DBUG_ASSERT(rdb != nullptr);
@@ -14056,14 +14344,16 @@ rocksdb_set_update_cf_options(THD *const /* unused */,
if (s != rocksdb::Status::OK()) {
// NO_LINT_DEBUG
- sql_print_warning("MyRocks: failed to apply the options for column "
- "family '%s'. %s", cf_name.c_str(),
- s.ToString().c_str());
+ sql_print_warning(
+ "MyRocks: failed to apply the options for column "
+ "family '%s'. %s",
+ cf_name.c_str(), s.ToString().c_str());
} else {
// NO_LINT_DEBUG
- sql_print_information("MyRocks: options for column family '%s' "
- "have been successfully updated.",
- cf_name.c_str());
+ sql_print_information(
+ "MyRocks: options for column family '%s' "
+ "have been successfully updated.",
+ cf_name.c_str());
// Make sure that data is internally consistent as well and update
// the CF options. This is necessary also to make sure that the CF
@@ -14126,18 +14416,33 @@ void ha_rocksdb::rpl_after_update_rows() {
DBUG_VOID_RETURN;
}
+bool ha_rocksdb::is_read_free_rpl_table() const {  // true when table->s is set and the table definition is flagged
+ return table->s && m_tbl_def->m_is_read_free_rpl_table;  // NOTE(review): table and m_tbl_def are not null-checked here — confirm callers guarantee both
+}
+
/**
@brief
- Read Free Replication can be used or not. Returning False means
- Read Free Replication can be used. Read Free Replication can be used
- on UPDATE or DELETE row events, and table must have user defined
- primary key.
+ Read Free Replication can be used or not. Returning true means
+ Read Free Replication can be used.
*/
-bool ha_rocksdb::use_read_free_rpl() {
-bool ha_rocksdb::use_read_free_rpl() {
+bool ha_rocksdb::use_read_free_rpl() const {
DBUG_ENTER_FUNC();
- DBUG_RETURN((m_in_rpl_delete_rows || m_in_rpl_update_rows) &&
- !has_hidden_pk(table) && m_use_read_free_rpl);
+ if (!ha_thd()->rli_slave || table->triggers || !is_read_free_rpl_table()) {
+ DBUG_RETURN(false);  // no rli_slave on the thread, table has triggers, or table is not flagged
+ }
+
+ switch (rocksdb_read_free_rpl) {  // the configured mode decides how strict the key check is
+ case read_free_rpl_type::OFF:
+ DBUG_RETURN(false);
+ case read_free_rpl_type::PK_ONLY:
+ DBUG_RETURN(!has_hidden_pk(table) && table->s->keys == 1);  // no hidden PK and exactly one key
+ case read_free_rpl_type::PK_SK:
+ DBUG_RETURN(!has_hidden_pk(table));  // no hidden PK; key count is not restricted
+ }
+
+ DBUG_ASSERT(false);  // not reached for OFF/PK_ONLY/PK_SK, which all return above
+ DBUG_RETURN(false);
}
#endif // MARIAROCKS_NOT_YET
@@ -14176,7 +14481,7 @@ void sql_print_verbose_info(const char *format, ...)
}
}
-} // namespace myrocks
+} // namespace myrocks
/**