summaryrefslogtreecommitdiff
path: root/storage/rocksdb/rdb_sst_info.h
diff options
context:
space:
mode:
Diffstat (limited to 'storage/rocksdb/rdb_sst_info.h')
-rw-r--r--storage/rocksdb/rdb_sst_info.h119
1 files changed, 99 insertions, 20 deletions
diff --git a/storage/rocksdb/rdb_sst_info.h b/storage/rocksdb/rdb_sst_info.h
index f50645b1eeb..66da3b7c1e7 100644
--- a/storage/rocksdb/rdb_sst_info.h
+++ b/storage/rocksdb/rdb_sst_info.h
@@ -34,8 +34,6 @@
/* MyRocks header files */
#include "./rdb_utils.h"
-// #define RDB_SST_INFO_USE_THREAD /* uncomment to use threads */
-
namespace myrocks {
class Rdb_sst_file_ordered {
@@ -125,43 +123,114 @@ class Rdb_sst_info {
uint64_t m_max_size;
uint32_t m_sst_count;
std::atomic<int> m_background_error;
+ bool m_done;
std::string m_prefix;
static std::atomic<uint64_t> m_prefix_counter;
static std::string m_suffix;
- bool m_committed;
mysql_mutex_t m_commit_mutex;
-#if defined(RDB_SST_INFO_USE_THREAD)
- std::queue<Rdb_sst_file_ordered *> m_queue;
- std::mutex m_mutex;
- std::condition_variable m_cond;
- std::thread *m_thread;
- bool m_finished;
-#endif
Rdb_sst_file_ordered *m_sst_file;
+
+ // List of committed SST files - we'll ingest them later in one single batch
+ std::vector<std::string> m_committed_files;
+
const bool m_tracing;
bool m_print_client_error;
int open_new_sst_file();
void close_curr_sst_file();
+ void commit_sst_file(Rdb_sst_file_ordered *sst_file);
+
void set_error_msg(const std::string &sst_file_name,
const rocksdb::Status &s);
-#if defined(RDB_SST_INFO_USE_THREAD)
- void run_thread();
-
- static void thread_fcn(void *object);
-#endif
-
public:
Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
const std::string &indexname,
rocksdb::ColumnFamilyHandle *const cf,
- const rocksdb::DBOptions &db_options, const bool &tracing);
+ const rocksdb::DBOptions &db_options, const bool tracing);
~Rdb_sst_info();
+ /*
+ This is the unit of work returned from Rdb_sst_info::finish and represents
+ a group of SST to be ingested atomically with other Rdb_sst_commit_info.
+ This is always local to the bulk loading complete operation so no locking
+ is required
+ */
+ class Rdb_sst_commit_info {
+ public:
+ Rdb_sst_commit_info() : m_committed(true), m_cf(nullptr) {}
+
+ Rdb_sst_commit_info(Rdb_sst_commit_info &&rhs) noexcept
+ : m_committed(rhs.m_committed),
+ m_cf(rhs.m_cf),
+ m_committed_files(std::move(rhs.m_committed_files)) {
+ rhs.m_committed = true;
+ rhs.m_cf = nullptr;
+ }
+
+ Rdb_sst_commit_info &operator=(Rdb_sst_commit_info &&rhs) noexcept {
+ reset();
+
+ m_cf = rhs.m_cf;
+ m_committed_files = std::move(rhs.m_committed_files);
+ m_committed = rhs.m_committed;
+
+ rhs.m_committed = true;
+ rhs.m_cf = nullptr;
+
+ return *this;
+ }
+
+ Rdb_sst_commit_info(const Rdb_sst_commit_info &) = delete;
+ Rdb_sst_commit_info &operator=(const Rdb_sst_commit_info &) = delete;
+
+ ~Rdb_sst_commit_info() { reset(); }
+
+ void reset() {
+ if (!m_committed) {
+ for (auto sst_file : m_committed_files) {
+ // In case something went wrong attempt to delete the temporary file.
+ // If everything went fine that file will have been renamed and this
+ // function call will fail.
+ std::remove(sst_file.c_str());
+ }
+ }
+ m_committed_files.clear();
+ m_cf = nullptr;
+ m_committed = true;
+ }
+
+ bool has_work() const {
+ return m_cf != nullptr && m_committed_files.size() > 0;
+ }
+
+ void init(rocksdb::ColumnFamilyHandle *cf,
+ std::vector<std::string> &&files) {
+ DBUG_ASSERT(m_cf == nullptr && m_committed_files.size() == 0 &&
+ m_committed);
+ m_cf = cf;
+ m_committed_files = std::move(files);
+ m_committed = false;
+ }
+
+ rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; }
+
+ const std::vector<std::string> &get_committed_files() const {
+ return m_committed_files;
+ }
+
+ void commit() { m_committed = true; }
+
+ private:
+ bool m_committed;
+ rocksdb::ColumnFamilyHandle *m_cf;
+ std::vector<std::string> m_committed_files;
+ };
+
int put(const rocksdb::Slice &key, const rocksdb::Slice &value);
- int commit(bool print_client_error = true);
- bool is_committed() const { return m_committed; }
+ int finish(Rdb_sst_commit_info *commit_info, bool print_client_error = true);
+
+ bool is_done() const { return m_done; }
bool have_background_error() { return m_background_error != 0; }
@@ -180,7 +249,17 @@ class Rdb_sst_info {
m_background_error.compare_exchange_strong(expected, code);
}
+ /** Return the list of committed files later to be ingested **/
+ const std::vector<std::string> &get_committed_files() {
+ return m_committed_files;
+ }
+
+ rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf; }
+
static void init(const rocksdb::DB *const db);
+
+ static void report_error_msg(const rocksdb::Status &s,
+ const char *sst_file_name);
};
-} // namespace myrocks
+} // namespace myrocks