summaryrefslogtreecommitdiff
path: root/storage/pbxt/src/xaction_xt.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/pbxt/src/xaction_xt.cc')
-rw-r--r--storage/pbxt/src/xaction_xt.cc150
1 files changed, 104 insertions, 46 deletions
diff --git a/storage/pbxt/src/xaction_xt.cc b/storage/pbxt/src/xaction_xt.cc
index 48abc5d2b66..fd7ae88a4ae 100644
--- a/storage/pbxt/src/xaction_xt.cc
+++ b/storage/pbxt/src/xaction_xt.cc
@@ -1123,7 +1123,6 @@ xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
*/
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
seg = &db->db_xn_idx[i];
- XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
seg->xs_last_xn_id = db->db_xn_curr_id;
}
@@ -1287,27 +1286,61 @@ xtPublic xtBool xt_xn_begin(XTThreadPtr self)
return OK;
}
-static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
+static void xn_end_release_locks(XTThreadPtr thread)
+{
+ XTXactDataPtr xact = thread->st_xact_data;
+ XTDatabaseHPtr db = thread->st_database;
+ ASSERT_NS(xact);
+
+ /* {REMOVE-LOCKS} Drop locks if you have any: */
+ thread->st_lock_list.xt_remove_all_locks(db, thread);
+
+ /* Do this afterwards to make sure the sweeper
+ * does not cleanup transactions start cleaning up
+ * before any transactions that were waiting for
+ * this transaction have completed!
+ */
+ xact->xd_end_xn_id = db->db_xn_curr_id;
+
+ /* Now you can sweep! */
+ xact->xd_flags |= XT_XN_XAC_SWEEP;
+}
+
+/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(),
+ * and one "slow" for commit(). When not using internal 2pc, there is only one
+ * call combining both phases.
+ */
+
+enum {
+ XN_END_PHASE_FAST = 1,
+ XN_END_PHASE_SLOW = 2,
+ XN_END_PHASE_BOTH = 3
+};
+
+static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase)
{
XTXactDataPtr xact;
xtBool ok = TRUE;
+ xtBool err;
ASSERT_NS(thread->st_xact_data);
if ((xact = thread->st_xact_data)) {
XTDatabaseHPtr db = thread->st_database;
xtXactID xn_id = xact->xd_start_xn_id;
- xtBool writer;
- if ((writer = thread->st_xact_writer)) {
+ if (writer) {
/* The transaction wrote something: */
XTXactEndEntryDRec entry;
xtWord4 sum;
- sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
- entry.xe_status_1 = status;
- entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
- XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
- XT_SET_DISK_4(entry.xe_not_used_4, 0);
+ if (phase & XN_END_PHASE_FAST)
+ {
+ sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
+ entry.xe_status_1 = status;
+ entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
+ XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
+ XT_SET_DISK_4(entry.xe_not_used_4, 0);
+ }
#ifdef XT_IMPLEMENT_NO_ACTION
/* This will check any resticts that have been delayed to the end of the statement. */
@@ -1319,20 +1352,35 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
}
#endif
- /* Flush the data log: */
- if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
+ /* Flush the data log (in the "fast" case we already did it in prepare: */
+ if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
ok = FALSE;
status = XT_LOG_ENT_ABORT;
}
/* Write and flush the transaction log: */
- if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
+ if (phase == XN_END_PHASE_FAST) {
+ /* Fast phase, delay any write or flush to later. */
+ err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
+ } else if (phase == XN_END_PHASE_SLOW) {
+ /* We already appended the commit record in the fast phase.
+ * Now just call with empty record to ensure we write/flush
+ * the log as needed for this commit.
+ */
+ err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit);
+ } else /* phase == XN_END_PHASE_BOTH */ {
+ /* Both phases at once, append commit record and write/flush normally. */
+ ASSERT_NS(phase == XN_END_PHASE_BOTH);
+ err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
+ }
+
+ if (err) {
ok = FALSE;
status = XT_LOG_ENT_ABORT;
/* Make sure this is done, if we failed to log
* the transction end!
*/
- if (thread->st_xact_writer) {
+ if (writer) {
/* Adjust this in case of error, but don't forget
* to lock!
*/
@@ -1347,46 +1395,46 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
}
}
- /* Setting this flag completes the transaction,
- * Do this before we release the locks, because
- * the unlocked transactions expect the
- * transaction they are waiting for to be
- * gone!
- */
- xact->xd_end_time = ++db->db_xn_end_time;
- if (status == XT_LOG_ENT_COMMIT) {
- thread->st_statistics.st_commits++;
- xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
- }
- else {
- thread->st_statistics.st_rollbacks++;
- xact->xd_flags |= XT_XN_XAC_ENDED;
+ if (phase & XN_END_PHASE_FAST) {
+ /* Setting this flag completes the transaction,
+ * Do this before we release the locks, because
+ * the unlocked transactions expect the
+ * transaction they are waiting for to be
+ * gone!
+ */
+ xact->xd_end_time = ++db->db_xn_end_time;
+ if (status == XT_LOG_ENT_COMMIT) {
+ thread->st_statistics.st_commits++;
+ xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
+ }
+ else {
+ thread->st_statistics.st_rollbacks++;
+ xact->xd_flags |= XT_XN_XAC_ENDED;
+ }
}
- /* {REMOVE-LOCKS} Drop locks is you have any: */
- thread->st_lock_list.xt_remove_all_locks(db, thread);
-
- /* Do this afterwards to make sure the sweeper
- * does not cleanup transactions start cleaning up
- * before any transactions that were waiting for
- * this transaction have completed!
+ /* Be as fast as possible in the "fast" path, as we want to be as
+ * fast as possible here (we will release slow locks immediately
+ * after in the "slow" part).
+ * ToDo: If we ran the fast part, the slow part could release locks
+ * _before_ fsync(), rather than after.
*/
- xact->xd_end_xn_id = db->db_xn_curr_id;
+ if (!(phase & XN_END_PHASE_SLOW))
+ return ok;
- /* Now you can sweep! */
- xact->xd_flags |= XT_XN_XAC_SWEEP;
+ xn_end_release_locks(thread);
}
else {
/* Read-only transaction can be removed, immediately */
- xact->xd_end_time = ++db->db_xn_end_time;
- xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-
- /* Drop locks is you have any: */
- thread->st_lock_list.xt_remove_all_locks(db, thread);
+ if (phase & XN_END_PHASE_FAST) {
+ xact->xd_end_time = ++db->db_xn_end_time;
+ xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
- xact->xd_end_xn_id = db->db_xn_curr_id;
+ if (!(phase & XN_END_PHASE_SLOW))
+ return ok;
+ }
- xact->xd_flags |= XT_XN_XAC_SWEEP;
+ xn_end_release_locks(thread);
if (xt_xn_delete_xact(db, xn_id, thread)) {
if (db->db_xn_min_ram_id == xn_id)
@@ -1478,12 +1526,22 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
{
- return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH);
+}
+
+xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
+{
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST);
+}
+
+xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
+{
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW);
}
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
{
- return xn_end_xact(thread, XT_LOG_ENT_ABORT);
+ return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH);
}
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)