diff options
Diffstat (limited to 'storage/pbxt/src/xactlog_xt.cc')
-rw-r--r-- | storage/pbxt/src/xactlog_xt.cc | 123 |
1 files changed, 114 insertions, 9 deletions
diff --git a/storage/pbxt/src/xactlog_xt.cc b/storage/pbxt/src/xactlog_xt.cc index 82c0d85b770..9a1c01a41a3 100644 --- a/storage/pbxt/src/xactlog_xt.cc +++ b/storage/pbxt/src/xactlog_xt.cc @@ -28,6 +28,10 @@ #include "xt_config.h" +#ifdef DRIZZLED +#include <bitset> +#endif + #include <signal.h> #include "xactlog_xt.h" @@ -600,7 +604,12 @@ void XTDatabaseLog::xlog_setup(XTThreadPtr self, XTDatabaseHPtr db, off_t inp_lo xt_init_mutex_with_autoname(self, &xl_write_lock); xt_init_cond(self, &xl_write_cond); +#ifdef XT_XLOG_WAIT_SPINS xt_writing = 0; + xt_waiting = 0; +#else + xt_writing = FALSE; +#endif xl_log_id = 0; xl_log_file = 0; @@ -752,6 +761,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xtLogOffset req_flush_log_offset; size_t part_size; xtWord8 flush_time; + xtWord2 sum; if (!size1) { /* Just flush the buffer... */ @@ -790,13 +800,13 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat * enough space in the buffer, or a flush * is required. */ + xtWord8 then; /* * The objective of the following code is to * pick one writer, out of all threads. - * The others rest will wait for the writer. + * The rest will wait for the writer. */ - xtBool i_am_writer; if (write_reason == WR_FLUSH) { /* Before we flush, check if we should wait for running @@ -805,8 +815,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0 && xl_last_flush_time) { /* Wait for about as long as the last flush took, * the idea is to saturate the disk with flushing...: */ - xtWord8 then = xt_trace_clock() + (xtWord8) xl_last_flush_time; - + then = xt_trace_clock() + (xtWord8) xl_last_flush_time; for (;;) { xt_critical_wait(); /* If a thread leaves this loop because times up, or @@ -831,6 +840,55 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat } } +#ifdef XT_XLOG_WAIT_SPINS + /* Spin for 1/1000s: */ + then = xt_trace_clock() + (xtWord8) 1000; + for (;;) { + if (!xt_atomic_tas4(&xt_writing, 1)) + break; + + /* If I am not the writer, then I just waited for the + * writer. So it may be that my requirements have now + * been met! + */ + if (write_reason == WR_FLUSH) { + /* If the reason was to flush, then + * check the last flush sequence, maybe it is passed + * our required sequence. + */ + if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) { + /* The required flush position of the log is before + * or equal to the actual flush position. This means the condition + * for this thread have been satified (via group commit). + * Nothing more to do! + */ + ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0); + return OK; + } + } + else { + /* It may be that there is now space in the append buffer: */ + if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) + goto copy_to_log_buffer; + } + + if (xt_trace_clock() >= then) { + xt_lock_mutex_ns(&xl_write_lock); + xt_waiting++; + if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) { + xt_waiting--; + xt_unlock_mutex_ns(&xl_write_lock); + return FALSE; + } + xt_waiting--; + xt_unlock_mutex_ns(&xl_write_lock); + } + else + xt_critical_wait(); + } +#else + xtBool i_am_writer; + i_am_writer = FALSE; xt_lock_mutex_ns(&xl_write_lock); if (xt_writing) { @@ -873,6 +931,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat goto write_log_to_file; } +#endif /* I am the writer, check the conditions, again: */ if (write_reason == WR_FLUSH) { @@ -881,8 +940,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat /* The writers required flush position is before or equal * to the actual position, so the writer is done... */ +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0); return OK; } @@ -923,8 +988,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xt_unlock_mutex_ns(&xl_db->db_wr_lock); } } +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0); return ok; } @@ -934,8 +1005,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat * to copy our data into the buffer: */ if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) { +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif goto copy_to_log_buffer; } } @@ -1085,8 +1162,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat * position, continue writing: */ goto rewrite; +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0); return OK; } @@ -1100,8 +1183,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers) goto rewrite; +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif } copy_to_log_buffer: @@ -1146,8 +1235,6 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat case XT_LOG_ENT_DELETE_BG: case XT_LOG_ENT_DELETE_FL: case XT_LOG_ENT_DELETE_FL_BG: - xtWord2 sum; - sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id); XT_SET_DISK_2(record->xu.xu_checksum_2, sum); @@ -1158,6 +1245,10 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xl_db->db_xn_total_writer_count++; } break; + case XT_LOG_ENT_REC_REMOVED_BI: + sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id); + XT_SET_DISK_2(record->xu.xu_checksum_2, sum); + break; case XT_LOG_ENT_ROW_NEW: case XT_LOG_ENT_ROW_NEW_FL: record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id); @@ -1209,8 +1300,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat return OK; write_failed: +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); +#endif return FAILED; } @@ -1595,7 +1692,7 @@ void XTDatabaseLog::xlog_seq_close(XTXactSeqReadPtr seq) seq->xseq_log_eof = 0; } -xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok __attribute__((unused))) +xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool XT_UNUSED(missing_ok)) { if (seq->xseq_rec_log_id != log_id) { seq->xseq_rec_log_id = log_id; @@ -2094,7 +2191,9 @@ xtBool XTDatabaseLog::xlog_seq_next(XTXactSeqReadPtr seq, XTXactLogBufferDPtr *r goto return_empty; } default: - ASSERT_NS(FALSE); + /* It is possible to land here after a crash, if the + * log was not completely written. + */ seq->xseq_record_len = 0; goto return_empty; } @@ -2304,7 +2403,13 @@ static void xlog_wr_wait_for_log_flush(XTThreadPtr self, XTDatabaseHPtr db) * the wait, and the sweeper has nothing to do, and the checkpointer. */ if (db->db_xn_curr_id == last_xn_id && - xt_xn_is_before(xt_xn_get_curr_id(db), db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id + /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id, + * This should work because we are not concerned about the difference + * between xt_xn_get_curr_id(db) and db->db_xn_curr_id, + * Which is just a matter of when transactions we can expect ot find + * in memory (see {GAP-INC-ADD-XACT}) + */ + xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id !db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) { /* There seems to be no activity at the moment. * this might be a good time to write the log data. |