summaryrefslogtreecommitdiff
path: root/storage/pbxt/src/xactlog_xt.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/pbxt/src/xactlog_xt.cc')
-rw-r--r--storage/pbxt/src/xactlog_xt.cc123
1 files changed, 114 insertions, 9 deletions
diff --git a/storage/pbxt/src/xactlog_xt.cc b/storage/pbxt/src/xactlog_xt.cc
index 82c0d85b770..9a1c01a41a3 100644
--- a/storage/pbxt/src/xactlog_xt.cc
+++ b/storage/pbxt/src/xactlog_xt.cc
@@ -28,6 +28,10 @@
#include "xt_config.h"
+#ifdef DRIZZLED
+#include <bitset>
+#endif
+
#include <signal.h>
#include "xactlog_xt.h"
@@ -600,7 +604,12 @@ void XTDatabaseLog::xlog_setup(XTThreadPtr self, XTDatabaseHPtr db, off_t inp_lo
xt_init_mutex_with_autoname(self, &xl_write_lock);
xt_init_cond(self, &xl_write_cond);
+#ifdef XT_XLOG_WAIT_SPINS
xt_writing = 0;
+ xt_waiting = 0;
+#else
+ xt_writing = FALSE;
+#endif
xl_log_id = 0;
xl_log_file = 0;
@@ -752,6 +761,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xtLogOffset req_flush_log_offset;
size_t part_size;
xtWord8 flush_time;
+ xtWord2 sum;
if (!size1) {
/* Just flush the buffer... */
@@ -790,13 +800,13 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
* enough space in the buffer, or a flush
* is required.
*/
+ xtWord8 then;
/*
* The objective of the following code is to
* pick one writer, out of all threads.
- * The others rest will wait for the writer.
+ * The rest will wait for the writer.
*/
- xtBool i_am_writer;
if (write_reason == WR_FLUSH) {
/* Before we flush, check if we should wait for running
@@ -805,8 +815,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0 && xl_last_flush_time) {
/* Wait for about as long as the last flush took,
* the idea is to saturate the disk with flushing...: */
- xtWord8 then = xt_trace_clock() + (xtWord8) xl_last_flush_time;
-
+ then = xt_trace_clock() + (xtWord8) xl_last_flush_time;
for (;;) {
xt_critical_wait();
/* If a thread leaves this loop because times up, or
@@ -831,6 +840,55 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
}
}
+#ifdef XT_XLOG_WAIT_SPINS
+ /* Spin for 1/1000s: */
+ then = xt_trace_clock() + (xtWord8) 1000;
+ for (;;) {
+ if (!xt_atomic_tas4(&xt_writing, 1))
+ break;
+
+ /* If I am not the writer, then I just waited for the
+ * writer. So it may be that my requirements have now
+ * been met!
+ */
+ if (write_reason == WR_FLUSH) {
+ /* If the reason was to flush, then
+ * check the last flush sequence, maybe it is passed
+ * our required sequence.
+ */
+ if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
+ /* The required flush position of the log is before
+ * or equal to the actual flush position. This means the condition
+ * for this thread have been satified (via group commit).
+ * Nothing more to do!
+ */
+ ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
+ return OK;
+ }
+ }
+ else {
+ /* It may be that there is now space in the append buffer: */
+ if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
+ goto copy_to_log_buffer;
+ }
+
+ if (xt_trace_clock() >= then) {
+ xt_lock_mutex_ns(&xl_write_lock);
+ xt_waiting++;
+ if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
+ xt_waiting--;
+ xt_unlock_mutex_ns(&xl_write_lock);
+ return FALSE;
+ }
+ xt_waiting--;
+ xt_unlock_mutex_ns(&xl_write_lock);
+ }
+ else
+ xt_critical_wait();
+ }
+#else
+ xtBool i_am_writer;
+
i_am_writer = FALSE;
xt_lock_mutex_ns(&xl_write_lock);
if (xt_writing) {
@@ -873,6 +931,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
goto write_log_to_file;
}
+#endif
/* I am the writer, check the conditions, again: */
if (write_reason == WR_FLUSH) {
@@ -881,8 +940,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
/* The writers required flush position is before or equal
* to the actual position, so the writer is done...
*/
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
return OK;
}
@@ -923,8 +988,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xt_unlock_mutex_ns(&xl_db->db_wr_lock);
}
}
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
return ok;
}
@@ -934,8 +1005,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
* to copy our data into the buffer:
*/
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) {
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
goto copy_to_log_buffer;
}
}
@@ -1085,8 +1162,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
* position, continue writing: */
goto rewrite;
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
return OK;
}
@@ -1100,8 +1183,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers)
goto rewrite;
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
}
copy_to_log_buffer:
@@ -1146,8 +1235,6 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
case XT_LOG_ENT_DELETE_BG:
case XT_LOG_ENT_DELETE_FL:
case XT_LOG_ENT_DELETE_FL_BG:
- xtWord2 sum;
-
sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
XT_SET_DISK_2(record->xu.xu_checksum_2, sum);
@@ -1158,6 +1245,10 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xl_db->db_xn_total_writer_count++;
}
break;
+ case XT_LOG_ENT_REC_REMOVED_BI:
+ sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
+ XT_SET_DISK_2(record->xu.xu_checksum_2, sum);
+ break;
case XT_LOG_ENT_ROW_NEW:
case XT_LOG_ENT_ROW_NEW_FL:
record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
@@ -1209,8 +1300,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
return OK;
write_failed:
+#ifdef XT_XLOG_WAIT_SPINS
+ xt_writing = 0;
+ if (xt_waiting)
+ xt_cond_wakeall(&xl_write_cond);
+#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
+#endif
return FAILED;
}
@@ -1595,7 +1692,7 @@ void XTDatabaseLog::xlog_seq_close(XTXactSeqReadPtr seq)
seq->xseq_log_eof = 0;
}
-xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok __attribute__((unused)))
+xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool XT_UNUSED(missing_ok))
{
if (seq->xseq_rec_log_id != log_id) {
seq->xseq_rec_log_id = log_id;
@@ -2094,7 +2191,9 @@ xtBool XTDatabaseLog::xlog_seq_next(XTXactSeqReadPtr seq, XTXactLogBufferDPtr *r
goto return_empty;
}
default:
- ASSERT_NS(FALSE);
+ /* It is possible to land here after a crash, if the
+ * log was not completely written.
+ */
seq->xseq_record_len = 0;
goto return_empty;
}
@@ -2304,7 +2403,13 @@ static void xlog_wr_wait_for_log_flush(XTThreadPtr self, XTDatabaseHPtr db)
* the wait, and the sweeper has nothing to do, and the checkpointer.
*/
if (db->db_xn_curr_id == last_xn_id &&
- xt_xn_is_before(xt_xn_get_curr_id(db), db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id
+ /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
+ * This should work because we are not concerned about the difference
+ * between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
+ * Which is just a matter of when transactions we can expect ot find
+ * in memory (see {GAP-INC-ADD-XACT})
+ */
+ xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id
!db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
/* There seems to be no activity at the moment.
* this might be a good time to write the log data.