summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2017-10-17 12:04:53 +0300
committerSergei Petrunia <psergey@askmonty.org>2017-10-17 12:04:53 +0300
commitcd7fa0fd62da36f8c6bb88e29fd23fcfd85e3991 (patch)
treee2dccec1376235d5b7e379b1f1ffffad9c6098ee /storage
parentf11eaaa3f6fc28834a7758100e1c0daeccedd972 (diff)
downloadmariadb-git-cd7fa0fd62da36f8c6bb88e29fd23fcfd85e3991.tar.gz
MDEV-11934: MariaRocks: Group Commit with binlog
Port the previous patch to newer MariaDB (which includes newer MyRocks): - Implement MariaDB's Group Commit API. This is a first attempt which lacks the expected performance.
Diffstat (limited to 'storage')
-rw-r--r--storage/rocksdb/ha_rocksdb.cc106
1 files changed, 99 insertions, 7 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 17ea7b74565..3ca58b3f935 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -1807,6 +1807,12 @@ public:
int64_t m_n_mysql_tables_in_use = 0;
/*
+ MariaDB's group commit:
+ */
+ bool commit_ordered_done;
+ bool commit_ordered_res;
+
+ /*
for distinction between rdb_transaction_impl and rdb_writebatch_impl
when using walk tx list
*/
@@ -2431,6 +2437,8 @@ public:
THDVAR(m_thd, write_ignore_missing_column_families);
m_is_two_phase = rocksdb_enable_2pc;
+ commit_ordered_done= false;
+
/*
If m_rocksdb_reuse_tx is null this will create a new transaction object.
Otherwise it will reuse the existing one.
@@ -2643,6 +2651,7 @@ public:
bool is_tx_started() const override { return (m_batch != nullptr); }
void start_tx() override {
+ commit_ordered_done= false; // Do we need this here?
reset();
write_opts.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC);
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
@@ -2831,8 +2840,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__)))
*/
static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
{
-// This is "ASYNC_COMMIT" feature which is only in webscalesql
- bool async=false;
+ bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
Rdb_transaction *&tx = get_tx_from_thd(thd);
if (!tx->can_prepare()) {
@@ -2842,7 +2850,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
(!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
/* We were instructed to prepare the whole transaction, or
this is an SQL statement end and autocommit is on */
-#ifdef MARIAROCKS_NOT_YET // disable prepare/commit
+
+#ifdef MARIAROCKS_NOT_YET // Crash-safe slave does not work yet
std::vector<st_slave_gtid_info> slave_gtid_info;
my_core::thd_slave_gtid_info(thd, &slave_gtid_info);
for (const auto &it : slave_gtid_info) {
@@ -2852,22 +2861,41 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
#endif
if (tx->is_two_phase()) {
+
+ /*
+ MariaDB: the following branch is never taken.
+ We always flush at Prepare and rely on RocksDB's internal Group Commit
+ to do some grouping.
+ */
if (thd->durability_property == HA_IGNORE_DURABILITY || async) {
tx->set_sync(false);
}
+
+ /*
+ MariaDB: do not flush logs if we are running in a non-crash-safe mode.
+ */
+ if (!rocksdb_flush_log_at_trx_commit)
+ tx->set_sync(false);
+
XID xid;
thd_get_xid(thd, reinterpret_cast<MYSQL_XID *>(&xid));
if (!tx->prepare(rdb_xid_to_string(xid))) {
return HA_EXIT_FAILURE;
}
- if (thd->durability_property == HA_IGNORE_DURABILITY )
+
+ /*
+ MariaDB: our Group Commit implementation does not use the
+ hton->flush_logs call (at least currently) so the following is not
+ needed (TODO: will we need this for binlog rotation?)
+ */
#ifdef MARIAROCKS_NOT_YET
- (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER)) {
+ if (thd->durability_property == HA_IGNORE_DURABILITY )
+ (rocksdb_flush_log_at_trx_commit != FLUSH_LOG_NEVER))
&&
THDVAR(thd, flush_log_at_trx_commit))
#endif
- {
#ifdef MARIAROCKS_NOT_YET
+ {
// MariaRocks: disable the
// "write/sync redo log before flushing binlog cache to file"
// feature. See a869c56d361bb44f46c0efeb11a8f03561676247
@@ -2875,8 +2903,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
we set the log sequence as '1' just to trigger hton->flush_logs
*/
thd_store_lsn(thd, 1, DB_TYPE_ROCKSDB);
-#endif
}
+#endif
}
DEBUG_SYNC(thd, "rocksdb.prepared");
@@ -3026,6 +3054,50 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
return count;
}
+
+/*
+ Handle a commit checkpoint request from the server layer.
+
+ InnoDB does this:
+ We put the request in a queue, so that we can notify the upper layer
+ that the checkpoint is complete once we have flushed the redo log.
+ If we have already flushed all relevant redo log, we notify immediately.
+
+ MariaRocks currently does no queueing: it syncs the WAL right away and,
+ if the sync succeeds, notifies the upper layer before returning.
+
+ @param hton passed through to commit_checkpoint_notify_ha()
+ @param cookie passed through to commit_checkpoint_notify_ha()
+*/
+
+static void rocksdb_checkpoint_request(handlerton *hton,
+ void *cookie)
+{
+ const rocksdb::Status s= rdb->SyncWAL();
+ // TODO: what to do on error? For now no notification is sent if the sync fails.
+ if (s.ok())
+ {
+ rocksdb_wal_group_syncs++;
+ commit_checkpoint_notify_ha(hton, cookie);
+ }
+}
+
+/*
+ commit_ordered handler for MariaDB's group commit (installed as
+ rocksdb_hton->commit_ordered).
+
+ Commits the transaction with sync turned off and stores the outcome in
+ the transaction object (commit_ordered_res / commit_ordered_done), where
+ rocksdb_commit() checks it.
+
+ @param all: TRUE - commit the transaction
+ FALSE - SQL statement ended
+*/
+static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
+{
+ // Same assert as InnoDB has: a statement-end call is only expected in autocommit mode
+ DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)));
+ Rdb_transaction *&tx = get_tx_from_thd(thd);
+
+ tx->set_sync(false); // do not sync on this commit
+
+ /* This will note the master position also */
+ tx->commit_ordered_res= tx->commit(); // result is reported later by rocksdb_commit()
+ tx->commit_ordered_done= true;
+
+}
+
+
static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
{
DBUG_ENTER_FUNC();
@@ -3046,6 +3118,16 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
OPTION_BEGIN))) {
/*
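+ If the transaction was already committed in rocksdb_commit_ordered(), just
+ wake up subsequent commits and report the stored result. Note: this path
+ does not add anything to commit_latency_stats (TODO: confirm this is correct).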
+ */
+ if (tx->commit_ordered_done)
+ {
+ thd_wakeup_subsequent_commits(thd, 0);
+ DBUG_RETURN((tx->commit_ordered_res? HA_ERR_INTERNAL_ERROR: 0));
+ }
+
+ /*
We get here
- For a COMMIT statement that finishes a multi-statement transaction
- For a statement that has its own transaction
@@ -3053,6 +3135,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (tx->commit()) {
DBUG_RETURN(HA_ERR_ROCKSDB_COMMIT_FAILED);
}
+ thd_wakeup_subsequent_commits(thd, 0);
} else {
/*
We get here when committing a statement within a transaction.
@@ -3076,6 +3159,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
DBUG_RETURN(HA_EXIT_SUCCESS);
}
+
static int rocksdb_rollback(handlerton *const hton, THD *const thd,
bool rollback_tx) {
Rdb_transaction *&tx = get_tx_from_thd(thd);
@@ -3882,11 +3966,19 @@ static int rocksdb_init_func(void *const p) {
rocksdb_hton->state = SHOW_OPTION_YES;
rocksdb_hton->create = rocksdb_create_handler;
rocksdb_hton->close_connection = rocksdb_close_connection;
+
rocksdb_hton->prepare = rocksdb_prepare;
+ rocksdb_hton->prepare_ordered = NULL; // Do not need it
+
rocksdb_hton->commit_by_xid = rocksdb_commit_by_xid;
rocksdb_hton->rollback_by_xid = rocksdb_rollback_by_xid;
rocksdb_hton->recover = rocksdb_recover;
+
+ rocksdb_hton->commit_ordered= rocksdb_commit_ordered;
rocksdb_hton->commit = rocksdb_commit;
+
+ rocksdb_hton->commit_checkpoint_request= rocksdb_checkpoint_request;
+
rocksdb_hton->rollback = rocksdb_rollback;
rocksdb_hton->show_status = rocksdb_show_status;
rocksdb_hton->start_consistent_snapshot =