diff options
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.h')
-rw-r--r-- | storage/rocksdb/rdb_sst_info.h | 119 |
1 files changed, 99 insertions, 20 deletions
diff --git a/storage/rocksdb/rdb_sst_info.h b/storage/rocksdb/rdb_sst_info.h index f50645b1eeb..66da3b7c1e7 100644 --- a/storage/rocksdb/rdb_sst_info.h +++ b/storage/rocksdb/rdb_sst_info.h @@ -34,8 +34,6 @@ /* MyRocks header files */ #include "./rdb_utils.h" -// #define RDB_SST_INFO_USE_THREAD /* uncomment to use threads */ - namespace myrocks { class Rdb_sst_file_ordered { @@ -125,43 +123,114 @@ class Rdb_sst_info { uint64_t m_max_size; uint32_t m_sst_count; std::atomic<int> m_background_error; + bool m_done; std::string m_prefix; static std::atomic<uint64_t> m_prefix_counter; static std::string m_suffix; - bool m_committed; mysql_mutex_t m_commit_mutex; -#if defined(RDB_SST_INFO_USE_THREAD) - std::queue<Rdb_sst_file_ordered *> m_queue; - std::mutex m_mutex; - std::condition_variable m_cond; - std::thread *m_thread; - bool m_finished; -#endif Rdb_sst_file_ordered *m_sst_file; + + // List of committed SST files - we'll ingest them later in one single batch + std::vector<std::string> m_committed_files; + const bool m_tracing; bool m_print_client_error; int open_new_sst_file(); void close_curr_sst_file(); + void commit_sst_file(Rdb_sst_file_ordered *sst_file); + void set_error_msg(const std::string &sst_file_name, const rocksdb::Status &s); -#if defined(RDB_SST_INFO_USE_THREAD) - void run_thread(); - - static void thread_fcn(void *object); -#endif - public: Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename, const std::string &indexname, rocksdb::ColumnFamilyHandle *const cf, - const rocksdb::DBOptions &db_options, const bool &tracing); + const rocksdb::DBOptions &db_options, const bool tracing); ~Rdb_sst_info(); + /* + This is the unit of work returned from Rdb_sst_info::finish and represents + a group of SST to be ingested atomically with other Rdb_sst_commit_info. + This is always local to the bulk loading complete operation so no locking + is required + */ + class Rdb_sst_commit_info { + public: + Rdb_sst_commit_info() : m_committed(true), m_cf(nullptr) {} + + Rdb_sst_commit_info(Rdb_sst_commit_info &&rhs) noexcept + : m_committed(rhs.m_committed), + m_cf(rhs.m_cf), + m_committed_files(std::move(rhs.m_committed_files)) { + rhs.m_committed = true; + rhs.m_cf = nullptr; + } + + Rdb_sst_commit_info &operator=(Rdb_sst_commit_info &&rhs) noexcept { + reset(); + + m_cf = rhs.m_cf; + m_committed_files = std::move(rhs.m_committed_files); + m_committed = rhs.m_committed; + + rhs.m_committed = true; + rhs.m_cf = nullptr; + + return *this; + } + + Rdb_sst_commit_info(const Rdb_sst_commit_info &) = delete; + Rdb_sst_commit_info &operator=(const Rdb_sst_commit_info &) = delete; + + ~Rdb_sst_commit_info() { reset(); } + + void reset() { + if (!m_committed) { + for (auto sst_file : m_committed_files) { + // In case something went wrong attempt to delete the temporary file. + // If everything went fine that file will have been renamed and this + // function call will fail. + std::remove(sst_file.c_str()); + } + } + m_committed_files.clear(); + m_cf = nullptr; + m_committed = true; + } + + bool has_work() const { + return m_cf != nullptr && m_committed_files.size() > 0; + } + + void init(rocksdb::ColumnFamilyHandle *cf, + std::vector<std::string> &&files) { + DBUG_ASSERT(m_cf == nullptr && m_committed_files.size() == 0 && + m_committed); + m_cf = cf; + m_committed_files = std::move(files); + m_committed = false; + } + + rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; } + + const std::vector<std::string> &get_committed_files() const { + return m_committed_files; + } + + void commit() { m_committed = true; } + + private: + bool m_committed; + rocksdb::ColumnFamilyHandle *m_cf; + std::vector<std::string> m_committed_files; + }; + int put(const rocksdb::Slice &key, const rocksdb::Slice &value); - int commit(bool print_client_error = true); - bool is_committed() const { return m_committed; } + int finish(Rdb_sst_commit_info *commit_info, bool print_client_error = true); + + bool is_done() const { return m_done; } bool have_background_error() { return m_background_error != 0; } @@ -180,7 +249,17 @@ class Rdb_sst_info { m_background_error.compare_exchange_strong(expected, code); } + /** Return the list of committed files later to be ingested **/ + const std::vector<std::string> &get_committed_files() { + return m_committed_files; + } + + rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; } + static void init(const rocksdb::DB *const db); + + static void report_error_msg(const rocksdb::Status &s, + const char *sst_file_name); }; -} // namespace myrocks +} // namespace myrocks |