summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@mongodb.com>2015-03-09 12:43:18 +1100
committerMichael Cahill <michael.cahill@mongodb.com>2015-03-09 12:43:18 +1100
commit336aaf9e1396ffa6eed930284227e85ad1513a62 (patch)
treefdfae05db8629dbdce921e5905d4a256614e4c26
parent0315ee75f712ed0ccddca0616339de93b17835a3 (diff)
parent73be9a4ac6eda2533b2dfd8e0a20bdd22bc0e4f9 (diff)
downloadmongo-336aaf9e1396ffa6eed930284227e85ad1513a62.tar.gz
Merge pull request #1696 from wiredtiger/log-wrlsn-thread
Log wrlsn thread
-rw-r--r--dist/s_string.ok2
-rw-r--r--dist/stat_data.py3
-rw-r--r--src/conn/conn_log.c143
-rw-r--r--src/docs/spell.ok1
-rw-r--r--src/include/connection.h4
-rw-r--r--src/include/extern.h2
-rw-r--r--src/include/log.h10
-rw-r--r--src/include/stat.h3
-rw-r--r--src/include/wiredtiger.in110
-rw-r--r--src/log/log.c66
-rw-r--r--src/log/log_slot.c30
-rw-r--r--src/support/stat.c8
12 files changed, 301 insertions, 81 deletions
diff --git a/dist/s_string.ok b/dist/s_string.ok
index 66439faf161..8b0335a6480 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -551,6 +551,7 @@ dest
dev
dhandle
dhandles
+dir
dirlist
dl
dlclose
@@ -1161,6 +1162,7 @@ wrapup
writelock
writeunlock
wrlock
+wrlsn
ws
wti
wtperf
diff --git a/dist/stat_data.py b/dist/stat_data.py
index 5a42f2ff318..dd4d292c8b6 100644
--- a/dist/stat_data.py
+++ b/dist/stat_data.py
@@ -221,11 +221,14 @@ connection_stats = [
LogStat('log_prealloc_max', 'number of pre-allocated log files to create'),
LogStat('log_prealloc_used', 'pre-allocated log files used'),
LogStat('log_reads', 'log read operations'),
+ LogStat('log_release_write_lsn', 'log release advances write LSN'),
LogStat('log_scan_records', 'records processed by log scan'),
LogStat('log_scan_rereads', 'log scan records requiring two reads'),
LogStat('log_scans', 'log scan operations'),
LogStat('log_sync', 'log sync operations'),
+ LogStat('log_sync_dir', 'log sync_dir operations'),
LogStat('log_writes', 'log write operations'),
+ LogStat('log_write_lsn', 'log server thread advances write LSN'),
LogStat('log_slot_consolidated', 'logging bytes consolidated'),
LogStat('log_slot_closes', 'consolidated slot closures'),
diff --git a/src/conn/conn_log.c b/src/conn/conn_log.c
index 36d4d539d92..315e93c1875 100644
--- a/src/conn/conn_log.c
+++ b/src/conn/conn_log.c
@@ -347,6 +347,124 @@ err: __wt_err(session, ret, "log close server error");
}
/*
+ * Simple structure for sorting written slots.
+ *
+ * Each entry pairs a written slot's release LSN with that slot's position in
+ * the slot pool, so the slot can be located again after the entries have been
+ * sorted by LSN.
+ */
+typedef struct {
+ WT_LSN lsn; /* Copy of the slot's slot_release_lsn; the sort key */
+ uint32_t slot_index; /* Index of the slot in log->slot_pool */
+} WT_LOG_WRLSN_ENTRY;
+
+/*
+ * __log_wrlsn_cmp --
+ * The log wrlsn comparison function for qsort.
+ *
+ * Both arguments point at WT_LOG_WRLSN_ENTRY elements; the entries are
+ * ordered by their lsn field and the result of LOG_CMP on the two LSNs is
+ * returned unchanged. Presumably LOG_CMP yields a negative, zero or
+ * positive value as qsort requires -- confirm against its definition.
+ *
+ * NOTE(review): the casts below discard the const qualifier of the qsort
+ * arguments. Nothing is written through ae or be in this function, so
+ * const-qualified locals look like a safe follow-up.
+ */
+static int
+__log_wrlsn_cmp(const void *a, const void *b)
+{
+ WT_LOG_WRLSN_ENTRY *ae, *be;
+
+ ae = (WT_LOG_WRLSN_ENTRY *)a;
+ be = (WT_LOG_WRLSN_ENTRY *)b;
+ return (LOG_CMP(&ae->lsn, &be->lsn));
+}
+
+/*
+ * __log_wrlsn_server --
+ * The log wrlsn server thread.
+ *
+ * Runs for as long as WT_CONN_LOG_SERVER_RUN is set on the connection. On
+ * each pass it collects the pool slots that are in the WT_LOG_SLOT_WRITTEN
+ * state, sorts them by release LSN and, for every slot whose release LSN
+ * equals the current log->write_lsn, advances write_lsn to that slot's end
+ * LSN, signals log_write_cond (and log_close_cond when the slot has
+ * SLOT_CLOSEFH set) and frees the slot back to the pool.
+ *
+ * The arg parameter is the WT_SESSION_IMPL this thread runs with. The
+ * function always returns NULL. The err label is only reached by a jump
+ * (presumably from WT_ERR on a failed call); the "if (0)" in front of it
+ * keeps the normal exit path from reporting an error via __wt_err.
+ */
+static void *
+__log_wrlsn_server(void *arg)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_LOG *log;
+ WT_LOG_WRLSN_ENTRY written[SLOT_POOL];
+ WT_LOGSLOT *slot;
+ WT_SESSION_IMPL *session;
+ size_t written_i;
+ uint32_t i, save_i;
+ int yield;
+
+ session = arg;
+ conn = S2C(session);
+ log = conn->log;
+ yield = 0;
+ while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
+ /*
+ * No need to use the log_slot_lock because the slot pool
+ * is statically allocated and any slot in the
+ * WT_LOG_SLOT_WRITTEN state is exclusively ours for now.
+ */
+ i = 0;
+ written_i = 0;
+ /*
+ * Walk the array once saving any slots that are in the
+ * WT_LOG_SLOT_WRITTEN state. The index is remembered in
+ * save_i because i is advanced as the slot is fetched. At
+ * most SLOT_POOL entries can be saved, which is the size of
+ * the written array.
+ */
+ while (i < SLOT_POOL) {
+ save_i = i;
+ slot = &log->slot_pool[i++];
+ if (slot->slot_state != WT_LOG_SLOT_WRITTEN)
+ continue;
+ written[written_i].slot_index = save_i;
+ written[written_i++].lsn = slot->slot_release_lsn;
+ }
+ /*
+ * If we found any written slots process them. We sort them
+ * based on the release LSN, and then look for them in order.
+ * Finding any written slot also resets the yield counter, so
+ * the loop keeps yielding rather than waiting while there is
+ * work outstanding.
+ */
+ if (written_i > 0) {
+ yield = 0;
+ qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY),
+ __log_wrlsn_cmp);
+ /*
+ * We know the written array is sorted by LSN. Go
+ * through them either advancing write_lsn or stop
+ * as soon as one is not in order. A slot we stop at
+ * is not modified here, so it stays in the written
+ * state and is seen again on the next pass.
+ */
+ for (i = 0; i < written_i; i++) {
+ if (LOG_CMP(&log->write_lsn,
+ &written[i].lsn) != 0)
+ break;
+ /*
+ * If we get here we have a slot to process.
+ * Advance the LSN and process the slot.
+ */
+ slot = &log->slot_pool[written[i].slot_index];
+ WT_ASSERT(session, LOG_CMP(&written[i].lsn,
+ &slot->slot_release_lsn) == 0);
+ log->write_lsn = slot->slot_end_lsn;
+ WT_ERR(__wt_cond_signal(session,
+ log->log_write_cond));
+ WT_STAT_FAST_CONN_INCR(session, log_write_lsn);
+
+ /*
+ * Signal the close thread if needed.
+ */
+ if (F_ISSET(slot, SLOT_CLOSEFH))
+ WT_ERR(__wt_cond_signal(session,
+ conn->log_close_cond));
+ WT_ERR(__wt_log_slot_free(session, slot));
+ }
+ }
+ /*
+ * If we saw a later write, we always want to yield because
+ * we know something is in progress. The counter is only
+ * reset when a written slot is found, so after 1000
+ * consecutive passes that find none we stop yielding and
+ * block on the condition variable instead.
+ */
+ if (yield++ < 1000)
+ __wt_yield();
+ else
+ /*
+ * Wait until the next event. The 100000 timeout is
+ * presumably in microseconds -- confirm against
+ * __wt_cond_wait.
+ */
+ WT_ERR(__wt_cond_wait(session,
+ conn->log_wrlsn_cond, 100000));
+ }
+
+ if (0)
+err: __wt_err(session, ret, "log wrlsn server error");
+ return (NULL);
+}
+
+/*
* __log_server --
* The log server thread.
*/
@@ -479,12 +597,24 @@ __wt_logmgr_open(WT_SESSION_IMPL *session)
"log close server", 0, &conn->log_close_cond));
/*
- * Start the thread.
+ * Start the log file close thread.
*/
WT_RET(__wt_thread_create(conn->log_close_session,
&conn->log_close_tid, __log_close_server, conn->log_close_session));
conn->log_close_tid_set = 1;
+ /*
+ * Start the log write LSN thread. It is not configurable.
+ * If logging is enabled, this thread runs.
+ */
+ WT_RET(__wt_open_internal_session(
+ conn, "log-wrlsn-server", 0, 0, &conn->log_wrlsn_session));
+ WT_RET(__wt_cond_alloc(conn->log_wrlsn_session,
+ "log write lsn server", 0, &conn->log_wrlsn_cond));
+ WT_RET(__wt_thread_create(conn->log_wrlsn_session,
+ &conn->log_wrlsn_tid, __log_wrlsn_server, conn->log_wrlsn_session));
+ conn->log_wrlsn_tid_set = 1;
+
/* If no log thread services are configured, we're done. */
if (!FLD_ISSET(conn->log_flags,
(WT_CONN_LOG_ARCHIVE | WT_CONN_LOG_PREALLOC)))
@@ -557,6 +687,17 @@ __wt_logmgr_destroy(WT_SESSION_IMPL *session)
WT_TRET(wt_session->close(wt_session, NULL));
conn->log_close_session = NULL;
}
+ if (conn->log_wrlsn_tid_set) {
+ WT_TRET(__wt_cond_signal(session, conn->log_wrlsn_cond));
+ WT_TRET(__wt_thread_join(session, conn->log_wrlsn_tid));
+ conn->log_wrlsn_tid_set = 0;
+ }
+ WT_TRET(__wt_cond_destroy(session, &conn->log_wrlsn_cond));
+ if (conn->log_wrlsn_session != NULL) {
+ wt_session = &conn->log_wrlsn_session->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ conn->log_wrlsn_session = NULL;
+ }
WT_TRET(__wt_log_close(session));
diff --git a/src/docs/spell.ok b/src/docs/spell.ok
index f333a8fff58..df31a272361 100644
--- a/src/docs/spell.ok
+++ b/src/docs/spell.ok
@@ -87,6 +87,7 @@ ack'ed
ajn
alloc
allocator
+allocators
allocsize
ao
api
diff --git a/src/include/connection.h b/src/include/connection.h
index 9cb42ae7c80..78b2949ab98 100644
--- a/src/include/connection.h
+++ b/src/include/connection.h
@@ -320,6 +320,10 @@ struct __wt_connection_impl {
WT_SESSION_IMPL *log_close_session;/* Log close thread session */
wt_thread_t log_close_tid; /* Log close thread thread */
int log_close_tid_set;/* Log close thread set */
+ WT_CONDVAR *log_wrlsn_cond;/* Log write lsn thread wait mutex */
+ WT_SESSION_IMPL *log_wrlsn_session;/* Log write lsn thread session */
+ wt_thread_t log_wrlsn_tid; /* Log write lsn thread thread */
+ int log_wrlsn_tid_set;/* Log write lsn thread set */
WT_LOG *log; /* Logging structure */
WT_COMPRESSOR *log_compressor;/* Logging compressor */
wt_off_t log_file_max; /* Log file max size */
diff --git a/src/include/extern.h b/src/include/extern.h
index 0ef055e1162..bddbb5e01eb 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -351,7 +351,7 @@ extern int __wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot);
extern int __wt_log_slot_notify(WT_SESSION_IMPL *session, WT_LOGSLOT *slot);
extern int __wt_log_slot_wait(WT_SESSION_IMPL *session, WT_LOGSLOT *slot);
extern int64_t __wt_log_slot_release(WT_LOGSLOT *slot, uint64_t size);
-extern int __wt_log_slot_free(WT_LOGSLOT *slot);
+extern int __wt_log_slot_free(WT_SESSION_IMPL *session, WT_LOGSLOT *slot);
extern int __wt_log_slot_grow_buffers(WT_SESSION_IMPL *session, size_t newsize);
extern int __wt_clsm_init_merge( WT_CURSOR *cursor, u_int start_chunk, uint32_t start_id, u_int nchunks);
extern int __wt_clsm_open(WT_SESSION_IMPL *session, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
diff --git a/src/include/log.h b/src/include/log.h
index 82d90070609..760321d9abb 100644
--- a/src/include/log.h
+++ b/src/include/log.h
@@ -59,17 +59,21 @@
/*
* Possible values for the consolidation array slot states:
+ * (NOTE: Any new states must be > WT_LOG_SLOT_DONE and < WT_LOG_SLOT_READY.)
+ *
* < WT_LOG_SLOT_DONE - threads are actively writing to the log.
* WT_LOG_SLOT_DONE - all activity on this slot is complete.
* WT_LOG_SLOT_FREE - slot is available for allocation.
* WT_LOG_SLOT_PENDING - slot is transitioning from ready to active.
+ * WT_LOG_SLOT_WRITTEN - slot is written and should be processed by worker.
* WT_LOG_SLOT_READY - slot is ready for threads to join.
* > WT_LOG_SLOT_READY - threads are actively consolidating on this slot.
*/
#define WT_LOG_SLOT_DONE 0
#define WT_LOG_SLOT_FREE 1
#define WT_LOG_SLOT_PENDING 2
-#define WT_LOG_SLOT_READY 3
+#define WT_LOG_SLOT_WRITTEN 3
+#define WT_LOG_SLOT_READY 4
typedef WT_COMPILER_TYPE_ALIGN(WT_CACHE_LINE_ALIGNMENT) struct {
int64_t slot_state; /* Slot state */
uint64_t slot_group_size; /* Group size */
@@ -92,9 +96,11 @@ typedef WT_COMPILER_TYPE_ALIGN(WT_CACHE_LINE_ALIGNMENT) struct {
uint32_t flags; /* Flags */
} WT_LOGSLOT;
+#define SLOT_INIT_FLAGS (SLOT_BUFFERED)
+
typedef struct {
WT_LOGSLOT *slot;
- wt_off_t offset;
+ wt_off_t offset;
} WT_MYSLOT;
/* Offset of first record */
diff --git a/src/include/stat.h b/src/include/stat.h
index 3f684478358..21eaff0677f 100644
--- a/src/include/stat.h
+++ b/src/include/stat.h
@@ -215,6 +215,7 @@ struct __wt_connection_stats {
WT_STATS log_prealloc_max;
WT_STATS log_prealloc_used;
WT_STATS log_reads;
+ WT_STATS log_release_write_lsn;
WT_STATS log_scan_records;
WT_STATS log_scan_rereads;
WT_STATS log_scans;
@@ -227,6 +228,8 @@ struct __wt_connection_stats {
WT_STATS log_slot_toosmall;
WT_STATS log_slot_transitions;
WT_STATS log_sync;
+ WT_STATS log_sync_dir;
+ WT_STATS log_write_lsn;
WT_STATS log_writes;
WT_STATS lsm_checkpoint_throttle;
WT_STATS lsm_merge_throttle;
diff --git a/src/include/wiredtiger.in b/src/include/wiredtiger.in
index 9f202300378..fed6042c67a 100644
--- a/src/include/wiredtiger.in
+++ b/src/include/wiredtiger.in
@@ -3335,110 +3335,116 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_CONN_LOG_PREALLOC_USED 1081
/*! log: log read operations */
#define WT_STAT_CONN_LOG_READS 1082
+/*! log: log release advances write LSN */
+#define WT_STAT_CONN_LOG_RELEASE_WRITE_LSN 1083
/*! log: records processed by log scan */
-#define WT_STAT_CONN_LOG_SCAN_RECORDS 1083
+#define WT_STAT_CONN_LOG_SCAN_RECORDS 1084
/*! log: log scan records requiring two reads */
-#define WT_STAT_CONN_LOG_SCAN_REREADS 1084
+#define WT_STAT_CONN_LOG_SCAN_REREADS 1085
/*! log: log scan operations */
-#define WT_STAT_CONN_LOG_SCANS 1085
+#define WT_STAT_CONN_LOG_SCANS 1086
/*! log: consolidated slot closures */
-#define WT_STAT_CONN_LOG_SLOT_CLOSES 1086
+#define WT_STAT_CONN_LOG_SLOT_CLOSES 1087
/*! log: logging bytes consolidated */
-#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1087
+#define WT_STAT_CONN_LOG_SLOT_CONSOLIDATED 1088
/*! log: consolidated slot joins */
-#define WT_STAT_CONN_LOG_SLOT_JOINS 1088
+#define WT_STAT_CONN_LOG_SLOT_JOINS 1089
/*! log: consolidated slot join races */
-#define WT_STAT_CONN_LOG_SLOT_RACES 1089
+#define WT_STAT_CONN_LOG_SLOT_RACES 1090
/*! log: slots selected for switching that were unavailable */
-#define WT_STAT_CONN_LOG_SLOT_SWITCH_FAILS 1090
+#define WT_STAT_CONN_LOG_SLOT_SWITCH_FAILS 1091
/*! log: record size exceeded maximum */
-#define WT_STAT_CONN_LOG_SLOT_TOOBIG 1091
+#define WT_STAT_CONN_LOG_SLOT_TOOBIG 1092
/*! log: failed to find a slot large enough for record */
-#define WT_STAT_CONN_LOG_SLOT_TOOSMALL 1092
+#define WT_STAT_CONN_LOG_SLOT_TOOSMALL 1093
/*! log: consolidated slot join transitions */
-#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1093
+#define WT_STAT_CONN_LOG_SLOT_TRANSITIONS 1094
/*! log: log sync operations */
-#define WT_STAT_CONN_LOG_SYNC 1094
+#define WT_STAT_CONN_LOG_SYNC 1095
+/*! log: log sync_dir operations */
+#define WT_STAT_CONN_LOG_SYNC_DIR 1096
+/*! log: log server thread advances write LSN */
+#define WT_STAT_CONN_LOG_WRITE_LSN 1097
/*! log: log write operations */
-#define WT_STAT_CONN_LOG_WRITES 1095
+#define WT_STAT_CONN_LOG_WRITES 1098
/*! LSM: sleep for LSM checkpoint throttle */
-#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1096
+#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1099
/*! LSM: sleep for LSM merge throttle */
-#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1097
+#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1100
/*! LSM: rows merged in an LSM tree */
-#define WT_STAT_CONN_LSM_ROWS_MERGED 1098
+#define WT_STAT_CONN_LSM_ROWS_MERGED 1101
/*! LSM: application work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1099
+#define WT_STAT_CONN_LSM_WORK_QUEUE_APP 1102
/*! LSM: merge work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1100
+#define WT_STAT_CONN_LSM_WORK_QUEUE_MANAGER 1103
/*! LSM: tree queue hit maximum */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1101
+#define WT_STAT_CONN_LSM_WORK_QUEUE_MAX 1104
/*! LSM: switch work units currently queued */
-#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1102
+#define WT_STAT_CONN_LSM_WORK_QUEUE_SWITCH 1105
/*! LSM: tree maintenance operations scheduled */
-#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1103
+#define WT_STAT_CONN_LSM_WORK_UNITS_CREATED 1106
/*! LSM: tree maintenance operations discarded */
-#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1104
+#define WT_STAT_CONN_LSM_WORK_UNITS_DISCARDED 1107
/*! LSM: tree maintenance operations executed */
-#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1105
+#define WT_STAT_CONN_LSM_WORK_UNITS_DONE 1108
/*! connection: memory allocations */
-#define WT_STAT_CONN_MEMORY_ALLOCATION 1106
+#define WT_STAT_CONN_MEMORY_ALLOCATION 1109
/*! connection: memory frees */
-#define WT_STAT_CONN_MEMORY_FREE 1107
+#define WT_STAT_CONN_MEMORY_FREE 1110
/*! connection: memory re-allocations */
-#define WT_STAT_CONN_MEMORY_GROW 1108
+#define WT_STAT_CONN_MEMORY_GROW 1111
/*! thread-yield: page acquire busy blocked */
-#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1109
+#define WT_STAT_CONN_PAGE_BUSY_BLOCKED 1112
/*! thread-yield: page acquire eviction blocked */
-#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1110
+#define WT_STAT_CONN_PAGE_FORCIBLE_EVICT_BLOCKED 1113
/*! thread-yield: page acquire locked blocked */
-#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1111
+#define WT_STAT_CONN_PAGE_LOCKED_BLOCKED 1114
/*! thread-yield: page acquire read blocked */
-#define WT_STAT_CONN_PAGE_READ_BLOCKED 1112
+#define WT_STAT_CONN_PAGE_READ_BLOCKED 1115
/*! thread-yield: page acquire time sleeping (usecs) */
-#define WT_STAT_CONN_PAGE_SLEEP 1113
+#define WT_STAT_CONN_PAGE_SLEEP 1116
/*! connection: total read I/Os */
-#define WT_STAT_CONN_READ_IO 1114
+#define WT_STAT_CONN_READ_IO 1117
/*! reconciliation: page reconciliation calls */
-#define WT_STAT_CONN_REC_PAGES 1115
+#define WT_STAT_CONN_REC_PAGES 1118
/*! reconciliation: page reconciliation calls for eviction */
-#define WT_STAT_CONN_REC_PAGES_EVICTION 1116
+#define WT_STAT_CONN_REC_PAGES_EVICTION 1119
/*! reconciliation: split bytes currently awaiting free */
-#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1117
+#define WT_STAT_CONN_REC_SPLIT_STASHED_BYTES 1120
/*! reconciliation: split objects currently awaiting free */
-#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1118
+#define WT_STAT_CONN_REC_SPLIT_STASHED_OBJECTS 1121
/*! connection: pthread mutex shared lock read-lock calls */
-#define WT_STAT_CONN_RWLOCK_READ 1119
+#define WT_STAT_CONN_RWLOCK_READ 1122
/*! connection: pthread mutex shared lock write-lock calls */
-#define WT_STAT_CONN_RWLOCK_WRITE 1120
+#define WT_STAT_CONN_RWLOCK_WRITE 1123
/*! session: open cursor count */
-#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1121
+#define WT_STAT_CONN_SESSION_CURSOR_OPEN 1124
/*! session: open session count */
-#define WT_STAT_CONN_SESSION_OPEN 1122
+#define WT_STAT_CONN_SESSION_OPEN 1125
/*! transaction: transaction begins */
-#define WT_STAT_CONN_TXN_BEGIN 1123
+#define WT_STAT_CONN_TXN_BEGIN 1126
/*! transaction: transaction checkpoints */
-#define WT_STAT_CONN_TXN_CHECKPOINT 1124
+#define WT_STAT_CONN_TXN_CHECKPOINT 1127
/*! transaction: transaction checkpoint currently running */
-#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1125
+#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1128
/*! transaction: transaction checkpoint max time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1126
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1129
/*! transaction: transaction checkpoint min time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1127
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1130
/*! transaction: transaction checkpoint most recent time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1128
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1131
/*! transaction: transaction checkpoint total time (msecs) */
-#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1129
+#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1132
/*! transaction: transactions committed */
-#define WT_STAT_CONN_TXN_COMMIT 1130
+#define WT_STAT_CONN_TXN_COMMIT 1133
/*! transaction: transaction failures due to cache overflow */
-#define WT_STAT_CONN_TXN_FAIL_CACHE 1131
+#define WT_STAT_CONN_TXN_FAIL_CACHE 1134
/*! transaction: transaction range of IDs currently pinned */
-#define WT_STAT_CONN_TXN_PINNED_RANGE 1132
+#define WT_STAT_CONN_TXN_PINNED_RANGE 1135
/*! transaction: transactions rolled back */
-#define WT_STAT_CONN_TXN_ROLLBACK 1133
+#define WT_STAT_CONN_TXN_ROLLBACK 1136
/*! connection: total write I/Os */
-#define WT_STAT_CONN_WRITE_IO 1134
+#define WT_STAT_CONN_WRITE_IO 1137
/*!
* @}
diff --git a/src/log/log.c b/src/log/log.c
index 3162dd826a0..2fe82e61ef0 100644
--- a/src/log/log.c
+++ b/src/log/log.c
@@ -817,7 +817,7 @@ __wt_log_close(WT_SESSION_IMPL *session)
if (log->log_dir_fh != NULL) {
WT_RET(__wt_verbose(session, WT_VERB_LOG,
"closing log directory %s", log->log_dir_fh->name));
- WT_RET(__wt_fsync(session, log->log_dir_fh));
+ WT_RET(__wt_directory_sync_fh(session, log->log_dir_fh));
WT_RET(__wt_close(session, log->log_dir_fh));
log->log_dir_fh = NULL;
}
@@ -917,7 +917,7 @@ err:
* Release a log slot.
*/
static int
-__log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
+__log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot, int *freep)
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
@@ -930,6 +930,7 @@ __log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
conn = S2C(session);
log = conn->log;
locked = yield_count = 0;
+ *freep = 1;
/* Write the buffered records */
if (F_ISSET(slot, SLOT_BUFFERED)) {
@@ -940,9 +941,29 @@ __log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
}
/*
- * Wait for earlier groups to finish, otherwise there could be holes
- * in the log file.
+ * If this is not a buffered write, meaning the slot we have is a
+ * dummy constructed slot, not from the slot pool, or we have to wait
+ * for a synchronous operation, we do not pass handling of this slot
+ * off to the worker thread. The caller is responsible for freeing
+ * the slot in that case. Otherwise the worker thread will free it.
*/
+ if (F_ISSET(slot, SLOT_BUFFERED) &&
+ !F_ISSET(slot, SLOT_SYNC | SLOT_SYNC_DIR)) {
+ *freep = 0;
+ slot->slot_state = WT_LOG_SLOT_WRITTEN;
+ /*
+ * After this point the worker thread owns the slot. There
+ * is nothing more to do but return.
+ */
+ WT_ERR(__wt_cond_signal(session, conn->log_wrlsn_cond));
+ goto done;
+ }
+
+ /*
+ * Wait for earlier groups to finish, otherwise there could
+ * be holes in the log file.
+ */
+ WT_STAT_FAST_CONN_INCR(session, log_release_write_lsn);
while (LOG_CMP(&log->write_lsn, &slot->slot_release_lsn) != 0) {
if (++yield_count < 1000)
__wt_yield();
@@ -953,6 +974,9 @@ __log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
log->write_lsn = slot->slot_end_lsn;
WT_ERR(__wt_cond_signal(session, log->log_write_cond));
+ /*
+ * Signal the close thread if needed.
+ */
if (F_ISSET(slot, SLOT_CLOSEFH))
WT_ERR(__wt_cond_signal(session, conn->log_close_cond));
@@ -995,7 +1019,7 @@ __log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
WT_ERR(__wt_directory_sync_fh(
session, log->log_dir_fh));
log->sync_dir_lsn = sync_lsn;
- F_CLR(slot, SLOT_SYNC_DIR);
+ WT_STAT_FAST_CONN_INCR(session, log_sync_dir);
}
/*
@@ -1007,26 +1031,22 @@ __log_release(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
"log_release: sync log %s", log->log_fh->name));
WT_STAT_FAST_CONN_INCR(session, log_sync);
WT_ERR(__wt_fsync(session, log->log_fh));
- F_CLR(slot, SLOT_SYNC);
log->sync_lsn = sync_lsn;
WT_ERR(__wt_cond_signal(session, log->log_sync_cond));
}
+ /*
+ * Clear the flags before leaving the loop.
+ */
+ F_CLR(slot, SLOT_SYNC | SLOT_SYNC_DIR);
locked = 0;
__wt_spin_unlock(session, &log->log_sync_lock);
+ if (ret != 0 && slot->slot_error == 0)
+ slot->slot_error = ret;
break;
}
- if (F_ISSET(slot, SLOT_BUF_GROW)) {
- WT_STAT_FAST_CONN_INCR(session, log_buffer_grow);
- F_CLR(slot, SLOT_BUF_GROW);
- WT_STAT_FAST_CONN_INCRV(session,
- log_buffer_size, slot->slot_buf.memsize);
- WT_ERR(__wt_buf_grow(session,
- &slot->slot_buf, slot->slot_buf.memsize * 2));
- }
err: if (locked)
__wt_spin_unlock(session, &log->log_sync_lock);
- if (ret != 0 && slot->slot_error == 0)
- slot->slot_error = ret;
+done:
return (ret);
}
@@ -1477,12 +1497,13 @@ __log_direct_write(WT_SESSION_IMPL *session, WT_ITEM *record, WT_LSN *lsnp,
WT_LOG *log;
WT_LOGSLOT tmp;
WT_MYSLOT myslot;
- int locked;
+ int dummy, locked;
WT_DECL_SPINLOCK_ID(id); /* Must appear last */
log = S2C(session)->log;
myslot.slot = &tmp;
myslot.offset = 0;
+ dummy = 0;
WT_CLEAR(tmp);
/* Fast path the contended case. */
@@ -1498,7 +1519,7 @@ __log_direct_write(WT_SESSION_IMPL *session, WT_ITEM *record, WT_LSN *lsnp,
__wt_spin_unlock(session, &log->log_slot_lock);
locked = 0;
WT_ERR(__log_fill(session, &myslot, 1, record, lsnp));
- WT_ERR(__log_release(session, &tmp));
+ WT_ERR(__log_release(session, &tmp, &dummy));
err: if (locked)
__wt_spin_unlock(session, &log->log_slot_lock);
@@ -1626,11 +1647,11 @@ __log_write_internal(WT_SESSION_IMPL *session, WT_ITEM *record, WT_LSN *lsnp,
WT_LSN lsn;
WT_MYSLOT myslot;
uint32_t rdup_len;
- int locked;
+ int free_slot, locked;
conn = S2C(session);
log = conn->log;
- locked = 0;
+ free_slot = locked = 0;
WT_INIT_LSN(&lsn);
myslot.slot = NULL;
/*
@@ -1712,8 +1733,9 @@ __log_write_internal(WT_SESSION_IMPL *session, WT_ITEM *record, WT_LSN *lsnp,
WT_ERR(__wt_log_slot_wait(session, myslot.slot));
WT_ERR(__log_fill(session, &myslot, 0, record, &lsn));
if (__wt_log_slot_release(myslot.slot, rdup_len) == WT_LOG_SLOT_DONE) {
- WT_ERR(__log_release(session, myslot.slot));
- WT_ERR(__wt_log_slot_free(myslot.slot));
+ WT_ERR(__log_release(session, myslot.slot, &free_slot));
+ if (free_slot)
+ WT_ERR(__wt_log_slot_free(session, myslot.slot));
} else if (LF_ISSET(WT_LOG_FSYNC)) {
/* Wait for our writes to reach disk */
while (LOG_CMP(&log->sync_lsn, &lsn) <= 0 &&
diff --git a/src/log/log_slot.c b/src/log/log_slot.c
index 8dcb2f9f165..02b3056be6f 100644
--- a/src/log/log_slot.c
+++ b/src/log/log_slot.c
@@ -57,7 +57,7 @@ __wt_log_slot_init(WT_SESSION_IMPL *session)
for (i = 0; i < SLOT_POOL; i++) {
WT_ERR(__wt_buf_init(session,
&log->slot_pool[i].slot_buf, WT_LOG_SLOT_BUF_INIT_SIZE));
- F_SET(&log->slot_pool[i], SLOT_BUFFERED);
+ F_SET(&log->slot_pool[i], SLOT_INIT_FLAGS);
}
WT_STAT_FAST_CONN_INCRV(session,
log_buffer_size, WT_LOG_SLOT_BUF_INIT_SIZE * SLOT_POOL);
@@ -295,10 +295,34 @@ __wt_log_slot_release(WT_LOGSLOT *slot, uint64_t size)
* Free a slot back into the pool.
*/
int
-__wt_log_slot_free(WT_LOGSLOT *slot)
+__wt_log_slot_free(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
{
+ WT_DECL_RET;
+
+ ret = 0;
+ /*
+ * Grow the buffer if needed before returning it to the pool.
+ */
+ if (F_ISSET(slot, SLOT_BUF_GROW)) {
+ WT_STAT_FAST_CONN_INCR(session, log_buffer_grow);
+ WT_STAT_FAST_CONN_INCRV(session,
+ log_buffer_size, slot->slot_buf.memsize);
+ WT_ERR(__wt_buf_grow(session,
+ &slot->slot_buf, slot->slot_buf.memsize * 2));
+ }
+err:
+ /*
+ * No matter if there is an error, we always want to free
+ * the slot back to the pool.
+ */
+ /*
+ * Make sure flags don't get retained between uses.
+ * We have to reset them here because multiple threads may
+ * change the flags when joining the slot.
+ */
+ slot->flags = SLOT_INIT_FLAGS;
slot->slot_state = WT_LOG_SLOT_FREE;
- return (0);
+ return (ret);
}
/*
diff --git a/src/support/stat.c b/src/support/stat.c
index 0926636a532..9d10c4d5ca6 100644
--- a/src/support/stat.c
+++ b/src/support/stat.c
@@ -447,10 +447,15 @@ __wt_stat_init_connection_stats(WT_CONNECTION_STATS *stats)
"log: log records not compressed";
stats->log_compress_small.desc =
"log: log records too small to compress";
+ stats->log_release_write_lsn.desc =
+ "log: log release advances write LSN";
stats->log_scans.desc = "log: log scan operations";
stats->log_scan_rereads.desc =
"log: log scan records requiring two reads";
+ stats->log_write_lsn.desc =
+ "log: log server thread advances write LSN";
stats->log_sync.desc = "log: log sync operations";
+ stats->log_sync_dir.desc = "log: log sync_dir operations";
stats->log_writes.desc = "log: log write operations";
stats->log_slot_consolidated.desc = "log: logging bytes consolidated";
stats->log_max_filesize.desc = "log: maximum log file size";
@@ -613,9 +618,12 @@ __wt_stat_refresh_connection_stats(void *stats_arg)
stats->log_compress_writes.v = 0;
stats->log_compress_write_fails.v = 0;
stats->log_compress_small.v = 0;
+ stats->log_release_write_lsn.v = 0;
stats->log_scans.v = 0;
stats->log_scan_rereads.v = 0;
+ stats->log_write_lsn.v = 0;
stats->log_sync.v = 0;
+ stats->log_sync_dir.v = 0;
stats->log_writes.v = 0;
stats->log_slot_consolidated.v = 0;
stats->log_prealloc_max.v = 0;