diff options
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.cc')
-rw-r--r-- | storage/rocksdb/rdb_sst_info.cc | 233 |
1 files changed, 87 insertions, 146 deletions
diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc index 6201c9f0207..9f470ea2fef 100644 --- a/storage/rocksdb/rdb_sst_info.cc +++ b/storage/rocksdb/rdb_sst_info.cc @@ -51,8 +51,13 @@ Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file( rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf, const rocksdb::DBOptions &db_options, const std::string &name, const bool tracing) - : m_db(db), m_cf(cf), m_db_options(db_options), m_sst_file_writer(nullptr), - m_name(name), m_tracing(tracing), m_comparator(cf->GetComparator()) { + : m_db(db), + m_cf(cf), + m_db_options(db_options), + m_sst_file_writer(nullptr), + m_name(name), + m_tracing(tracing), + m_comparator(cf->GetComparator()) { DBUG_ASSERT(db != nullptr); DBUG_ASSERT(cf != nullptr); } @@ -61,11 +66,6 @@ Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() { // Make sure we clean up delete m_sst_file_writer; m_sst_file_writer = nullptr; - - // In case something went wrong attempt to delete the temporary file. - // If everything went fine that file will have been renamed and this - // function call will fail. - std::remove(m_name.c_str()); } rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() { @@ -102,9 +102,8 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() { return s; } -rocksdb::Status -Rdb_sst_file_ordered::Rdb_sst_file::put(const rocksdb::Slice &key, - const rocksdb::Slice &value) { +rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::put( + const rocksdb::Slice &key, const rocksdb::Slice &value) { DBUG_ASSERT(m_sst_file_writer != nullptr); #ifdef __GNUC__ @@ -118,8 +117,8 @@ Rdb_sst_file_ordered::Rdb_sst_file::put(const rocksdb::Slice &key, return m_sst_file_writer->Add(key, value); } -std::string -Rdb_sst_file_ordered::Rdb_sst_file::generateKey(const std::string &key) { +std::string Rdb_sst_file_ordered::Rdb_sst_file::generateKey( + const std::string &key) { static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; @@ -140,7 +139,7 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() { DBUG_ASSERT(m_sst_file_writer != nullptr); rocksdb::Status s; - rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified + rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified // Close out the sst file s = m_sst_file_writer->Finish(&fileinfo); @@ -153,30 +152,15 @@ rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() { if (s.ok()) { if (m_tracing) { // NO_LINT_DEBUG - sql_print_information("SST Tracing: Adding file %s, smallest key: %s, " - "largest key: %s, file size: %" PRIu64 ", " - "num_entries: %" PRIu64, - fileinfo.file_path.c_str(), - generateKey(fileinfo.smallest_key).c_str(), - generateKey(fileinfo.largest_key).c_str(), - fileinfo.file_size, fileinfo.num_entries); - } - - // Add the file to the database - // Set the snapshot_consistency parameter to false since no one - // should be accessing the table we are bulk loading - rocksdb::IngestExternalFileOptions opts; - opts.move_files = true; - opts.snapshot_consistency = false; - opts.allow_global_seqno = false; - opts.allow_blocking_flush = false; - s = m_db->IngestExternalFile(m_cf, {m_name}, opts); - - if (m_tracing) { - // NO_LINT_DEBUG - sql_print_information("SST Tracing: AddFile(%s) returned %s", - fileinfo.file_path.c_str(), - s.ok() ? "ok" : "not ok"); + sql_print_information( + "SST Tracing: Adding file %s, smallest key: %s, " + "largest key: %s, file size: %" PRIu64 + ", " + "num_entries: %" PRIu64, + fileinfo.file_path.c_str(), + generateKey(fileinfo.smallest_key).c_str(), + generateKey(fileinfo.largest_key).c_str(), fileinfo.file_size, + fileinfo.num_entries); } } @@ -222,7 +206,9 @@ Rdb_sst_file_ordered::Rdb_sst_file_ordered( rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf, const rocksdb::DBOptions &db_options, const std::string &name, const bool tracing, size_t max_size) - : m_use_stack(false), m_first(true), m_stack(max_size), + : m_use_stack(false), + m_first(true), + m_stack(max_size), m_file(db, cf, db_options, name, tracing) { m_stack.reset(); } @@ -329,21 +315,26 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename, const std::string &indexname, rocksdb::ColumnFamilyHandle *const cf, const rocksdb::DBOptions &db_options, - const bool &tracing) - : m_db(db), m_cf(cf), m_db_options(db_options), m_curr_size(0), - m_sst_count(0), m_background_error(HA_EXIT_SUCCESS), m_committed(false), -#if defined(RDB_SST_INFO_USE_THREAD) - m_queue(), m_mutex(), m_cond(), m_thread(nullptr), m_finished(false), -#endif - m_sst_file(nullptr), m_tracing(tracing), m_print_client_error(true) { + const bool tracing) + : m_db(db), + m_cf(cf), + m_db_options(db_options), + m_curr_size(0), + m_sst_count(0), + m_background_error(HA_EXIT_SUCCESS), + m_done(false), + m_sst_file(nullptr), + m_tracing(tracing), + m_print_client_error(true) { m_prefix = db->GetName() + "/"; std::string normalized_table; if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) { // We failed to get a normalized table name. This should never happen, // but handle it anyway. - m_prefix += "fallback_" + std::to_string(reinterpret_cast<intptr_t>( - reinterpret_cast<void *>(this))) + + m_prefix += "fallback_" + + std::to_string(reinterpret_cast<intptr_t>( + reinterpret_cast<void *>(this))) + "_" + indexname + "_"; } else { m_prefix += normalized_table + "_" + indexname + "_"; @@ -367,9 +358,15 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename, Rdb_sst_info::~Rdb_sst_info() { DBUG_ASSERT(m_sst_file == nullptr); -#if defined(RDB_SST_INFO_USE_THREAD) - DBUG_ASSERT(m_thread == nullptr); -#endif + + for (auto sst_file : m_committed_files) { + // In case something went wrong attempt to delete the temporary file. + // If everything went fine that file will have been renamed and this + // function call will fail. + std::remove(sst_file.c_str()); + } + m_committed_files.clear(); + mysql_mutex_destroy(&m_commit_mutex); } @@ -380,8 +377,8 @@ int Rdb_sst_info::open_new_sst_file() { const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix; // Create the new sst file object - m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, - name, m_tracing, m_max_size); + m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, name, + m_tracing, m_max_size); // Open the sst file const rocksdb::Status s = m_sst_file->open(); @@ -397,35 +394,23 @@ int Rdb_sst_info::open_new_sst_file() { return HA_EXIT_SUCCESS; } -void Rdb_sst_info::close_curr_sst_file() { - DBUG_ASSERT(m_sst_file != nullptr); - DBUG_ASSERT(m_curr_size > 0); - -#if defined(RDB_SST_INFO_USE_THREAD) - if (m_thread == nullptr) { - // We haven't already started a background thread, so start one - m_thread = new std::thread(thread_fcn, this); +void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) { + const rocksdb::Status s = sst_file->commit(); + if (!s.ok()) { + set_error_msg(sst_file->get_name(), s); + set_background_error(HA_ERR_ROCKSDB_BULK_LOAD); } - DBUG_ASSERT(m_thread != nullptr); + m_committed_files.push_back(sst_file->get_name()); - { - // Add this finished sst file to the queue (while holding mutex) - const std::lock_guard<std::mutex> guard(m_mutex); - m_queue.push(m_sst_file); - } + delete sst_file; +} - // Notify the background thread that there is a new entry in the queue - m_cond.notify_one(); -#else - const rocksdb::Status s = m_sst_file->commit(); - if (!s.ok()) { - set_error_msg(m_sst_file->get_name(), s); - set_background_error(HA_ERR_ROCKSDB_BULK_LOAD); - } +void Rdb_sst_info::close_curr_sst_file() { + DBUG_ASSERT(m_sst_file != nullptr); + DBUG_ASSERT(m_curr_size > 0); - delete m_sst_file; -#endif + commit_sst_file(m_sst_file); // Reset for next sst file m_sst_file = nullptr; @@ -435,7 +420,7 @@ void Rdb_sst_info::close_curr_sst_file() { int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) { int rc; - DBUG_ASSERT(!m_committed); + DBUG_ASSERT(!m_done); if (m_curr_size + key.size() + value.size() >= m_max_size) { // The current sst file has reached its maximum, close it out @@ -470,15 +455,22 @@ int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) { return HA_EXIT_SUCCESS; } -int Rdb_sst_info::commit(bool print_client_error) { +/* + Finish the current work and return the list of SST files ready to be + ingested. This function need to be idempotent and atomic + */ +int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info, + bool print_client_error) { int ret = HA_EXIT_SUCCESS; // Both the transaction clean up and the ha_rocksdb handler have // references to this Rdb_sst_info and both can call commit, so // synchronize on the object here. + // This also means in such case the bulk loading operation stop being truly + // atomic, and we should consider fixing this in the future RDB_MUTEX_LOCK_CHECK(m_commit_mutex); - if (m_committed) { + if (is_done()) { RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex); return ret; } @@ -490,20 +482,13 @@ int Rdb_sst_info::commit(bool print_client_error) { close_curr_sst_file(); } -#if defined(RDB_SST_INFO_USE_THREAD) - if (m_thread != nullptr) { - // Tell the background thread we are done - m_finished = true; - m_cond.notify_one(); + // This checks out the list of files so that the caller can collect/group + // them and ingest them all in one go, and any racing calls to commit + // won't see them at all + commit_info->init(m_cf, std::move(m_committed_files)); + DBUG_ASSERT(m_committed_files.size() == 0); - // Wait for the background thread to finish - m_thread->join(); - delete m_thread; - m_thread = nullptr; - } -#endif - - m_committed = true; + m_done = true; RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex); // Did we get any errors? @@ -517,16 +502,13 @@ int Rdb_sst_info::commit(bool print_client_error) { void Rdb_sst_info::set_error_msg(const std::string &sst_file_name, const rocksdb::Status &s) { + if (!m_print_client_error) return; - if (!m_print_client_error) - return; + report_error_msg(s, sst_file_name.c_str()); +} -#if defined(RDB_SST_INFO_USE_THREAD) - // Both the foreground and background threads can set the error message - // so lock the mutex to protect it. We only want the first error that - // we encounter. - const std::lock_guard<std::mutex> guard(m_mutex); -#endif +void Rdb_sst_info::report_error_msg(const rocksdb::Status &s, + const char *sst_file_name) { if (s.IsInvalidArgument() && strcmp(s.getState(), "Keys must be added in order") == 0) { my_printf_error(ER_KEYS_OUT_OF_ORDER, @@ -536,57 +518,16 @@ void Rdb_sst_info::set_error_msg(const std::string &sst_file_name, } else if (s.IsInvalidArgument() && strcmp(s.getState(), "Global seqno is required, but disabled") == 0) { - my_printf_error(ER_OVERLAPPING_KEYS, "Rows inserted during bulk load " - "must not overlap existing rows", + my_printf_error(ER_OVERLAPPING_KEYS, + "Rows inserted during bulk load " + "must not overlap existing rows", MYF(0)); } else { my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0), - sst_file_name.c_str(), s.ToString().c_str()); + sst_file_name, s.ToString().c_str()); } } -#if defined(RDB_SST_INFO_USE_THREAD) -// Static thread function - the Rdb_sst_info object is in 'object' -void Rdb_sst_info::thread_fcn(void *object) { - reinterpret_cast<Rdb_sst_info *>(object)->run_thread(); -} - -void Rdb_sst_info::run_thread() { - std::unique_lock<std::mutex> lk(m_mutex); - - do { - // Wait for notification or 1 second to pass - m_cond.wait_for(lk, std::chrono::seconds(1)); - - // Inner loop pulls off all Rdb_sst_file_ordered entries and processes them - while (!m_queue.empty()) { - Rdb_sst_file_ordered *const sst_file = m_queue.front(); - m_queue.pop(); - - // Release the lock - we don't want to hold it while committing the file - lk.unlock(); - - // Close out the sst file and add it to the database - const rocksdb::Status s = sst_file->commit(); - if (!s.ok()) { - set_error_msg(sst_file->get_name(), s); - set_background_error(HA_ERR_ROCKSDB_BULK_LOAD); - } - - delete sst_file; - - // Reacquire the lock for the next inner loop iteration - lk.lock(); - } - - // If the queue is empty and the main thread has indicated we should exit - // break out of the loop. - } while (!m_finished); - - DBUG_ASSERT(m_queue.empty()); -} -#endif - void Rdb_sst_info::init(const rocksdb::DB *const db) { const std::string path = db->GetName() + FN_DIRSEP; struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT)); @@ -618,4 +559,4 @@ void Rdb_sst_info::init(const rocksdb::DB *const db) { std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0); std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp"; -} // namespace myrocks +} // namespace myrocks |