diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/lsm/lsm_manager.c')
-rw-r--r-- | src/third_party/wiredtiger/src/lsm/lsm_manager.c | 667 |
1 file changed, 667 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/src/lsm/lsm_manager.c b/src/third_party/wiredtiger/src/lsm/lsm_manager.c new file mode 100644 index 00000000000..8f4b3ba49ef --- /dev/null +++ b/src/third_party/wiredtiger/src/lsm/lsm_manager.c @@ -0,0 +1,667 @@ +/*- + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +static int __lsm_manager_aggressive_update(WT_SESSION_IMPL *, WT_LSM_TREE *); +static int __lsm_manager_run_server(WT_SESSION_IMPL *); +static int __lsm_manager_worker_setup(WT_SESSION_IMPL *); + +static void * __lsm_worker_manager(void *); + +/* + * __wt_lsm_manager_config -- + * Configure the LSM manager. + */ +int +__wt_lsm_manager_config(WT_SESSION_IMPL *session, const char **cfg) +{ + WT_CONNECTION_IMPL *conn; + WT_CONFIG_ITEM cval; + + conn = S2C(session); + + WT_RET(__wt_config_gets(session, cfg, "lsm_manager.merge", &cval)); + if (cval.val) + F_SET(conn, WT_CONN_LSM_MERGE); + WT_RET(__wt_config_gets( + session, cfg, "lsm_manager.worker_thread_max", &cval)); + if (cval.val) + conn->lsm_manager.lsm_workers_max = (uint32_t)cval.val; + return (0); +} + +/* + * __lsm_general_worker_start -- + * Start up all of the general LSM worker threads. + */ +static int +__lsm_general_worker_start(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_LSM_MANAGER *manager; + WT_LSM_WORKER_ARGS *worker_args; + + conn = S2C(session); + manager = &conn->lsm_manager; + + /* + * Start the remaining worker threads. + * This should get more sophisticated in the future - only launching + * as many worker threads as are required to keep up with demand. 
+ */ + WT_ASSERT(session, manager->lsm_workers > 1); + for (; manager->lsm_workers < manager->lsm_workers_max; + manager->lsm_workers++) { + worker_args = + &manager->lsm_worker_cookies[manager->lsm_workers]; + worker_args->work_cond = manager->work_cond; + worker_args->id = manager->lsm_workers; + worker_args->type = + WT_LSM_WORK_BLOOM | + WT_LSM_WORK_DROP | + WT_LSM_WORK_FLUSH | + WT_LSM_WORK_SWITCH; + F_SET(worker_args, WT_LSM_WORKER_RUN); + /* + * Only allow half of the threads to run merges to avoid all + * all workers getting stuck in long-running merge operations. + * Make sure the first worker is allowed, so that there is at + * least one thread capable of running merges. We know the + * first worker is id 2, so set merges on even numbered workers. + */ + if (manager->lsm_workers % 2 == 0) + FLD_SET(worker_args->type, WT_LSM_WORK_MERGE); + WT_RET(__wt_lsm_worker_start(session, worker_args)); + } + return (0); +} + +/* + * __lsm_stop_workers -- + * Stop worker threads until the number reaches the configured amount. + */ +static int +__lsm_stop_workers(WT_SESSION_IMPL *session) +{ + WT_LSM_MANAGER *manager; + WT_LSM_WORKER_ARGS *worker_args; + uint32_t i; + + manager = &S2C(session)->lsm_manager; + /* + * Start at the end of the list of threads and stop them until we + * have the desired number. We want to keep all active threads + * packed at the front of the worker array. + */ + WT_ASSERT(session, manager->lsm_workers != 0); + for (i = manager->lsm_workers - 1; i >= manager->lsm_workers_max; i--) { + worker_args = &manager->lsm_worker_cookies[i]; + /* + * Clear this worker's flag so it stops. + */ + F_CLR(worker_args, WT_LSM_WORKER_RUN); + WT_ASSERT(session, worker_args->tid != 0); + WT_RET(__wt_thread_join(session, worker_args->tid)); + worker_args->tid = 0; + worker_args->type = 0; + worker_args->flags = 0; + manager->lsm_workers--; + /* + * We do not clear the session because they are allocated + * statically when the connection was opened. 
+ */ + } + return (0); +} + +/* + * __wt_lsm_manager_reconfig -- + * Re-configure the LSM manager. + */ +int +__wt_lsm_manager_reconfig(WT_SESSION_IMPL *session, const char **cfg) +{ + WT_LSM_MANAGER *manager; + uint32_t orig_workers; + + manager = &S2C(session)->lsm_manager; + orig_workers = manager->lsm_workers_max; + + WT_RET(__wt_lsm_manager_config(session, cfg)); + /* + * If LSM hasn't started yet, we simply reconfigured the settings + * and we'll let the normal code path start the threads. + */ + if (manager->lsm_workers_max == 0) + return (0); + if (manager->lsm_workers == 0) + return (0); + /* + * If the number of workers has not changed, we're done. + */ + if (orig_workers == manager->lsm_workers_max) + return (0); + /* + * If we want more threads, start them. + */ + if (manager->lsm_workers_max > orig_workers) + return (__lsm_general_worker_start(session)); + + /* + * Otherwise we want to reduce the number of workers. + */ + WT_ASSERT(session, manager->lsm_workers_max < orig_workers); + WT_RET(__lsm_stop_workers(session)); + return (0); +} + +/* + * __wt_lsm_manager_start -- + * Start the LSM management infrastructure. Our queues and locks were + * initialized when the connection was initialized. + */ +int +__wt_lsm_manager_start(WT_SESSION_IMPL *session) +{ + WT_DECL_RET; + WT_LSM_MANAGER *manager; + WT_SESSION_IMPL *worker_session; + uint32_t i; + + manager = &S2C(session)->lsm_manager; + + /* + * We need at least a manager, a switch thread and a generic + * worker. + */ + WT_ASSERT(session, manager->lsm_workers_max > 2); + + /* + * Open sessions for all potential worker threads here - it's not + * safe to have worker threads open/close sessions themselves. + * All the LSM worker threads do their operations on read-only + * files. Use read-uncommitted isolation to avoid keeping + * updates in cache unnecessarily. 
+ */ + for (i = 0; i < WT_LSM_MAX_WORKERS; i++) { + WT_ERR(__wt_open_internal_session( + S2C(session), "lsm-worker", 1, 0, &worker_session)); + worker_session->isolation = TXN_ISO_READ_UNCOMMITTED; + manager->lsm_worker_cookies[i].session = worker_session; + } + + /* Start the LSM manager thread. */ + WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid, + __lsm_worker_manager, &manager->lsm_worker_cookies[0])); + + F_SET(S2C(session), WT_CONN_SERVER_LSM); + + if (0) { +err: for (i = 0; + (worker_session = + manager->lsm_worker_cookies[i].session) != NULL; + i++) + WT_TRET((&worker_session->iface)->close( + &worker_session->iface, NULL)); + } + return (ret); +} + +/* + * __wt_lsm_manager_free_work_unit -- + * Release an LSM tree work unit. + */ +void +__wt_lsm_manager_free_work_unit( + WT_SESSION_IMPL *session, WT_LSM_WORK_UNIT *entry) +{ + if (entry != NULL) { + WT_ASSERT(session, entry->lsm_tree->queue_ref > 0); + + (void)WT_ATOMIC_SUB4(entry->lsm_tree->queue_ref, 1); + __wt_free(session, entry); + } +} + +/* + * __wt_lsm_manager_destroy -- + * Destroy the LSM manager threads and subsystem. + */ +int +__wt_lsm_manager_destroy(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + WT_LSM_MANAGER *manager; + WT_LSM_WORK_UNIT *current, *next; + WT_SESSION *wt_session; + uint32_t i; + uint64_t removed; + + conn = S2C(session); + manager = &conn->lsm_manager; + removed = 0; + + if (manager->lsm_workers > 0) { + /* + * Stop the main LSM manager thread first. + */ + while (F_ISSET(conn, WT_CONN_SERVER_LSM)) + __wt_yield(); + + /* Clean up open LSM handles. */ + ret = __wt_lsm_tree_close_all(session); + + WT_TRET(__wt_thread_join( + session, manager->lsm_worker_cookies[0].tid)); + manager->lsm_worker_cookies[0].tid = 0; + + /* Release memory from any operations left on the queue. 
*/ + for (current = TAILQ_FIRST(&manager->switchqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + TAILQ_REMOVE(&manager->switchqh, current, q); + ++removed; + __wt_lsm_manager_free_work_unit(session, current); + } + for (current = TAILQ_FIRST(&manager->appqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + TAILQ_REMOVE(&manager->appqh, current, q); + ++removed; + __wt_lsm_manager_free_work_unit(session, current); + } + for (current = TAILQ_FIRST(&manager->managerqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + TAILQ_REMOVE(&manager->managerqh, current, q); + ++removed; + __wt_lsm_manager_free_work_unit(session, current); + } + + /* Close all LSM worker sessions. */ + for (i = 0; i < WT_LSM_MAX_WORKERS; i++) { + wt_session = + &manager->lsm_worker_cookies[i].session->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + } + } + WT_STAT_FAST_CONN_INCRV(session, + lsm_work_units_discarded, removed); + + /* Free resources that are allocated in connection initialize */ + __wt_spin_destroy(session, &manager->switch_lock); + __wt_spin_destroy(session, &manager->app_lock); + __wt_spin_destroy(session, &manager->manager_lock); + WT_TRET(__wt_cond_destroy(session, &manager->work_cond)); + + return (ret); +} + +/* + * __lsm_manager_aggressive_update -- + * Update the merge aggressiveness for a single LSM tree. + */ +static int +__lsm_manager_aggressive_update(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) +{ + struct timespec now; + uint64_t chunk_wait, stallms; + u_int new_aggressive; + + WT_RET(__wt_epoch(session, &now)); + stallms = WT_TIMEDIFF(now, lsm_tree->last_flush_ts) / WT_MILLION; + /* + * Get aggressive if more than enough chunks for a merge should have + * been created by now. Use 10 seconds as a default if we don't have an + * estimate. + */ + if (lsm_tree->nchunks > 1) + chunk_wait = stallms / (lsm_tree->chunk_fill_ms == 0 ? 
+ 10000 : lsm_tree->chunk_fill_ms); + else + chunk_wait = 0; + new_aggressive = (u_int)(chunk_wait / lsm_tree->merge_min); + + if (new_aggressive > lsm_tree->merge_aggressiveness) { + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "LSM merge %s got aggressive (old %u new %u), " + "merge_min %d, %u / %" PRIu64, + lsm_tree->name, lsm_tree->merge_aggressiveness, + new_aggressive, lsm_tree->merge_min, stallms, + lsm_tree->chunk_fill_ms)); + lsm_tree->merge_aggressiveness = new_aggressive; + } + return (0); +} + +/* + * __lsm_manager_worker_setup -- + * Do setup owned by the LSM manager thread including starting the worker + * threads. + */ +static int +__lsm_manager_worker_setup(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_LSM_MANAGER *manager; + WT_LSM_WORKER_ARGS *worker_args; + + conn = S2C(session); + manager = &conn->lsm_manager; + + WT_ASSERT(session, manager->lsm_workers == 1); + /* + * The LSM manager is worker[0]. The switch thread is worker[1]. + * Setup and start the switch/drop worker explicitly. + */ + worker_args = &manager->lsm_worker_cookies[1]; + worker_args->work_cond = manager->work_cond; + worker_args->id = manager->lsm_workers++; + worker_args->type = WT_LSM_WORK_DROP | WT_LSM_WORK_SWITCH; + F_SET(worker_args, WT_LSM_WORKER_RUN); + /* Start the switch thread. */ + WT_RET(__wt_lsm_worker_start(session, worker_args)); + WT_RET(__lsm_general_worker_start(session)); + + return (0); +} + +/* + * __lsm_manager_worker_shutdown -- + * Shutdown the LSM manager and worker threads. + */ +static int +__lsm_manager_worker_shutdown(WT_SESSION_IMPL *session) +{ + WT_DECL_RET; + WT_LSM_MANAGER *manager; + u_int i; + + manager = &S2C(session)->lsm_manager; + + /* + * Wait for the rest of the LSM workers to shutdown. Stop at index + * one - since we (the manager) are at index 0. 
+ */ + for (i = 1; i < manager->lsm_workers; i++) { + WT_ASSERT(session, manager->lsm_worker_cookies[i].tid != 0); + WT_TRET(__wt_cond_signal(session, manager->work_cond)); + WT_TRET(__wt_thread_join( + session, manager->lsm_worker_cookies[i].tid)); + } + return (ret); +} + +/* + * __lsm_manager_run_server -- + * Run manager thread operations. + */ +static int +__lsm_manager_run_server(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_LSM_TREE *lsm_tree; + struct timespec now; + uint64_t fillms, pushms; + + conn = S2C(session); + while (F_ISSET(conn, WT_CONN_SERVER_RUN)) { + if (TAILQ_EMPTY(&conn->lsmqh)) { + __wt_sleep(0, 10000); + continue; + } + __wt_sleep(0, 10000); + TAILQ_FOREACH(lsm_tree, &S2C(session)->lsmqh, q) { + if (!F_ISSET(lsm_tree, WT_LSM_TREE_ACTIVE)) + continue; + WT_RET(__lsm_manager_aggressive_update( + session, lsm_tree)); + WT_RET(__wt_epoch(session, &now)); + pushms = lsm_tree->work_push_ts.tv_sec == 0 ? 0 : + WT_TIMEDIFF( + now, lsm_tree->work_push_ts) / WT_MILLION; + fillms = 3 * lsm_tree->chunk_fill_ms; + if (fillms == 0) + fillms = 10000; + /* + * If the tree appears to not be triggering enough + * LSM maintenance, help it out. Additional work units + * don't hurt, and can be necessary if some work + * units aren't completed for some reason. + * If the tree hasn't been modified, and there are + * more than 1 chunks - try to get the tree smaller + * so queries run faster. + * If we are getting aggressive - ensure there are + * enough work units that we can get chunks merged. + * If we aren't pushing enough work units, compared + * to how often new chunks are being created add some + * more. 
+ */ + if (lsm_tree->queue_ref >= LSM_TREE_MAX_QUEUE) + WT_STAT_FAST_CONN_INCR(session, + lsm_work_queue_max); + else if ((!lsm_tree->modified && + lsm_tree->nchunks > 1) || + (lsm_tree->queue_ref == 0 && + lsm_tree->nchunks > 1) || + (lsm_tree->merge_aggressiveness > 3 && + !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) || + pushms > fillms) { + WT_RET(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_SWITCH, 0, lsm_tree)); + WT_RET(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_DROP, 0, lsm_tree)); + WT_RET(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_FLUSH, 0, lsm_tree)); + WT_RET(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_BLOOM, 0, lsm_tree)); + WT_RET(__wt_verbose(session, WT_VERB_LSM, + "MGR %s: queue %d mod %d nchunks %d" + " flags 0x%x aggressive %d pushms %" PRIu64 + " fillms %" PRIu64, + lsm_tree->name, lsm_tree->queue_ref, + lsm_tree->modified, lsm_tree->nchunks, + lsm_tree->flags, + lsm_tree->merge_aggressiveness, + pushms, fillms)); + WT_RET(__wt_lsm_manager_push_entry( + session, WT_LSM_WORK_MERGE, 0, lsm_tree)); + } + } + } + + return (0); +} + +/* + * __lsm_worker_manager -- + * A thread that manages all open LSM trees, and the shared LSM worker + * threads. + */ +static void * +__lsm_worker_manager(void *arg) +{ + WT_DECL_RET; + WT_LSM_WORKER_ARGS *cookie; + WT_SESSION_IMPL *session; + + cookie = (WT_LSM_WORKER_ARGS *)arg; + session = cookie->session; + + WT_ERR(__lsm_manager_worker_setup(session)); + WT_ERR(__lsm_manager_run_server(session)); + WT_ERR(__lsm_manager_worker_shutdown(session)); + + if (ret != 0) { +err: __wt_err(session, ret, "LSM worker manager thread error"); + } + F_CLR(S2C(session), WT_CONN_SERVER_LSM); + return (NULL); +} + +/* + * __wt_lsm_manager_clear_tree -- + * Remove all entries for a tree from the LSM manager queues. This + * introduces an inefficiency if LSM trees are being opened and closed + * regularly. 
+ */ +int +__wt_lsm_manager_clear_tree( + WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) +{ + WT_LSM_MANAGER *manager; + WT_LSM_WORK_UNIT *current, *next; + uint64_t removed; + + manager = &S2C(session)->lsm_manager; + removed = 0; + + /* Clear out the tree from the switch queue */ + __wt_spin_lock(session, &manager->switch_lock); + + /* Structure the loop so that it's safe to free as we iterate */ + for (current = TAILQ_FIRST(&manager->switchqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + if (current->lsm_tree != lsm_tree) + continue; + ++removed; + TAILQ_REMOVE(&manager->switchqh, current, q); + __wt_lsm_manager_free_work_unit(session, current); + } + __wt_spin_unlock(session, &manager->switch_lock); + /* Clear out the tree from the application queue */ + __wt_spin_lock(session, &manager->app_lock); + for (current = TAILQ_FIRST(&manager->appqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + if (current->lsm_tree != lsm_tree) + continue; + ++removed; + TAILQ_REMOVE(&manager->appqh, current, q); + __wt_lsm_manager_free_work_unit(session, current); + } + __wt_spin_unlock(session, &manager->app_lock); + /* Clear out the tree from the manager queue */ + __wt_spin_lock(session, &manager->manager_lock); + for (current = TAILQ_FIRST(&manager->managerqh); + current != NULL; current = next) { + next = TAILQ_NEXT(current, q); + if (current->lsm_tree != lsm_tree) + continue; + ++removed; + TAILQ_REMOVE(&manager->managerqh, current, q); + __wt_lsm_manager_free_work_unit(session, current); + } + __wt_spin_unlock(session, &manager->manager_lock); + WT_STAT_FAST_CONN_INCRV(session, lsm_work_units_discarded, removed); + return (0); +} + +/* + * We assume this is only called from __wt_lsm_manager_pop_entry and we + * have session, entry and type available to use. If the queue is empty + * we may return from the macro. 
+ */ +#define LSM_POP_ENTRY(qh, qlock, qlen) do { \ + if (TAILQ_EMPTY(qh)) \ + return (0); \ + __wt_spin_lock(session, qlock); \ + TAILQ_FOREACH(entry, (qh), q) { \ + if (FLD_ISSET(type, entry->type)) { \ + TAILQ_REMOVE(qh, entry, q); \ + WT_STAT_FAST_CONN_DECR(session, qlen); \ + break; \ + } \ + } \ + __wt_spin_unlock(session, (qlock)); \ +} while (0) + +/* + * __wt_lsm_manager_pop_entry -- + * Retrieve the head of the queue, if it matches the requested work + * unit type. + */ +int +__wt_lsm_manager_pop_entry( + WT_SESSION_IMPL *session, uint32_t type, WT_LSM_WORK_UNIT **entryp) +{ + WT_LSM_MANAGER *manager; + WT_LSM_WORK_UNIT *entry; + + manager = &S2C(session)->lsm_manager; + *entryp = NULL; + entry = NULL; + + /* + * Pop the entry off the correct queue based on our work type. + */ + if (type == WT_LSM_WORK_SWITCH) + LSM_POP_ENTRY(&manager->switchqh, + &manager->switch_lock, lsm_work_queue_switch); + else if (type == WT_LSM_WORK_MERGE) + LSM_POP_ENTRY(&manager->managerqh, + &manager->manager_lock, lsm_work_queue_manager); + else + LSM_POP_ENTRY(&manager->appqh, + &manager->app_lock, lsm_work_queue_app); + if (entry != NULL) + WT_STAT_FAST_CONN_INCR(session, lsm_work_units_done); + *entryp = entry; + return (0); +} + +/* + * Push a work unit onto the appropriate queue. This macro assumes we are + * called from __wt_lsm_manager_push_entry and we have session and entry + * available for use. + */ +#define LSM_PUSH_ENTRY(qh, qlock, qlen) do { \ + __wt_spin_lock(session, qlock); \ + TAILQ_INSERT_TAIL((qh), entry, q); \ + WT_STAT_FAST_CONN_INCR(session, qlen); \ + __wt_spin_unlock(session, qlock); \ +} while (0) + +/* + * __wt_lsm_manager_push_entry -- + * Add an entry to the end of the switch queue. 
+ */ +int +__wt_lsm_manager_push_entry(WT_SESSION_IMPL *session, + uint32_t type, uint32_t flags, WT_LSM_TREE *lsm_tree) +{ + WT_LSM_MANAGER *manager; + WT_LSM_WORK_UNIT *entry; + + manager = &S2C(session)->lsm_manager; + + WT_RET(__wt_epoch(session, &lsm_tree->work_push_ts)); + + WT_RET(__wt_calloc_def(session, 1, &entry)); + entry->type = type; + entry->flags = flags; + entry->lsm_tree = lsm_tree; + (void)WT_ATOMIC_ADD4(lsm_tree->queue_ref, 1); + WT_STAT_FAST_CONN_INCR(session, lsm_work_units_created); + + if (type == WT_LSM_WORK_SWITCH) + LSM_PUSH_ENTRY(&manager->switchqh, + &manager->switch_lock, lsm_work_queue_switch); + else if (type == WT_LSM_WORK_MERGE) + LSM_PUSH_ENTRY(&manager->managerqh, + &manager->manager_lock, lsm_work_queue_manager); + else + LSM_PUSH_ENTRY(&manager->appqh, + &manager->app_lock, lsm_work_queue_app); + + WT_RET(__wt_cond_signal(session, manager->work_cond)); + + return (0); +} |