diff options
Diffstat (limited to 'src/conn')
-rw-r--r-- | src/conn/conn_log.c | 140 |
1 files changed, 139 insertions, 1 deletions
diff --git a/src/conn/conn_log.c b/src/conn/conn_log.c index 36d4d539d92..7499d424c4e 100644 --- a/src/conn/conn_log.c +++ b/src/conn/conn_log.c @@ -347,6 +347,122 @@ err: __wt_err(session, ret, "log close server error"); } /* + * Simple structure for sorting written slots. + */ +typedef struct { + WT_LSN lsn; + uint32_t slot_index; +} WT_LOG_WRLSN_ENTRY; + +/* + * __log_wrlsn_cmp -- + * The log wrlsn comparison function for qsort. + */ +static int +__log_wrlsn_cmp(const void *a, const void *b) +{ + WT_LOG_WRLSN_ENTRY *ae, *be; + + ae = (WT_LOG_WRLSN_ENTRY *)a; + be = (WT_LOG_WRLSN_ENTRY *)b; + return (LOG_CMP(&ae->lsn, &be->lsn)); +} + +/* + * __log_wrlsn_server -- + * The log wrlsn server thread. + */ +static void * +__log_wrlsn_server(void *arg) +{ + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + WT_LOG *log; + WT_LOG_WRLSN_ENTRY written[SLOT_POOL]; + WT_LOGSLOT *slot; + WT_SESSION_IMPL *session; + int i, save_i, written_i, yield; + + session = arg; + conn = S2C(session); + log = conn->log; + yield = 0; + while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { + /* + * No need to use the log_slot_lock because the slot pool + * is statically allocated and any slot in the + * WT_LOG_SLOT_WRITTEN state is exclusively ours for now. + */ + i = 0; + written_i = 0; + /* + * Walk the array once saving any slots that are in the + * WT_LOG_SLOT_WRITTEN state. + */ + while (i < SLOT_POOL) { + save_i = i; + slot = &log->slot_pool[i++]; + if (slot->slot_state != WT_LOG_SLOT_WRITTEN) + continue; + written[written_i].slot_index = save_i; + written[written_i++].lsn = slot->slot_release_lsn; + } + /* + * If we found any written slots process them. We sort them + * based on the release LSN, and then look for them in order. + */ + if (written_i > 0) { + yield = 0; + qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY), + __log_wrlsn_cmp); + /* + * We know the written array is sorted by LSN. Go + * through them either advancing write_lsn or stop + * as soon as one is not in order. + */ + for (i = 0; i < written_i; i++) { + if (LOG_CMP(&log->write_lsn, + &written[i].lsn) != 0) + break; + /* + * If we get here we have a slot to process. + * Advance the LSN and process the slot. + */ + slot = &log->slot_pool[written[i].slot_index]; + WT_ASSERT(session, LOG_CMP(&written[i].lsn, + &slot->slot_release_lsn) == 0); + log->write_lsn = slot->slot_end_lsn; + WT_ERR(__wt_cond_signal(session, + log->log_write_cond)); + WT_STAT_FAST_CONN_INCR(session, log_write_lsn); + + /* + * Signal the close thread if needed. + */ + if (F_ISSET(slot, SLOT_CLOSEFH)) + WT_ERR(__wt_cond_signal(session, + conn->log_close_cond)); + WT_ERR(__wt_log_slot_free(session, slot)); + } + } + /* + * If we saw a later write, we always want to yield because + * we know something is in progress. + */ + if (yield++ < 1000) + __wt_yield(); + else + /* Wait until the next event. */ + WT_ERR(__wt_cond_wait(session, + conn->log_wrlsn_cond, 100000)); + } + + if (0) +err: __wt_err(session, ret, "log wrlsn server error"); + return (NULL); +} + +/* * __log_server -- * The log server thread. */ @@ -479,12 +595,24 @@ __wt_logmgr_open(WT_SESSION_IMPL *session) "log close server", 0, &conn->log_close_cond)); /* - * Start the thread. + * Start the log file close thread. */ WT_RET(__wt_thread_create(conn->log_close_session, &conn->log_close_tid, __log_close_server, conn->log_close_session)); conn->log_close_tid_set = 1; + /* + * Start the log write LSN thread. It is not configurable. + * If logging is enabled, this thread runs. + */ + WT_RET(__wt_open_internal_session( + conn, "log-wrlsn-server", 0, 0, &conn->log_wrlsn_session)); + WT_RET(__wt_cond_alloc(conn->log_wrlsn_session, + "log write lsn server", 0, &conn->log_wrlsn_cond)); + WT_RET(__wt_thread_create(conn->log_wrlsn_session, + &conn->log_wrlsn_tid, __log_wrlsn_server, conn->log_wrlsn_session)); + conn->log_wrlsn_tid_set = 1; + /* If no log thread services are configured, we're done. */ if (!FLD_ISSET(conn->log_flags, (WT_CONN_LOG_ARCHIVE | WT_CONN_LOG_PREALLOC))) @@ -557,6 +685,16 @@ __wt_logmgr_destroy(WT_SESSION_IMPL *session) WT_TRET(wt_session->close(wt_session, NULL)); conn->log_close_session = NULL; } + if (conn->log_wrlsn_tid_set) { + WT_TRET(__wt_cond_signal(session, conn->log_wrlsn_cond)); + WT_TRET(__wt_thread_join(session, conn->log_wrlsn_tid)); + conn->log_wrlsn_tid_set = 0; + } + if (conn->log_wrlsn_session != NULL) { + wt_session = &conn->log_wrlsn_session->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + conn->log_wrlsn_session = NULL; + } WT_TRET(__wt_log_close(session)); |