diff options
-rw-r--r-- | storage/pbxt/src/ha_pbxt.cc | 39 | ||||
-rw-r--r-- | storage/pbxt/src/thread_xt.h | 3 | ||||
-rw-r--r-- | storage/pbxt/src/xaction_xt.cc | 149 | ||||
-rw-r--r-- | storage/pbxt/src/xaction_xt.h | 2 | ||||
-rwxr-xr-x | tests/consistent_snapshot.pl | 107 |
5 files changed, 253 insertions, 47 deletions
diff --git a/storage/pbxt/src/ha_pbxt.cc b/storage/pbxt/src/ha_pbxt.cc index ef0ae582c07..85a41ce0178 100644 --- a/storage/pbxt/src/ha_pbxt.cc +++ b/storage/pbxt/src/ha_pbxt.cc @@ -108,6 +108,9 @@ static int pbxt_end(void *p); static int pbxt_panic(handlerton *hton, enum ha_panic_function flag); static void pbxt_drop_database(handlerton *hton, char *path); static int pbxt_close_connection(handlerton *hton, THD* thd); +#ifdef MARIADB_BASE_VERSION +static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all); +#endif static int pbxt_commit(handlerton *hton, THD *thd, bool all); static int pbxt_rollback(handlerton *hton, THD *thd, bool all); static int pbxt_prepare(handlerton *hton, THD *thd, bool all); @@ -1147,6 +1150,9 @@ static int pbxt_init(void *p) pbxt_hton->state = SHOW_OPTION_YES; pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own! pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */ +#ifdef MARIADB_BASE_VERSION + pbxt_hton->commit_ordered = pbxt_commit_ordered; +#endif pbxt_hton->commit = pbxt_commit; /* commit */ pbxt_hton->rollback = pbxt_rollback; /* rollback */ if (pbxt_support_xa) { @@ -1484,6 +1490,29 @@ static int pbxt_start_consistent_snapshot(handlerton *hton, THD *thd) return err; } +#ifdef MARIADB_BASE_VERSION +/* + * Quickly commit the transaction to memory and make it visible to others. + * The remaining part of commit will happen later, in pbxt_commit(). + */ +static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all) +{ + XTThreadPtr self; + + if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) { + XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END CONN XACT" : "END STAT", all); + + if (self->st_xact_data) { + if (all || self->st_auto_commit) { + self->st_commit_ordered = TRUE; + self->st_writer = self->st_xact_writer; + self->st_delayed_error= !xt_xn_commit_fast(self, self->st_writer); + } + } + } +} +#endif + /* * Commit the PBXT transaction of the given thread. * thd is the MySQL thread structure. @@ -1512,7 +1541,13 @@ static int pbxt_commit(handlerton *hton, THD *thd, bool all) if (all || self->st_auto_commit) { XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n"); - if (!xt_xn_commit(self)) + if (self->st_commit_ordered) { + self->st_commit_ordered = FALSE; + err = !xt_xn_commit_slow(self, self->st_writer) || self->st_delayed_error; + } else { + err = !xt_xn_commit(self); + } + if (err) err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE); } } @@ -6064,7 +6099,7 @@ static MYSQL_SYSVAR_INT(max_threads, pbxt_max_threads, NULL, NULL, 0, 0, 20000, 1); #endif -#ifndef DEBUG +#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION) static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa, PLUGIN_VAR_OPCMDARG, "Enable PBXT support for the XA two-phase commit, default is enabled", diff --git a/storage/pbxt/src/thread_xt.h b/storage/pbxt/src/thread_xt.h index a07f7b7ae01..282df46a5d5 100644 --- a/storage/pbxt/src/thread_xt.h +++ b/storage/pbxt/src/thread_xt.h @@ -299,6 +299,9 @@ typedef struct XTThread { xtBool st_stat_ended; /* TRUE if the statement was ended. */ xtBool st_stat_trans; /* TRUE if a statement transaction is running (started on UPDATE). */ xtBool st_stat_modify; /* TRUE if the statement is an INSERT/UPDATE/DELETE */ + xtBool st_commit_ordered; /* TRUE if we have run commit_ordered() */ + xtBool st_delayed_error; /* TRUE if we got an error in commit_ordered() */ + xtBool st_writer; /* Copy of thread->st_xact_writer (which is clobbered by xlog_append()) */ #ifdef XT_IMPLEMENT_NO_ACTION XTBasicListRec st_restrict_list; /* These records have been deleted and should have no reference. */ #endif diff --git a/storage/pbxt/src/xaction_xt.cc b/storage/pbxt/src/xaction_xt.cc index 48abc5d2b66..c5f07eedabf 100644 --- a/storage/pbxt/src/xaction_xt.cc +++ b/storage/pbxt/src/xaction_xt.cc @@ -1287,27 +1287,61 @@ xtPublic xtBool xt_xn_begin(XTThreadPtr self) return OK; } -static xtBool xn_end_xact(XTThreadPtr thread, u_int status) +static void xn_end_release_locks(XTThreadPtr thread) +{ + XTXactDataPtr xact = thread->st_xact_data; + XTDatabaseHPtr db = thread->st_database; + ASSERT_NS(xact); + + /* {REMOVE-LOCKS} Drop locks if you have any: */ + thread->st_lock_list.xt_remove_all_locks(db, thread); + + /* Do this afterwards to make sure the sweeper + * does not cleanup transactions start cleaning up + * before any transactions that were waiting for + * this transaction have completed! + */ + xact->xd_end_xn_id = db->db_xn_curr_id; + + /* Now you can sweep! */ + xact->xd_flags |= XT_XN_XAC_SWEEP; +} + +/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(), + * and one "slow" for commit(). When not using internal 2pc, there is only one + * call combining both phases. + */ + +enum { + XN_END_PHASE_FAST = 1, + XN_END_PHASE_SLOW = 2, + XN_END_PHASE_BOTH = 3 +}; + +static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase) { XTXactDataPtr xact; xtBool ok = TRUE; + xtBool err; ASSERT_NS(thread->st_xact_data); if ((xact = thread->st_xact_data)) { XTDatabaseHPtr db = thread->st_database; xtXactID xn_id = xact->xd_start_xn_id; - xtBool writer; - if ((writer = thread->st_xact_writer)) { + if (writer) { /* The transaction wrote something: */ XTXactEndEntryDRec entry; xtWord4 sum; - sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0); - entry.xe_status_1 = status; - entry.xe_checksum_1 = XT_CHECKSUM_1(sum); - XT_SET_DISK_4(entry.xe_xact_id_4, xn_id); - XT_SET_DISK_4(entry.xe_not_used_4, 0); + if (phase & XN_END_PHASE_FAST) + { + sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0); + entry.xe_status_1 = status; + entry.xe_checksum_1 = XT_CHECKSUM_1(sum); + XT_SET_DISK_4(entry.xe_xact_id_4, xn_id); + XT_SET_DISK_4(entry.xe_not_used_4, 0); + } #ifdef XT_IMPLEMENT_NO_ACTION /* This will check any resticts that have been delayed to the end of the statement. */ @@ -1319,20 +1353,35 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status) } #endif - /* Flush the data log: */ - if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) { + /* Flush the data log (in the "fast" case we already did it in prepare: */ + if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) { ok = FALSE; status = XT_LOG_ENT_ABORT; } /* Write and flush the transaction log: */ - if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) { + if (phase == XN_END_PHASE_FAST) { + /* Fast phase, delay any write or flush to later. */ + err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH); + } else if (phase == XN_END_PHASE_SLOW) { + /* We already appended the commit record in the fast phase. + * Now just call with empty record to ensure we write/flush + * the log as needed for this commit. + */ + err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit); + } else /* phase == XN_END_PHASE_BOTH */ { + /* Both phases at once, append commit record and write/flush normally. */ + ASSERT_NS(phase == XN_END_PHASE_BOTH); + err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit); + } + + if (err) { ok = FALSE; status = XT_LOG_ENT_ABORT; /* Make sure this is done, if we failed to log * the transction end! */ - if (thread->st_xact_writer) { + if (writer) { /* Adjust this in case of error, but don't forget * to lock! */ @@ -1347,46 +1396,46 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status) } } - /* Setting this flag completes the transaction, - * Do this before we release the locks, because - * the unlocked transactions expect the - * transaction they are waiting for to be - * gone! - */ - xact->xd_end_time = ++db->db_xn_end_time; - if (status == XT_LOG_ENT_COMMIT) { - thread->st_statistics.st_commits++; - xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED); - } - else { - thread->st_statistics.st_rollbacks++; - xact->xd_flags |= XT_XN_XAC_ENDED; + if (phase & XN_END_PHASE_FAST) { + /* Setting this flag completes the transaction, + * Do this before we release the locks, because + * the unlocked transactions expect the + * transaction they are waiting for to be + * gone! + */ + xact->xd_end_time = ++db->db_xn_end_time; + if (status == XT_LOG_ENT_COMMIT) { + thread->st_statistics.st_commits++; + xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED); + } + else { + thread->st_statistics.st_rollbacks++; + xact->xd_flags |= XT_XN_XAC_ENDED; + } } - /* {REMOVE-LOCKS} Drop locks is you have any: */ - thread->st_lock_list.xt_remove_all_locks(db, thread); - - /* Do this afterwards to make sure the sweeper - * does not cleanup transactions start cleaning up - * before any transactions that were waiting for - * this transaction have completed! + /* Be as fast as possible in the "fast" path, as we want to be as + * fast as possible here (we will release slow locks immediately + * after in the "slow" part). + * ToDo: If we ran the fast part, the slow part could release locks + * _before_ fsync(), rather than after. */ - xact->xd_end_xn_id = db->db_xn_curr_id; + if (!(phase & XN_END_PHASE_SLOW)) + return ok; - /* Now you can sweep! */ - xact->xd_flags |= XT_XN_XAC_SWEEP; + xn_end_release_locks(thread); } else { /* Read-only transaction can be removed, immediately */ - xact->xd_end_time = ++db->db_xn_end_time; - xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED); - - /* Drop locks is you have any: */ - thread->st_lock_list.xt_remove_all_locks(db, thread); + if (phase & XN_END_PHASE_FAST) { + xact->xd_end_time = ++db->db_xn_end_time; + xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED); - xact->xd_end_xn_id = db->db_xn_curr_id; + if (!(phase & XN_END_PHASE_SLOW)) + return ok; + } - xact->xd_flags |= XT_XN_XAC_SWEEP; + xn_end_release_locks(thread); if (xt_xn_delete_xact(db, xn_id, thread)) { if (db->db_xn_min_ram_id == xn_id) @@ -1478,12 +1527,22 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status) xtPublic xtBool xt_xn_commit(XTThreadPtr thread) { - return xn_end_xact(thread, XT_LOG_ENT_COMMIT); + return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH); +} + +xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer) +{ + return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST); +} + +xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer) +{ + return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW); } xtPublic xtBool xt_xn_rollback(XTThreadPtr thread) { - return xn_end_xact(thread, XT_LOG_ENT_ABORT); + return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH); } xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id) diff --git a/storage/pbxt/src/xaction_xt.h b/storage/pbxt/src/xaction_xt.h index e679a0f38f0..cd350200506 100644 --- a/storage/pbxt/src/xaction_xt.h +++ b/storage/pbxt/src/xaction_xt.h @@ -193,6 +193,8 @@ void xt_wakeup_sweeper(struct XTDatabase *db); xtBool xt_xn_begin(struct XTThread *self); xtBool xt_xn_commit(struct XTThread *self); +xtBool xt_xn_commit_fast(struct XTThread *self, xtBool writer); +xtBool xt_xn_commit_slow(struct XTThread *self, xtBool writer); xtBool xt_xn_rollback(struct XTThread *self); xtBool xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id); int xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id, xtRecordID rec_id); diff --git a/tests/consistent_snapshot.pl b/tests/consistent_snapshot.pl new file mode 100755 index 00000000000..9e53eaea6a1 --- /dev/null +++ b/tests/consistent_snapshot.pl @@ -0,0 +1,107 @@ +#! /usr/bin/perl + +# Test START TRANSACTION WITH CONSISTENT SNAPSHOT. +# With MWL#116, this is implemented so it is actually consistent. + +use strict; +use warnings; + +use DBI; + +my $UPDATERS= 10; +my $READERS= 5; + +my $ROWS= 50; +my $DURATION= 20; + +my $stop_time= time() + $DURATION; + +sub my_connect { + my $dbh= DBI->connect("dbi:mysql:mysql_socket=/tmp/mysql.sock;database=test", + "root", undef, { RaiseError=>1, PrintError=>0, AutoCommit=>0}); + $dbh->do("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + $dbh->do("SET SESSION autocommit = 0"); + return $dbh; +} + +sub my_setup { + my $dbh= my_connect(); + + $dbh->do("DROP TABLE IF EXISTS test_consistent_snapshot1, test_consistent_snapshot2"); + $dbh->do(<<TABLE); +CREATE TABLE test_consistent_snapshot1 ( + a INT PRIMARY KEY, + b INT NOT NULL +) ENGINE=InnoDB +TABLE + $dbh->do(<<TABLE); +CREATE TABLE test_consistent_snapshot2( + a INT PRIMARY KEY, + b INT NOT NULL +) ENGINE=PBXT +TABLE + + for (my $i= 0; $i < $ROWS; $i++) { + my $value= int(rand()*1000); + $dbh->do("INSERT INTO test_consistent_snapshot1 VALUES (?, ?)", undef, + $i, $value); + $dbh->do("INSERT INTO test_consistent_snapshot2 VALUES (?, ?)", undef, + $i, -$value); + } + $dbh->commit(); + $dbh->disconnect(); +} + +sub my_updater { + my $dbh= my_connect(); + + while (time() < $stop_time) { + my $i1= int(rand()*$ROWS); + my $i2= int(rand()*$ROWS); + my $v= int(rand()*99)-49; + $dbh->do("UPDATE test_consistent_snapshot1 SET b = b + ? WHERE a = ?", + undef, $v, $i1); + $dbh->do("UPDATE test_consistent_snapshot2 SET b = b - ? WHERE a = ?", + undef, $v, $i2); + $dbh->commit(); + } + + $dbh->disconnect(); + exit(0); +} + +sub my_reader { + my $dbh= my_connect(); + + my $iteration= 0; + while (time() < $stop_time) { + $dbh->do("START TRANSACTION WITH CONSISTENT SNAPSHOT"); + my $s1= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot1"); + $s1= $s1->[0]; + my $s2= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot2"); + $s2= $s2->[0]; + $dbh->commit(); + if ($s1 + $s2 != 0) { + print STDERR "Found inconsistency, s1=$s1 s2=$s2 iteration=$iteration\n"; + last; + } + ++$iteration; + } + + $dbh->disconnect(); + exit(0); +} + +my_setup(); + +for (1 .. $UPDATERS) { + fork() || my_updater(); +} + +for (1 .. $READERS) { + fork() || my_reader(); +} + +waitpid(-1, 0) for (1 .. ($UPDATERS + $READERS)); + +print "All checks done\n"; |