diff options
author | Sergei Petrunia <psergey@askmonty.org> | 2017-01-01 23:33:18 +0000 |
---|---|---|
committer | Sergei Petrunia <psergey@askmonty.org> | 2017-01-01 23:33:18 +0000 |
commit | d8288b306c500e7a87ca9f583de458510325c213 (patch) | |
tree | cf492cbb206e09d5ebe82024bdded038e44c6c93 /storage/rocksdb/rdb_sst_info.cc | |
parent | ae0a490eb3447846f0c18e3485c5ae2d95c3a330 (diff) | |
parent | cfb59f3196aac1b41cdda79952031dcc64042914 (diff) | |
download | mariadb-git-d8288b306c500e7a87ca9f583de458510325c213.tar.gz |
Merge remote-tracking branch 'mergetrees/merge-myrocks' into 10.2-mariarocks
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.cc')
-rw-r--r-- | storage/rocksdb/rdb_sst_info.cc | 128 |
1 files changed, 96 insertions, 32 deletions
diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc index dbab4b50068..f51ea907be1 100644 --- a/storage/rocksdb/rdb_sst_info.cc +++ b/storage/rocksdb/rdb_sst_info.cc @@ -25,11 +25,13 @@ #include <vector> /* MySQL header files */ +#include <mysqld_error.h> #include "../sql/log.h" #include "./my_dir.h" /* RocksDB header files */ #include "rocksdb/db.h" +#include "rocksdb/options.h" /* MyRocks header files */ #include "./ha_rocksdb.h" @@ -38,14 +40,16 @@ namespace myrocks { -Rdb_sst_file::Rdb_sst_file(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, +Rdb_sst_file::Rdb_sst_file(rocksdb::DB* const db, + rocksdb::ColumnFamilyHandle* const cf, const rocksdb::DBOptions& db_options, - const std::string& name) : + const std::string& name, const bool tracing) : m_db(db), m_cf(cf), m_db_options(db_options), m_sst_file_writer(nullptr), - m_name(name) + m_name(name), + m_tracing(tracing) { DBUG_ASSERT(db != nullptr); DBUG_ASSERT(cf != nullptr); @@ -78,13 +82,20 @@ rocksdb::Status Rdb_sst_file::open() // Create an sst file writer with the current options and comparator const rocksdb::Comparator* comparator= m_cf->GetComparator(); - rocksdb::EnvOptions env_options(m_db_options); - rocksdb::Options options(m_db_options, cf_descr.options); + const rocksdb::EnvOptions env_options(m_db_options); + const rocksdb::Options options(m_db_options, cf_descr.options); m_sst_file_writer= - new rocksdb::SstFileWriter(env_options, options, comparator); + new rocksdb::SstFileWriter(env_options, options, comparator, m_cf); s= m_sst_file_writer->Open(m_name); + if (m_tracing) + { + // NO_LINT_DEBUG + sql_print_information("SST Tracing: Open(%s) returned %s", m_name.c_str(), + s.ok() ? "ok" : "not ok"); + } + if (!s.ok()) { delete m_sst_file_writer; @@ -103,22 +114,73 @@ rocksdb::Status Rdb_sst_file::put(const rocksdb::Slice& key, return m_sst_file_writer->Add(key, value); } +std::string Rdb_sst_file::generateKey(const std::string& key) +{ + static char const hexdigit[]= { + '0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' + }; + + std::string res; + + res.reserve(key.size() * 2); + + for (auto ch : key) + { + res += hexdigit[((uint8_t) ch) >> 4]; + res += hexdigit[((uint8_t) ch) & 0x0F]; + } + + return res; +} + // This function is run by the background thread rocksdb::Status Rdb_sst_file::commit() { DBUG_ASSERT(m_sst_file_writer != nullptr); rocksdb::Status s; + rocksdb::ExternalSstFileInfo fileinfo; ///Finish may should be modified // Close out the sst file - s= m_sst_file_writer->Finish(); + s= m_sst_file_writer->Finish(&fileinfo); + if (m_tracing) + { + // NO_LINT_DEBUG + sql_print_information("SST Tracing: Finish returned %s", + s.ok() ? "ok" : "not ok"); + } + if (s.ok()) { - std::vector<std::string> files = { m_name }; + if (m_tracing) + { + // NO_LINT_DEBUG + sql_print_information("SST Tracing: Adding file %s, smallest key: %s, " + "largest key: %s, file size: %" PRIu64 ", " + "num_entries: %" PRIu64, fileinfo.file_path.c_str(), + generateKey(fileinfo.smallest_key).c_str(), + generateKey(fileinfo.largest_key).c_str(), + fileinfo.file_size, fileinfo.num_entries); + } + // Add the file to the database - // Set the skip_snapshot_check parameter to true since no one + // Set the snapshot_consistency parameter to false since no one // should be accessing the table we are bulk loading - s= m_db->AddFile(m_cf, files, true, true); + rocksdb::IngestExternalFileOptions opts; + opts.move_files = true; + opts.snapshot_consistency = false; + opts.allow_global_seqno = false; + opts.allow_blocking_flush = false; + s= m_db->IngestExternalFile(m_cf, { m_name }, opts); + + if (m_tracing) + { + // NO_LINT_DEBUG + sql_print_information("SST Tracing: AddFile(%s) returned %s", + fileinfo.file_path.c_str(), + s.ok() ? "ok" : "not ok"); + } } delete m_sst_file_writer; @@ -127,10 +189,11 @@ rocksdb::Status Rdb_sst_file::commit() return s; } -Rdb_sst_info::Rdb_sst_info(rocksdb::DB* db, const std::string& tablename, +Rdb_sst_info::Rdb_sst_info(rocksdb::DB* const db, const std::string& tablename, const std::string& indexname, - rocksdb::ColumnFamilyHandle* cf, - const rocksdb::DBOptions& db_options) : + rocksdb::ColumnFamilyHandle* const cf, + const rocksdb::DBOptions& db_options, + const bool& tracing) : m_db(db), m_cf(cf), m_db_options(db_options), @@ -144,7 +207,8 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB* db, const std::string& tablename, m_thread(nullptr), m_finished(false), #endif - m_sst_file(nullptr) + m_sst_file(nullptr), + m_tracing(tracing) { m_prefix= db->GetName() + "/"; @@ -164,7 +228,7 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB* db, const std::string& tablename, } rocksdb::ColumnFamilyDescriptor cf_descr; - rocksdb::Status s= m_cf->GetDescriptor(&cf_descr); + const rocksdb::Status s= m_cf->GetDescriptor(&cf_descr); if (!s.ok()) { // Default size if we can't get the cf's target size @@ -190,13 +254,13 @@ int Rdb_sst_info::open_new_sst_file() DBUG_ASSERT(m_sst_file == nullptr); // Create the new sst file's name - std::string name= m_prefix + std::to_string(m_sst_count++) + m_suffix; + const std::string name= m_prefix + std::to_string(m_sst_count++) + m_suffix; // Create the new sst file object - m_sst_file= new Rdb_sst_file(m_db, m_cf, m_db_options, name); + m_sst_file= new Rdb_sst_file(m_db, m_cf, m_db_options, name, m_tracing); // Open the sst file - rocksdb::Status s= m_sst_file->open(); + const rocksdb::Status s= m_sst_file->open(); if (!s.ok()) { set_error_msg(s.ToString()); @@ -226,14 +290,14 @@ void Rdb_sst_info::close_curr_sst_file() { // Add this finished sst file to the queue (while holding mutex) - std::lock_guard<std::mutex> guard(m_mutex); + const std::lock_guard<std::mutex> guard(m_mutex); m_queue.push(m_sst_file); } // Notify the background thread that there is a new entry in the queue m_cond.notify_one(); #else - rocksdb::Status s= m_sst_file->commit(); + const rocksdb::Status s= m_sst_file->commit(); if (!s.ok()) { set_error_msg(s.ToString()); @@ -278,7 +342,7 @@ int Rdb_sst_info::put(const rocksdb::Slice& key, DBUG_ASSERT(m_sst_file != nullptr); // Add the key/value to the current sst file - rocksdb::Status s= m_sst_file->put(key, value); + const rocksdb::Status s= m_sst_file->put(key, value); if (!s.ok()) { set_error_msg(s.ToString()); @@ -327,7 +391,7 @@ void Rdb_sst_info::set_error_msg(const std::string& msg) // Both the foreground and background threads can set the error message // so lock the mutex to protect it. We only want the first error that // we encounter. - std::lock_guard<std::mutex> guard(m_mutex); + const std::lock_guard<std::mutex> guard(m_mutex); #endif my_printf_error(ER_UNKNOWN_ERROR, "bulk load error: %s", MYF(0), msg.c_str()); if (m_error_msg.empty()) @@ -345,7 +409,7 @@ void Rdb_sst_info::thread_fcn(void* object) void Rdb_sst_info::run_thread() { - std::unique_lock<std::mutex> lk(m_mutex); + const std::unique_lock<std::mutex> lk(m_mutex); do { @@ -355,14 +419,14 @@ void Rdb_sst_info::run_thread() // Inner loop pulls off all Rdb_sst_file entries and processes them while (!m_queue.empty()) { - Rdb_sst_file* sst_file= m_queue.front(); + const Rdb_sst_file* const sst_file= m_queue.front(); m_queue.pop(); // Release the lock - we don't want to hold it while committing the file lk.unlock(); // Close out the sst file and add it to the database - rocksdb::Status s= sst_file->commit(); + const rocksdb::Status s= sst_file->commit(); if (!s.ok()) { set_error_msg(s.ToString()); @@ -382,10 +446,10 @@ void Rdb_sst_info::run_thread() } #endif -void Rdb_sst_info::init(rocksdb::DB* db) +void Rdb_sst_info::init(const rocksdb::DB* const db) { - std::string path= db->GetName() + FN_DIRSEP; - struct st_my_dir* dir_info= my_dir(path.c_str(), MYF(MY_DONT_SORT)); + const std::string path= db->GetName() + FN_DIRSEP; + struct st_my_dir* const dir_info= my_dir(path.c_str(), MYF(MY_DONT_SORT)); // Access the directory if (dir_info == nullptr) @@ -397,16 +461,16 @@ void Rdb_sst_info::init(rocksdb::DB* db) } // Scan through the files in the directory - struct fileinfo* file_info= dir_info->dir_entry; + const struct fileinfo* file_info= dir_info->dir_entry; for (uint ii= 0; ii < dir_info->number_of_files; ii++, file_info++) { // find any files ending with m_suffix ... - std::string name= file_info->name; - size_t pos= name.find(m_suffix); + const std::string name= file_info->name; + const size_t pos= name.find(m_suffix); if (pos != std::string::npos && name.size() - pos == m_suffix.size()) { // ... and remove them - std::string fullname= path + name; + const std::string fullname= path + name; my_delete(fullname.c_str(), MYF(0)); } } |