summaryrefslogtreecommitdiff
path: root/storage/rocksdb/rdb_sst_info.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.cc')
-rw-r--r--storage/rocksdb/rdb_sst_info.cc233
1 files changed, 87 insertions, 146 deletions
diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc
index 6201c9f0207..9f470ea2fef 100644
--- a/storage/rocksdb/rdb_sst_info.cc
+++ b/storage/rocksdb/rdb_sst_info.cc
@@ -51,8 +51,13 @@ Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file(
rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
const rocksdb::DBOptions &db_options, const std::string &name,
const bool tracing)
- : m_db(db), m_cf(cf), m_db_options(db_options), m_sst_file_writer(nullptr),
- m_name(name), m_tracing(tracing), m_comparator(cf->GetComparator()) {
+ : m_db(db),
+ m_cf(cf),
+ m_db_options(db_options),
+ m_sst_file_writer(nullptr),
+ m_name(name),
+ m_tracing(tracing),
+ m_comparator(cf->GetComparator()) {
DBUG_ASSERT(db != nullptr);
DBUG_ASSERT(cf != nullptr);
}
@@ -61,11 +66,6 @@ Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() {
// Make sure we clean up
delete m_sst_file_writer;
m_sst_file_writer = nullptr;
-
- // In case something went wrong attempt to delete the temporary file.
- // If everything went fine that file will have been renamed and this
- // function call will fail.
- std::remove(m_name.c_str());
}
rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
@@ -102,9 +102,8 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
return s;
}
-rocksdb::Status
-Rdb_sst_file_ordered::Rdb_sst_file::put(const rocksdb::Slice &key,
- const rocksdb::Slice &value) {
+rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::put(
+ const rocksdb::Slice &key, const rocksdb::Slice &value) {
DBUG_ASSERT(m_sst_file_writer != nullptr);
#ifdef __GNUC__
@@ -118,8 +117,8 @@ Rdb_sst_file_ordered::Rdb_sst_file::put(const rocksdb::Slice &key,
return m_sst_file_writer->Add(key, value);
}
-std::string
-Rdb_sst_file_ordered::Rdb_sst_file::generateKey(const std::string &key) {
+std::string Rdb_sst_file_ordered::Rdb_sst_file::generateKey(
+ const std::string &key) {
static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
@@ -140,7 +139,7 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
DBUG_ASSERT(m_sst_file_writer != nullptr);
rocksdb::Status s;
- rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified
+ rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified
// Close out the sst file
s = m_sst_file_writer->Finish(&fileinfo);
@@ -153,30 +152,15 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
if (s.ok()) {
if (m_tracing) {
// NO_LINT_DEBUG
- sql_print_information("SST Tracing: Adding file %s, smallest key: %s, "
- "largest key: %s, file size: %" PRIu64 ", "
- "num_entries: %" PRIu64,
- fileinfo.file_path.c_str(),
- generateKey(fileinfo.smallest_key).c_str(),
- generateKey(fileinfo.largest_key).c_str(),
- fileinfo.file_size, fileinfo.num_entries);
- }
-
- // Add the file to the database
- // Set the snapshot_consistency parameter to false since no one
- // should be accessing the table we are bulk loading
- rocksdb::IngestExternalFileOptions opts;
- opts.move_files = true;
- opts.snapshot_consistency = false;
- opts.allow_global_seqno = false;
- opts.allow_blocking_flush = false;
- s = m_db->IngestExternalFile(m_cf, {m_name}, opts);
-
- if (m_tracing) {
- // NO_LINT_DEBUG
- sql_print_information("SST Tracing: AddFile(%s) returned %s",
- fileinfo.file_path.c_str(),
- s.ok() ? "ok" : "not ok");
+ sql_print_information(
+ "SST Tracing: Adding file %s, smallest key: %s, "
+ "largest key: %s, file size: %" PRIu64
+ ", "
+ "num_entries: %" PRIu64,
+ fileinfo.file_path.c_str(),
+ generateKey(fileinfo.smallest_key).c_str(),
+ generateKey(fileinfo.largest_key).c_str(), fileinfo.file_size,
+ fileinfo.num_entries);
}
}
@@ -222,7 +206,9 @@ Rdb_sst_file_ordered::Rdb_sst_file_ordered(
rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
const rocksdb::DBOptions &db_options, const std::string &name,
const bool tracing, size_t max_size)
- : m_use_stack(false), m_first(true), m_stack(max_size),
+ : m_use_stack(false),
+ m_first(true),
+ m_stack(max_size),
m_file(db, cf, db_options, name, tracing) {
m_stack.reset();
}
@@ -329,21 +315,26 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
const std::string &indexname,
rocksdb::ColumnFamilyHandle *const cf,
const rocksdb::DBOptions &db_options,
- const bool &tracing)
- : m_db(db), m_cf(cf), m_db_options(db_options), m_curr_size(0),
- m_sst_count(0), m_background_error(HA_EXIT_SUCCESS), m_committed(false),
-#if defined(RDB_SST_INFO_USE_THREAD)
- m_queue(), m_mutex(), m_cond(), m_thread(nullptr), m_finished(false),
-#endif
- m_sst_file(nullptr), m_tracing(tracing), m_print_client_error(true) {
+ const bool tracing)
+ : m_db(db),
+ m_cf(cf),
+ m_db_options(db_options),
+ m_curr_size(0),
+ m_sst_count(0),
+ m_background_error(HA_EXIT_SUCCESS),
+ m_done(false),
+ m_sst_file(nullptr),
+ m_tracing(tracing),
+ m_print_client_error(true) {
m_prefix = db->GetName() + "/";
std::string normalized_table;
if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) {
// We failed to get a normalized table name. This should never happen,
// but handle it anyway.
- m_prefix += "fallback_" + std::to_string(reinterpret_cast<intptr_t>(
- reinterpret_cast<void *>(this))) +
+ m_prefix += "fallback_" +
+ std::to_string(reinterpret_cast<intptr_t>(
+ reinterpret_cast<void *>(this))) +
"_" + indexname + "_";
} else {
m_prefix += normalized_table + "_" + indexname + "_";
@@ -367,9 +358,15 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
Rdb_sst_info::~Rdb_sst_info() {
DBUG_ASSERT(m_sst_file == nullptr);
-#if defined(RDB_SST_INFO_USE_THREAD)
- DBUG_ASSERT(m_thread == nullptr);
-#endif
+
+ for (auto sst_file : m_committed_files) {
+ // In case something went wrong attempt to delete the temporary file.
+ // If everything went fine that file will have been renamed and this
+ // function call will fail.
+ std::remove(sst_file.c_str());
+ }
+ m_committed_files.clear();
+
mysql_mutex_destroy(&m_commit_mutex);
}
@@ -380,8 +377,8 @@ int Rdb_sst_info::open_new_sst_file() {
const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix;
// Create the new sst file object
- m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options,
- name, m_tracing, m_max_size);
+ m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, name,
+ m_tracing, m_max_size);
// Open the sst file
const rocksdb::Status s = m_sst_file->open();
@@ -397,35 +394,23 @@ int Rdb_sst_info::open_new_sst_file() {
return HA_EXIT_SUCCESS;
}
-void Rdb_sst_info::close_curr_sst_file() {
- DBUG_ASSERT(m_sst_file != nullptr);
- DBUG_ASSERT(m_curr_size > 0);
-
-#if defined(RDB_SST_INFO_USE_THREAD)
- if (m_thread == nullptr) {
- // We haven't already started a background thread, so start one
- m_thread = new std::thread(thread_fcn, this);
+void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) {
+ const rocksdb::Status s = sst_file->commit();
+ if (!s.ok()) {
+ set_error_msg(sst_file->get_name(), s);
+ set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
}
- DBUG_ASSERT(m_thread != nullptr);
+ m_committed_files.push_back(sst_file->get_name());
- {
- // Add this finished sst file to the queue (while holding mutex)
- const std::lock_guard<std::mutex> guard(m_mutex);
- m_queue.push(m_sst_file);
- }
+ delete sst_file;
+}
- // Notify the background thread that there is a new entry in the queue
- m_cond.notify_one();
-#else
- const rocksdb::Status s = m_sst_file->commit();
- if (!s.ok()) {
- set_error_msg(m_sst_file->get_name(), s);
- set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
- }
+void Rdb_sst_info::close_curr_sst_file() {
+ DBUG_ASSERT(m_sst_file != nullptr);
+ DBUG_ASSERT(m_curr_size > 0);
- delete m_sst_file;
-#endif
+ commit_sst_file(m_sst_file);
// Reset for next sst file
m_sst_file = nullptr;
@@ -435,7 +420,7 @@ void Rdb_sst_info::close_curr_sst_file() {
int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
int rc;
- DBUG_ASSERT(!m_committed);
+ DBUG_ASSERT(!m_done);
if (m_curr_size + key.size() + value.size() >= m_max_size) {
// The current sst file has reached its maximum, close it out
@@ -470,15 +455,22 @@ int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
return HA_EXIT_SUCCESS;
}
-int Rdb_sst_info::commit(bool print_client_error) {
+/*
+ Finish the current work and return the list of SST files ready to be
+ ingested. This function needs to be idempotent and atomic
+ */
+int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info,
+ bool print_client_error) {
int ret = HA_EXIT_SUCCESS;
// Both the transaction clean up and the ha_rocksdb handler have
// references to this Rdb_sst_info and both can call commit, so
// synchronize on the object here.
+ // This also means that in such a case the bulk loading operation stops being
+ // truly atomic, and we should consider fixing this in the future
RDB_MUTEX_LOCK_CHECK(m_commit_mutex);
- if (m_committed) {
+ if (is_done()) {
RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
return ret;
}
@@ -490,20 +482,13 @@ int Rdb_sst_info::commit(bool print_client_error) {
close_curr_sst_file();
}
-#if defined(RDB_SST_INFO_USE_THREAD)
- if (m_thread != nullptr) {
- // Tell the background thread we are done
- m_finished = true;
- m_cond.notify_one();
+ // This checks out the list of files so that the caller can collect/group
+ // them and ingest them all in one go, and any racing calls to finish()
+ // won't see them at all
+ commit_info->init(m_cf, std::move(m_committed_files));
+ DBUG_ASSERT(m_committed_files.size() == 0);
- // Wait for the background thread to finish
- m_thread->join();
- delete m_thread;
- m_thread = nullptr;
- }
-#endif
-
- m_committed = true;
+ m_done = true;
RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
// Did we get any errors?
@@ -517,16 +502,13 @@ int Rdb_sst_info::commit(bool print_client_error) {
void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
const rocksdb::Status &s) {
+ if (!m_print_client_error) return;
- if (!m_print_client_error)
- return;
+ report_error_msg(s, sst_file_name.c_str());
+}
-#if defined(RDB_SST_INFO_USE_THREAD)
- // Both the foreground and background threads can set the error message
- // so lock the mutex to protect it. We only want the first error that
- // we encounter.
- const std::lock_guard<std::mutex> guard(m_mutex);
-#endif
+void Rdb_sst_info::report_error_msg(const rocksdb::Status &s,
+ const char *sst_file_name) {
if (s.IsInvalidArgument() &&
strcmp(s.getState(), "Keys must be added in order") == 0) {
my_printf_error(ER_KEYS_OUT_OF_ORDER,
@@ -536,57 +518,16 @@ void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
} else if (s.IsInvalidArgument() &&
strcmp(s.getState(), "Global seqno is required, but disabled") ==
0) {
- my_printf_error(ER_OVERLAPPING_KEYS, "Rows inserted during bulk load "
- "must not overlap existing rows",
+ my_printf_error(ER_OVERLAPPING_KEYS,
+ "Rows inserted during bulk load "
+ "must not overlap existing rows",
MYF(0));
} else {
my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0),
- sst_file_name.c_str(), s.ToString().c_str());
+ sst_file_name, s.ToString().c_str());
}
}
-#if defined(RDB_SST_INFO_USE_THREAD)
-// Static thread function - the Rdb_sst_info object is in 'object'
-void Rdb_sst_info::thread_fcn(void *object) {
- reinterpret_cast<Rdb_sst_info *>(object)->run_thread();
-}
-
-void Rdb_sst_info::run_thread() {
- std::unique_lock<std::mutex> lk(m_mutex);
-
- do {
- // Wait for notification or 1 second to pass
- m_cond.wait_for(lk, std::chrono::seconds(1));
-
- // Inner loop pulls off all Rdb_sst_file_ordered entries and processes them
- while (!m_queue.empty()) {
- Rdb_sst_file_ordered *const sst_file = m_queue.front();
- m_queue.pop();
-
- // Release the lock - we don't want to hold it while committing the file
- lk.unlock();
-
- // Close out the sst file and add it to the database
- const rocksdb::Status s = sst_file->commit();
- if (!s.ok()) {
- set_error_msg(sst_file->get_name(), s);
- set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
- }
-
- delete sst_file;
-
- // Reacquire the lock for the next inner loop iteration
- lk.lock();
- }
-
- // If the queue is empty and the main thread has indicated we should exit
- // break out of the loop.
- } while (!m_finished);
-
- DBUG_ASSERT(m_queue.empty());
-}
-#endif
-
void Rdb_sst_info::init(const rocksdb::DB *const db) {
const std::string path = db->GetName() + FN_DIRSEP;
struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT));
@@ -618,4 +559,4 @@ void Rdb_sst_info::init(const rocksdb::DB *const db) {
std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0);
std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp";
-} // namespace myrocks
+} // namespace myrocks