diff options
Diffstat (limited to 'src/log/log_slot.c')
-rw-r--r-- | src/log/log_slot.c | 579 |
1 files changed, 370 insertions, 209 deletions
diff --git a/src/log/log_slot.c b/src/log/log_slot.c index 0b580af4526..216a594ce3d 100644 --- a/src/log/log_slot.c +++ b/src/log/log_slot.c @@ -9,325 +9,486 @@ #include "wt_internal.h" /* - * This file implements the consolidated array algorithm as described in - * the paper: - * Scalability of write-ahead logging on multicore and multisocket hardware - * by Ryan Johnson, Ippokratis Pandis, Radu Stoica, Manos Athanassoulis - * and Anastasia Ailamaki. - * - * It appeared in The VLDB Journal, DOI 10.1007/s00778-011-0260-8 and can - * be found at: - * http://infoscience.epfl.ch/record/170505/files/aether-smpfulltext.pdf + * __wt_log_slot_activate -- + * Initialize a slot to become active. */ - -/* - * __wt_log_slot_init -- - * Initialize the slot array. - */ -int -__wt_log_slot_init(WT_SESSION_IMPL *session) +void +__wt_log_slot_activate(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) { WT_CONNECTION_IMPL *conn; - WT_DECL_RET; WT_LOG *log; - WT_LOGSLOT *slot; - int32_t i; conn = S2C(session); log = conn->log; - for (i = 0; i < WT_SLOT_POOL; i++) { - log->slot_pool[i].slot_state = WT_LOG_SLOT_FREE; - log->slot_pool[i].slot_index = WT_SLOT_INVALID_INDEX; - } - /* - * Set up the available slots from the pool the first time. - */ - for (i = 0; i < WT_SLOT_ACTIVE; i++) { - slot = &log->slot_pool[i]; - slot->slot_index = (uint32_t)i; - slot->slot_state = WT_LOG_SLOT_READY; - log->slot_array[i] = slot; - } - - /* - * Allocate memory for buffers now that the arrays are setup. Split - * this out to make error handling simpler. - * - * Cap the slot buffer to the log file size. - */ - log->slot_buf_size = - WT_MIN((size_t)conn->log_file_max, WT_LOG_SLOT_BUF_SIZE); - for (i = 0; i < WT_SLOT_POOL; i++) { - WT_ERR(__wt_buf_init(session, - &log->slot_pool[i].slot_buf, log->slot_buf_size)); - F_SET(&log->slot_pool[i], WT_SLOT_INIT_FLAGS); - } - WT_STAT_FAST_CONN_INCRV(session, - log_buffer_size, log->slot_buf_size * WT_SLOT_POOL); - if (0) { -err: while (--i >= 0) - __wt_buf_free(session, &log->slot_pool[i].slot_buf); - } - return (ret); + slot->slot_state = 0; + slot->slot_start_lsn = slot->slot_end_lsn = log->alloc_lsn; + slot->slot_start_offset = log->alloc_lsn.offset; + slot->slot_last_offset = log->alloc_lsn.offset; + slot->slot_fh = log->log_fh; + slot->slot_error = 0; + slot->slot_unbuffered = 0; } /* - * __wt_log_slot_destroy -- - * Clean up the slot array on shutdown. + * __wt_log_slot_close -- + * Close out the slot the caller is using. The slot may already be + * closed or freed by another thread. */ int -__wt_log_slot_destroy(WT_SESSION_IMPL *session) +__wt_log_slot_close( + WT_SESSION_IMPL *session, WT_LOGSLOT *slot, int *releasep, int forced) { WT_CONNECTION_IMPL *conn; WT_LOG *log; - int i; + int64_t end_offset, new_state, old_state; + WT_ASSERT(session, F_ISSET(session, WT_SESSION_LOCKED_SLOT)); conn = S2C(session); log = conn->log; - - for (i = 0; i < WT_SLOT_POOL; i++) - __wt_buf_free(session, &log->slot_pool[i].slot_buf); + if (releasep != NULL) + *releasep = 0; + if (slot == NULL) + return (WT_NOTFOUND); +retry: + old_state = slot->slot_state; + /* + * If this close is coming from a forced close and a thread is in + * the middle of using the slot, return EBUSY. The caller can + * decide if retrying is necessary or not. + */ + if (forced && WT_LOG_SLOT_INPROGRESS(old_state)) + return (EBUSY); + /* + * If someone else is switching out this slot we lost. Nothing to + * do but return. Return WT_NOTFOUND anytime the given slot was + * processed by another closing thread. Only return 0 when we + * actually closed the slot. + */ + if (WT_LOG_SLOT_CLOSED(old_state)) + return (WT_NOTFOUND); + /* + * If someone completely processed this slot, we're done. + */ + if (FLD64_ISSET((uint64_t)slot->slot_state, WT_LOG_SLOT_RESERVED)) + return (WT_NOTFOUND); + new_state = (old_state | WT_LOG_SLOT_CLOSE); + /* + * Close this slot. If we lose the race retry. + */ + if (!__wt_atomic_casiv64(&slot->slot_state, old_state, new_state)) + goto retry; + /* + * We own the slot now. No one else can join. + * Set the end LSN. + */ + WT_STAT_FAST_CONN_INCR(session, log_slot_closes); + if (WT_LOG_SLOT_DONE(new_state) && releasep != NULL) + *releasep = 1; + slot->slot_end_lsn = slot->slot_start_lsn; + end_offset = + WT_LOG_SLOT_JOINED_BUFFERED(old_state) + slot->slot_unbuffered; + slot->slot_end_lsn.offset += (wt_off_t)end_offset; + WT_STAT_FAST_CONN_INCRV(session, + log_slot_consolidated, end_offset); + /* + * XXX Would like to change so one piece of code advances the LSN. + */ + log->alloc_lsn = slot->slot_end_lsn; + WT_ASSERT(session, log->alloc_lsn.file >= log->write_lsn.file); return (0); } /* - * __wt_log_slot_join -- - * Join a consolidated logging slot. Callers should be prepared to deal - * with a ENOMEM return - which indicates no slots could accommodate - * the log record. + * __log_slot_switch_internal -- + * Switch out the current slot and set up a new one. */ -int -__wt_log_slot_join(WT_SESSION_IMPL *session, uint64_t mysize, - uint32_t flags, WT_MYSLOT *myslotp) +static int +__log_slot_switch_internal( + WT_SESSION_IMPL *session, WT_MYSLOT *myslot, int forced) { - WT_CONNECTION_IMPL *conn; + WT_DECL_RET; WT_LOG *log; WT_LOGSLOT *slot; - int64_t new_state, old_state; - uint32_t allocated_slot, slot_attempts; + int free_slot, release; - conn = S2C(session); - log = conn->log; - slot_attempts = 0; + log = S2C(session)->log; + release = 0; + slot = myslot->slot; + + WT_ASSERT(session, F_ISSET(session, WT_SESSION_LOCKED_SLOT)); - if (mysize >= (uint64_t)log->slot_buf_size) { - WT_STAT_FAST_CONN_INCR(session, log_slot_toobig); - return (ENOMEM); - } -find_slot: -#if WT_SLOT_ACTIVE == 1 - allocated_slot = 0; -#else - allocated_slot = __wt_random(&session->rnd) % WT_SLOT_ACTIVE; -#endif - /* - * Get the selected slot. Use a barrier to prevent the compiler from - * caching this read. - */ - WT_BARRIER(); - slot = log->slot_array[allocated_slot]; -join_slot: - /* - * Read the current slot state. Use a barrier to prevent the compiler - * from caching this read. - */ - WT_BARRIER(); - old_state = slot->slot_state; - /* - * WT_LOG_SLOT_READY and higher means the slot is available for - * joining. Any other state means it is in use and transitioning - * from the active array. - */ - if (old_state < WT_LOG_SLOT_READY) { - WT_STAT_FAST_CONN_INCR(session, log_slot_transitions); - goto find_slot; - } /* - * Add in our size to the state and then atomically swap that - * into place if it is still the same value. + * If someone else raced us to closing this specific slot, we're + * done here. */ - new_state = old_state + (int64_t)mysize; - if (new_state < old_state) { - /* Our size doesn't fit here. */ - WT_STAT_FAST_CONN_INCR(session, log_slot_toobig); - goto find_slot; - } + if (slot != log->active_slot) + return (0); + /* - * If the slot buffer isn't big enough to hold this update, try - * to find another slot. + * If close returns WT_NOTFOUND, it means that someone else is + * processing the slot change. However, we could have retried + * from a busy time creating a new slot. If so, we are that + * someone else and we need to try setting up a new slot again. */ - if (new_state > (int64_t)slot->slot_buf.memsize) { - if (++slot_attempts > 5) { - WT_STAT_FAST_CONN_INCR(session, log_slot_toosmall); - return (ENOMEM); + if (!F_ISSET(myslot, WT_MYSLOT_CLOSE)) { + ret = __wt_log_slot_close( + session, slot, &release, forced); + if (ret == WT_NOTFOUND) + return (0); + WT_RET(ret); + if (release) { + WT_RET(__wt_log_release(session, slot, &free_slot)); + if (free_slot) + __wt_log_slot_free(session, slot); } - goto find_slot; } /* - * We lost a race to add our size into this slot. Check the state - * and try again. + * Set that we have closed this slot because we may call in here + * multiple times if we retry creating a new slot. */ - if (!WT_ATOMIC_CAS8(slot->slot_state, old_state, new_state)) { - WT_STAT_FAST_CONN_INCR(session, log_slot_races); - goto join_slot; - } - WT_ASSERT(session, myslotp != NULL); + F_SET(myslot, WT_MYSLOT_CLOSE); + WT_RET(__wt_log_slot_new(session)); + F_CLR(myslot, WT_MYSLOT_CLOSE); + return (0); +} + +/* + * __wt_log_slot_switch -- + * Switch out the current slot and set up a new one. + */ +int +__wt_log_slot_switch( + WT_SESSION_IMPL *session, WT_MYSLOT *myslot, int retry, int forced) +{ + WT_DECL_RET; + WT_LOG *log; + + log = S2C(session)->log; /* - * We joined this slot. Fill in our information to return to - * the caller. + * !!! Since the WT_WITH_SLOT_LOCK macro is a do-while loop, the + * compiler does not like it combined directly with the while loop + * here. */ - WT_STAT_FAST_CONN_INCR(session, log_slot_joins); - if (LF_ISSET(WT_LOG_DSYNC | WT_LOG_FSYNC)) - F_SET(slot, WT_SLOT_SYNC_DIR); - if (LF_ISSET(WT_LOG_FSYNC)) - F_SET(slot, WT_SLOT_SYNC); - myslotp->slot = slot; - myslotp->offset = (wt_off_t)old_state - WT_LOG_SLOT_READY; - return (0); + do { + WT_WITH_SLOT_LOCK(session, log, + ret = __log_slot_switch_internal( + session, myslot, forced)); + if (ret == EBUSY) { + WT_STAT_FAST_CONN_INCR(session, log_slot_switch_busy); + __wt_yield(); + } + } while (F_ISSET(myslot, WT_MYSLOT_CLOSE) || (retry && ret == EBUSY)); + return (ret); } /* - * __log_slot_find_free -- - * Find and return a free log slot. + * __wt_log_slot_new -- + * Find a free slot and switch it as the new active slot. + * Must be called holding the slot lock. */ -static int -__log_slot_find_free(WT_SESSION_IMPL *session, WT_LOGSLOT **slot) +int +__wt_log_slot_new(WT_SESSION_IMPL *session) { WT_CONNECTION_IMPL *conn; WT_LOG *log; - uint32_t pool_i; + WT_LOGSLOT *slot; + int32_t i; + WT_ASSERT(session, F_ISSET(session, WT_SESSION_LOCKED_SLOT)); conn = S2C(session); log = conn->log; - WT_ASSERT(session, slot != NULL); /* - * Encourage processing and moving the write LSN forward. - * That process has to walk the slots anyway, so do that - * work and let it give us the index of a free slot along - * the way. + * Although this function is single threaded, multiple threads could + * be trying to set a new active slot sequentially. If we find an + * active slot that is valid, return. */ - WT_RET(__wt_log_wrlsn(session, &pool_i, NULL)); - while (pool_i == WT_SLOT_POOL) { + if ((slot = log->active_slot) != NULL && + WT_LOG_SLOT_OPEN(slot->slot_state)) + return (0); + + /* + * Keep trying until we can find a free slot. + */ + for (;;) { + /* + * For now just restart at 0. We could use log->pool_index + * if that is inefficient. + */ + for (i = 0; i < WT_SLOT_POOL; i++) { + slot = &log->slot_pool[i]; + if (slot->slot_state == WT_LOG_SLOT_FREE) { + /* + * Make sure that the next buffer size can + * fit in the file. Proactively switch if + * it cannot. This reduces, but does not + * eliminate, log files that exceed the + * maximum file size. + * + * We want to minimize the risk of an + * error due to no space. + */ + WT_RET(__wt_log_acquire(session, + log->slot_buf_size, slot)); + /* + * We have a new, free slot to use. + * Set it as the active slot. + */ + WT_STAT_FAST_CONN_INCR(session, + log_slot_transitions); + log->active_slot = slot; + return (0); + } + } + /* + * If we didn't find any free slots signal the worker thread. + */ + (void)__wt_cond_signal(session, conn->log_wrlsn_cond); __wt_yield(); - WT_RET(__wt_log_wrlsn(session, &pool_i, NULL)); } - *slot = &log->slot_pool[pool_i]; - WT_ASSERT(session, (*slot)->slot_state == WT_LOG_SLOT_FREE); - return (0); + /* NOTREACHED */ } /* - * __wt_log_slot_close -- - * Close a slot and do not allow any other threads to join this slot. - * Remove this from the active slot array and move a new slot from - * the pool into its place. Set up the size of this group; - * Must be called with the logging spinlock held. + * __wt_log_slot_init -- + * Initialize the slot array. */ int -__wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) +__wt_log_slot_init(WT_SESSION_IMPL *session) { WT_CONNECTION_IMPL *conn; + WT_DECL_RET; WT_LOG *log; - WT_LOGSLOT *newslot; - int64_t old_state; + WT_LOGSLOT *slot; + int32_t i; conn = S2C(session); log = conn->log; - /* - * Find an unused slot in the pool. - */ - WT_RET(__log_slot_find_free(session, &newslot)); + WT_CACHE_LINE_ALIGNMENT_VERIFY(session, log->slot_pool); + for (i = 0; i < WT_SLOT_POOL; i++) + log->slot_pool[i].slot_state = WT_LOG_SLOT_FREE; /* - * Swap out the slot we're going to use and put a free one in the - * slot array in its place so that threads can use it right away. + * Allocate memory for buffers now that the arrays are setup. Split + * this out to make error handling simpler. */ - WT_STAT_FAST_CONN_INCR(session, log_slot_closes); - newslot->slot_state = WT_LOG_SLOT_READY; - newslot->slot_index = slot->slot_index; - log->slot_array[newslot->slot_index] = newslot; - old_state = WT_ATOMIC_STORE8(slot->slot_state, WT_LOG_SLOT_PENDING); - slot->slot_group_size = (uint64_t)(old_state - WT_LOG_SLOT_READY); /* - * Note that this statistic may be much bigger than in reality, - * especially when compared with the total bytes written in - * __log_fill. The reason is that this size reflects any - * rounding up that is needed and the total bytes in __log_fill - * is the amount of user bytes. + * Cap the slot buffer to the log file size times two if needed. + * That means we try to fill to half the buffer but allow some + * extra space. + * + * !!! If the buffer size is too close to the log file size, we will + * switch log files very aggressively. Scale back the buffer for + * small log file sizes. */ + log->slot_buf_size = (uint32_t)WT_MIN( + (size_t)conn->log_file_max/10, WT_LOG_SLOT_BUF_SIZE); + for (i = 0; i < WT_SLOT_POOL; i++) { + WT_ERR(__wt_buf_init(session, + &log->slot_pool[i].slot_buf, log->slot_buf_size)); + F_SET(&log->slot_pool[i], WT_SLOT_INIT_FLAGS); + } WT_STAT_FAST_CONN_INCRV(session, - log_slot_consolidated, (uint64_t)slot->slot_group_size); - return (0); + log_buffer_size, log->slot_buf_size * WT_SLOT_POOL); + /* + * Set up the available slot from the pool the first time. + */ + slot = &log->slot_pool[0]; + /* + * We cannot initialize the release LSN in the activate function + * because that is called after a log file switch. + */ + slot->slot_release_lsn = log->alloc_lsn; + __wt_log_slot_activate(session, slot); + log->active_slot = slot; + + if (0) { +err: while (--i >= 0) + __wt_buf_free(session, &log->slot_pool[i].slot_buf); + } + return (ret); } /* - * __wt_log_slot_notify -- - * Notify all threads waiting for the state to be < WT_LOG_SLOT_DONE. + * __wt_log_slot_destroy -- + * Clean up the slot array on shutdown. */ int -__wt_log_slot_notify(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) +__wt_log_slot_destroy(WT_SESSION_IMPL *session) { - WT_UNUSED(session); + WT_CONNECTION_IMPL *conn; + WT_LOG *log; + WT_LOGSLOT *slot; + int64_t rel; + int i; - slot->slot_state = - (int64_t)WT_LOG_SLOT_DONE - (int64_t)slot->slot_group_size; + conn = S2C(session); + log = conn->log; + + /* + * Write out any remaining buffers. Free the buffer. + */ + for (i = 0; i < WT_SLOT_POOL; i++) { + slot = &log->slot_pool[i]; + if (!FLD64_ISSET( + (uint64_t)slot->slot_state, WT_LOG_SLOT_RESERVED)) { + rel = WT_LOG_SLOT_RELEASED_BUFFERED(slot->slot_state); + if (rel != 0) + WT_RET(__wt_write(session, slot->slot_fh, + slot->slot_start_offset, (size_t)rel, + slot->slot_buf.mem)); + } + __wt_buf_free(session, &log->slot_pool[i].slot_buf); + } return (0); } /* - * __wt_log_slot_wait -- - * Wait for slot leader to allocate log area and tell us our log offset. + * __wt_log_slot_join -- + * Join a consolidated logging slot. Must be called with + * the read lock held. */ -int -__wt_log_slot_wait(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) +void +__wt_log_slot_join(WT_SESSION_IMPL *session, uint64_t mysize, + uint32_t flags, WT_MYSLOT *myslot) { - int yield_count; + WT_CONNECTION_IMPL *conn; + WT_LOG *log; + WT_LOGSLOT *slot; + int64_t flag_state, new_state, old_state, released; + int32_t join_offset, new_join; +#ifdef HAVE_DIAGNOSTIC + int unbuf_force; +#endif - yield_count = 0; - WT_UNUSED(session); + conn = S2C(session); + log = conn->log; - while (slot->slot_state > WT_LOG_SLOT_DONE) - if (++yield_count < 1000) - __wt_yield(); - else - __wt_sleep(0, 200); - return (0); + /* + * Make sure the length cannot overflow. The caller should not + * even call this function if it doesn't fit but use direct + * writes. + */ + WT_ASSERT(session, !F_ISSET(session, WT_SESSION_LOCKED_SLOT)); + + /* + * There should almost always be a slot open. + */ +#ifdef HAVE_DIAGNOSTIC + unbuf_force = ((++log->write_calls % 1000) == 0); +#endif + for (;;) { + WT_BARRIER(); + slot = log->active_slot; + old_state = slot->slot_state; + /* + * Try to join our size into the existing size and + * atomically write it back into the state. + */ + flag_state = WT_LOG_SLOT_FLAGS(old_state); + released = WT_LOG_SLOT_RELEASED(old_state); + join_offset = WT_LOG_SLOT_JOINED(old_state); +#ifdef HAVE_DIAGNOSTIC + if (unbuf_force || mysize > WT_LOG_SLOT_BUF_MAX) { +#else + if (mysize > WT_LOG_SLOT_BUF_MAX) { +#endif + new_join = join_offset + WT_LOG_SLOT_UNBUFFERED; + F_SET(myslot, WT_MYSLOT_UNBUFFERED); + myslot->slot = slot; + } else + new_join = join_offset + (int32_t)mysize; + new_state = (int64_t)WT_LOG_SLOT_JOIN_REL( + (int64_t)new_join, (int64_t)released, (int64_t)flag_state); + + /* + * Check if the slot is open for joining and we are able to + * swap in our size into the state. + */ + if (WT_LOG_SLOT_OPEN(old_state) && + __wt_atomic_casiv64( + &slot->slot_state, old_state, new_state)) + break; + /* + * The slot is no longer open or we lost the race to + * update it. Yield and try again. + */ + WT_STAT_FAST_CONN_INCR(session, log_slot_races); + __wt_yield(); + } + /* + * We joined this slot. Fill in our information to return to + * the caller. + */ + if (mysize != 0) + WT_STAT_FAST_CONN_INCR(session, log_slot_joins); + if (LF_ISSET(WT_LOG_DSYNC | WT_LOG_FSYNC)) + F_SET(slot, WT_SLOT_SYNC_DIR); + if (LF_ISSET(WT_LOG_FSYNC)) + F_SET(slot, WT_SLOT_SYNC); + if (F_ISSET(myslot, WT_MYSLOT_UNBUFFERED)) { + WT_ASSERT(session, slot->slot_unbuffered == 0); + WT_STAT_FAST_CONN_INCR(session, log_slot_unbuffered); + slot->slot_unbuffered = (int64_t)mysize; + } + myslot->slot = slot; + myslot->offset = join_offset; + myslot->end_offset = (wt_off_t)((uint64_t)join_offset + mysize); } /* * __wt_log_slot_release -- * Each thread in a consolidated group releases its portion to - * signal it has completed writing its piece of the log. + * signal it has completed copying its piece of the log into + * the memory buffer. */ int64_t -__wt_log_slot_release(WT_LOGSLOT *slot, uint64_t size) +__wt_log_slot_release(WT_SESSION_IMPL *session, WT_MYSLOT *myslot, int64_t size) { - int64_t newsize; + WT_LOGSLOT *slot; + wt_off_t cur_offset, my_start; + int64_t my_size, rel_size; + WT_UNUSED(session); + slot = myslot->slot; + my_start = slot->slot_start_offset + myslot->offset; + while ((cur_offset = slot->slot_last_offset) < my_start) { + /* + * Set our offset if we are larger. + */ + if (__wt_atomic_casiv64( + &slot->slot_last_offset, cur_offset, my_start)) + break; + /* + * If we raced another thread updating this, try again. + */ + WT_BARRIER(); + } /* - * Add my size into the state. When it reaches WT_LOG_SLOT_DONE - * all participatory threads have completed copying their piece. + * Add my size into the state and return the new size. */ - newsize = WT_ATOMIC_ADD8(slot->slot_state, (int64_t)size); - return (newsize); + rel_size = size; + if (F_ISSET(myslot, WT_MYSLOT_UNBUFFERED)) + rel_size = WT_LOG_SLOT_UNBUFFERED; + my_size = (int64_t)WT_LOG_SLOT_JOIN_REL((int64_t)0, rel_size, 0); + return (__wt_atomic_addiv64(&slot->slot_state, my_size)); } /* * __wt_log_slot_free -- * Free a slot back into the pool. */ -int +void __wt_log_slot_free(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) { - WT_UNUSED(session); /* * Make sure flags don't get retained between uses. * We have to reset them them here because multiple threads may * change the flags when joining the slot. */ + WT_UNUSED(session); slot->flags = WT_SLOT_INIT_FLAGS; + slot->slot_error = 0; slot->slot_state = WT_LOG_SLOT_FREE; - return (0); } |