summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/lsm/lsm_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/lsm/lsm_manager.c')
-rw-r--r--src/third_party/wiredtiger/src/lsm/lsm_manager.c667
1 files changed, 667 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/src/lsm/lsm_manager.c b/src/third_party/wiredtiger/src/lsm/lsm_manager.c
new file mode 100644
index 00000000000..8f4b3ba49ef
--- /dev/null
+++ b/src/third_party/wiredtiger/src/lsm/lsm_manager.c
@@ -0,0 +1,667 @@
+/*-
+ * Copyright (c) 2008-2014 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+#include "wt_internal.h"
+
+static int __lsm_manager_aggressive_update(WT_SESSION_IMPL *, WT_LSM_TREE *);
+static int __lsm_manager_run_server(WT_SESSION_IMPL *);
+static int __lsm_manager_worker_setup(WT_SESSION_IMPL *);
+
+static void * __lsm_worker_manager(void *);
+
+/*
+ * __wt_lsm_manager_config --
+ * Configure the LSM manager.
+ */
+int
+__wt_lsm_manager_config(WT_SESSION_IMPL *session, const char **cfg)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_CONFIG_ITEM cval;
+
+ conn = S2C(session);
+
+ WT_RET(__wt_config_gets(session, cfg, "lsm_manager.merge", &cval));
+ if (cval.val)
+ F_SET(conn, WT_CONN_LSM_MERGE);
+ WT_RET(__wt_config_gets(
+ session, cfg, "lsm_manager.worker_thread_max", &cval));
+ if (cval.val)
+ conn->lsm_manager.lsm_workers_max = (uint32_t)cval.val;
+ return (0);
+}
+
+/*
+ * __lsm_general_worker_start --
+ * Start up all of the general LSM worker threads.
+ */
+static int
+__lsm_general_worker_start(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORKER_ARGS *worker_args;
+
+ conn = S2C(session);
+ manager = &conn->lsm_manager;
+
+ /*
+ * Start the remaining worker threads.
+ * This should get more sophisticated in the future - only launching
+ * as many worker threads as are required to keep up with demand.
+ */
+ WT_ASSERT(session, manager->lsm_workers > 1);
+ for (; manager->lsm_workers < manager->lsm_workers_max;
+ manager->lsm_workers++) {
+ worker_args =
+ &manager->lsm_worker_cookies[manager->lsm_workers];
+ worker_args->work_cond = manager->work_cond;
+ worker_args->id = manager->lsm_workers;
+ worker_args->type =
+ WT_LSM_WORK_BLOOM |
+ WT_LSM_WORK_DROP |
+ WT_LSM_WORK_FLUSH |
+ WT_LSM_WORK_SWITCH;
+ F_SET(worker_args, WT_LSM_WORKER_RUN);
+ /*
+ * Only allow half of the threads to run merges to avoid all
+ * all workers getting stuck in long-running merge operations.
+ * Make sure the first worker is allowed, so that there is at
+ * least one thread capable of running merges. We know the
+ * first worker is id 2, so set merges on even numbered workers.
+ */
+ if (manager->lsm_workers % 2 == 0)
+ FLD_SET(worker_args->type, WT_LSM_WORK_MERGE);
+ WT_RET(__wt_lsm_worker_start(session, worker_args));
+ }
+ return (0);
+}
+
+/*
+ * __lsm_stop_workers --
+ * Stop worker threads until the number reaches the configured amount.
+ */
+static int
+__lsm_stop_workers(WT_SESSION_IMPL *session)
+{
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORKER_ARGS *worker_args;
+ uint32_t i;
+
+ manager = &S2C(session)->lsm_manager;
+ /*
+ * Start at the end of the list of threads and stop them until we
+ * have the desired number. We want to keep all active threads
+ * packed at the front of the worker array.
+ */
+ WT_ASSERT(session, manager->lsm_workers != 0);
+ for (i = manager->lsm_workers - 1; i >= manager->lsm_workers_max; i--) {
+ worker_args = &manager->lsm_worker_cookies[i];
+ /*
+ * Clear this worker's flag so it stops.
+ */
+ F_CLR(worker_args, WT_LSM_WORKER_RUN);
+ WT_ASSERT(session, worker_args->tid != 0);
+ WT_RET(__wt_thread_join(session, worker_args->tid));
+ worker_args->tid = 0;
+ worker_args->type = 0;
+ worker_args->flags = 0;
+ manager->lsm_workers--;
+ /*
+ * We do not clear the session because they are allocated
+ * statically when the connection was opened.
+ */
+ }
+ return (0);
+}
+
+/*
+ * __wt_lsm_manager_reconfig --
+ * Re-configure the LSM manager.
+ */
+int
+__wt_lsm_manager_reconfig(WT_SESSION_IMPL *session, const char **cfg)
+{
+ WT_LSM_MANAGER *manager;
+ uint32_t orig_workers;
+
+ manager = &S2C(session)->lsm_manager;
+ orig_workers = manager->lsm_workers_max;
+
+ WT_RET(__wt_lsm_manager_config(session, cfg));
+ /*
+ * If LSM hasn't started yet, we simply reconfigured the settings
+ * and we'll let the normal code path start the threads.
+ */
+ if (manager->lsm_workers_max == 0)
+ return (0);
+ if (manager->lsm_workers == 0)
+ return (0);
+ /*
+ * If the number of workers has not changed, we're done.
+ */
+ if (orig_workers == manager->lsm_workers_max)
+ return (0);
+ /*
+ * If we want more threads, start them.
+ */
+ if (manager->lsm_workers_max > orig_workers)
+ return (__lsm_general_worker_start(session));
+
+ /*
+ * Otherwise we want to reduce the number of workers.
+ */
+ WT_ASSERT(session, manager->lsm_workers_max < orig_workers);
+ WT_RET(__lsm_stop_workers(session));
+ return (0);
+}
+
+/*
+ * __wt_lsm_manager_start --
+ * Start the LSM management infrastructure. Our queues and locks were
+ * initialized when the connection was initialized.
+ */
+int
+__wt_lsm_manager_start(WT_SESSION_IMPL *session)
+{
+ WT_DECL_RET;
+ WT_LSM_MANAGER *manager;
+ WT_SESSION_IMPL *worker_session;
+ uint32_t i;
+
+ manager = &S2C(session)->lsm_manager;
+
+ /*
+ * We need at least a manager, a switch thread and a generic
+ * worker.
+ */
+ WT_ASSERT(session, manager->lsm_workers_max > 2);
+
+ /*
+ * Open sessions for all potential worker threads here - it's not
+ * safe to have worker threads open/close sessions themselves.
+ * All the LSM worker threads do their operations on read-only
+ * files. Use read-uncommitted isolation to avoid keeping
+ * updates in cache unnecessarily.
+ */
+ for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
+ WT_ERR(__wt_open_internal_session(
+ S2C(session), "lsm-worker", 1, 0, &worker_session));
+ worker_session->isolation = TXN_ISO_READ_UNCOMMITTED;
+ manager->lsm_worker_cookies[i].session = worker_session;
+ }
+
+ /* Start the LSM manager thread. */
+ WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid,
+ __lsm_worker_manager, &manager->lsm_worker_cookies[0]));
+
+ F_SET(S2C(session), WT_CONN_SERVER_LSM);
+
+ if (0) {
+err: for (i = 0;
+ (worker_session =
+ manager->lsm_worker_cookies[i].session) != NULL;
+ i++)
+ WT_TRET((&worker_session->iface)->close(
+ &worker_session->iface, NULL));
+ }
+ return (ret);
+}
+
+/*
+ * __wt_lsm_manager_free_work_unit --
+ * Release an LSM tree work unit.
+ */
+void
+__wt_lsm_manager_free_work_unit(
+ WT_SESSION_IMPL *session, WT_LSM_WORK_UNIT *entry)
+{
+ if (entry != NULL) {
+ WT_ASSERT(session, entry->lsm_tree->queue_ref > 0);
+
+ (void)WT_ATOMIC_SUB4(entry->lsm_tree->queue_ref, 1);
+ __wt_free(session, entry);
+ }
+}
+
+/*
+ * __wt_lsm_manager_destroy --
+ * Destroy the LSM manager threads and subsystem.
+ */
+int
+__wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORK_UNIT *current, *next;
+ WT_SESSION *wt_session;
+ uint32_t i;
+ uint64_t removed;
+
+ conn = S2C(session);
+ manager = &conn->lsm_manager;
+ removed = 0;
+
+ if (manager->lsm_workers > 0) {
+ /*
+ * Stop the main LSM manager thread first.
+ */
+ while (F_ISSET(conn, WT_CONN_SERVER_LSM))
+ __wt_yield();
+
+ /* Clean up open LSM handles. */
+ ret = __wt_lsm_tree_close_all(session);
+
+ WT_TRET(__wt_thread_join(
+ session, manager->lsm_worker_cookies[0].tid));
+ manager->lsm_worker_cookies[0].tid = 0;
+
+ /* Release memory from any operations left on the queue. */
+ for (current = TAILQ_FIRST(&manager->switchqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ TAILQ_REMOVE(&manager->switchqh, current, q);
+ ++removed;
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+ for (current = TAILQ_FIRST(&manager->appqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ TAILQ_REMOVE(&manager->appqh, current, q);
+ ++removed;
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+ for (current = TAILQ_FIRST(&manager->managerqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ TAILQ_REMOVE(&manager->managerqh, current, q);
+ ++removed;
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+
+ /* Close all LSM worker sessions. */
+ for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
+ wt_session =
+ &manager->lsm_worker_cookies[i].session->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ }
+ }
+ WT_STAT_FAST_CONN_INCRV(session,
+ lsm_work_units_discarded, removed);
+
+ /* Free resources that are allocated in connection initialize */
+ __wt_spin_destroy(session, &manager->switch_lock);
+ __wt_spin_destroy(session, &manager->app_lock);
+ __wt_spin_destroy(session, &manager->manager_lock);
+ WT_TRET(__wt_cond_destroy(session, &manager->work_cond));
+
+ return (ret);
+}
+
+/*
+ * __lsm_manager_aggressive_update --
+ * Update the merge aggressiveness for a single LSM tree.
+ */
+static int
+__lsm_manager_aggressive_update(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
+{
+ struct timespec now;
+ uint64_t chunk_wait, stallms;
+ u_int new_aggressive;
+
+ WT_RET(__wt_epoch(session, &now));
+ stallms = WT_TIMEDIFF(now, lsm_tree->last_flush_ts) / WT_MILLION;
+ /*
+ * Get aggressive if more than enough chunks for a merge should have
+ * been created by now. Use 10 seconds as a default if we don't have an
+ * estimate.
+ */
+ if (lsm_tree->nchunks > 1)
+ chunk_wait = stallms / (lsm_tree->chunk_fill_ms == 0 ?
+ 10000 : lsm_tree->chunk_fill_ms);
+ else
+ chunk_wait = 0;
+ new_aggressive = (u_int)(chunk_wait / lsm_tree->merge_min);
+
+ if (new_aggressive > lsm_tree->merge_aggressiveness) {
+ WT_RET(__wt_verbose(session, WT_VERB_LSM,
+ "LSM merge %s got aggressive (old %u new %u), "
+ "merge_min %d, %u / %" PRIu64,
+ lsm_tree->name, lsm_tree->merge_aggressiveness,
+ new_aggressive, lsm_tree->merge_min, stallms,
+ lsm_tree->chunk_fill_ms));
+ lsm_tree->merge_aggressiveness = new_aggressive;
+ }
+ return (0);
+}
+
+/*
+ * __lsm_manager_worker_setup --
+ * Do setup owned by the LSM manager thread including starting the worker
+ * threads.
+ */
+static int
+__lsm_manager_worker_setup(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORKER_ARGS *worker_args;
+
+ conn = S2C(session);
+ manager = &conn->lsm_manager;
+
+ WT_ASSERT(session, manager->lsm_workers == 1);
+ /*
+ * The LSM manager is worker[0]. The switch thread is worker[1].
+ * Setup and start the switch/drop worker explicitly.
+ */
+ worker_args = &manager->lsm_worker_cookies[1];
+ worker_args->work_cond = manager->work_cond;
+ worker_args->id = manager->lsm_workers++;
+ worker_args->type = WT_LSM_WORK_DROP | WT_LSM_WORK_SWITCH;
+ F_SET(worker_args, WT_LSM_WORKER_RUN);
+ /* Start the switch thread. */
+ WT_RET(__wt_lsm_worker_start(session, worker_args));
+ WT_RET(__lsm_general_worker_start(session));
+
+ return (0);
+}
+
+/*
+ * __lsm_manager_worker_shutdown --
+ * Shutdown the LSM manager and worker threads.
+ */
+static int
+__lsm_manager_worker_shutdown(WT_SESSION_IMPL *session)
+{
+ WT_DECL_RET;
+ WT_LSM_MANAGER *manager;
+ u_int i;
+
+ manager = &S2C(session)->lsm_manager;
+
+ /*
+ * Wait for the rest of the LSM workers to shutdown. Stop at index
+ * one - since we (the manager) are at index 0.
+ */
+ for (i = 1; i < manager->lsm_workers; i++) {
+ WT_ASSERT(session, manager->lsm_worker_cookies[i].tid != 0);
+ WT_TRET(__wt_cond_signal(session, manager->work_cond));
+ WT_TRET(__wt_thread_join(
+ session, manager->lsm_worker_cookies[i].tid));
+ }
+ return (ret);
+}
+
+/*
+ * __lsm_manager_run_server --
+ * Run manager thread operations.
+ */
+static int
+__lsm_manager_run_server(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_LSM_TREE *lsm_tree;
+ struct timespec now;
+ uint64_t fillms, pushms;
+
+ conn = S2C(session);
+ while (F_ISSET(conn, WT_CONN_SERVER_RUN)) {
+ if (TAILQ_EMPTY(&conn->lsmqh)) {
+ __wt_sleep(0, 10000);
+ continue;
+ }
+ __wt_sleep(0, 10000);
+ TAILQ_FOREACH(lsm_tree, &S2C(session)->lsmqh, q) {
+ if (!F_ISSET(lsm_tree, WT_LSM_TREE_ACTIVE))
+ continue;
+ WT_RET(__lsm_manager_aggressive_update(
+ session, lsm_tree));
+ WT_RET(__wt_epoch(session, &now));
+ pushms = lsm_tree->work_push_ts.tv_sec == 0 ? 0 :
+ WT_TIMEDIFF(
+ now, lsm_tree->work_push_ts) / WT_MILLION;
+ fillms = 3 * lsm_tree->chunk_fill_ms;
+ if (fillms == 0)
+ fillms = 10000;
+ /*
+ * If the tree appears to not be triggering enough
+ * LSM maintenance, help it out. Additional work units
+ * don't hurt, and can be necessary if some work
+ * units aren't completed for some reason.
+ * If the tree hasn't been modified, and there are
+ * more than 1 chunks - try to get the tree smaller
+ * so queries run faster.
+ * If we are getting aggressive - ensure there are
+ * enough work units that we can get chunks merged.
+ * If we aren't pushing enough work units, compared
+ * to how often new chunks are being created add some
+ * more.
+ */
+ if (lsm_tree->queue_ref >= LSM_TREE_MAX_QUEUE)
+ WT_STAT_FAST_CONN_INCR(session,
+ lsm_work_queue_max);
+ else if ((!lsm_tree->modified &&
+ lsm_tree->nchunks > 1) ||
+ (lsm_tree->queue_ref == 0 &&
+ lsm_tree->nchunks > 1) ||
+ (lsm_tree->merge_aggressiveness > 3 &&
+ !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) ||
+ pushms > fillms) {
+ WT_RET(__wt_lsm_manager_push_entry(
+ session, WT_LSM_WORK_SWITCH, 0, lsm_tree));
+ WT_RET(__wt_lsm_manager_push_entry(
+ session, WT_LSM_WORK_DROP, 0, lsm_tree));
+ WT_RET(__wt_lsm_manager_push_entry(
+ session, WT_LSM_WORK_FLUSH, 0, lsm_tree));
+ WT_RET(__wt_lsm_manager_push_entry(
+ session, WT_LSM_WORK_BLOOM, 0, lsm_tree));
+ WT_RET(__wt_verbose(session, WT_VERB_LSM,
+ "MGR %s: queue %d mod %d nchunks %d"
+ " flags 0x%x aggressive %d pushms %" PRIu64
+ " fillms %" PRIu64,
+ lsm_tree->name, lsm_tree->queue_ref,
+ lsm_tree->modified, lsm_tree->nchunks,
+ lsm_tree->flags,
+ lsm_tree->merge_aggressiveness,
+ pushms, fillms));
+ WT_RET(__wt_lsm_manager_push_entry(
+ session, WT_LSM_WORK_MERGE, 0, lsm_tree));
+ }
+ }
+ }
+
+ return (0);
+}
+
+/*
+ * __lsm_worker_manager --
+ * A thread that manages all open LSM trees, and the shared LSM worker
+ * threads.
+ */
+static void *
+__lsm_worker_manager(void *arg)
+{
+ WT_DECL_RET;
+ WT_LSM_WORKER_ARGS *cookie;
+ WT_SESSION_IMPL *session;
+
+ cookie = (WT_LSM_WORKER_ARGS *)arg;
+ session = cookie->session;
+
+ WT_ERR(__lsm_manager_worker_setup(session));
+ WT_ERR(__lsm_manager_run_server(session));
+ WT_ERR(__lsm_manager_worker_shutdown(session));
+
+ if (ret != 0) {
+err: __wt_err(session, ret, "LSM worker manager thread error");
+ }
+ F_CLR(S2C(session), WT_CONN_SERVER_LSM);
+ return (NULL);
+}
+
+/*
+ * __wt_lsm_manager_clear_tree --
+ * Remove all entries for a tree from the LSM manager queues. This
+ * introduces an inefficiency if LSM trees are being opened and closed
+ * regularly.
+ */
+int
+__wt_lsm_manager_clear_tree(
+ WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
+{
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORK_UNIT *current, *next;
+ uint64_t removed;
+
+ manager = &S2C(session)->lsm_manager;
+ removed = 0;
+
+ /* Clear out the tree from the switch queue */
+ __wt_spin_lock(session, &manager->switch_lock);
+
+ /* Structure the loop so that it's safe to free as we iterate */
+ for (current = TAILQ_FIRST(&manager->switchqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ if (current->lsm_tree != lsm_tree)
+ continue;
+ ++removed;
+ TAILQ_REMOVE(&manager->switchqh, current, q);
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+ __wt_spin_unlock(session, &manager->switch_lock);
+ /* Clear out the tree from the application queue */
+ __wt_spin_lock(session, &manager->app_lock);
+ for (current = TAILQ_FIRST(&manager->appqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ if (current->lsm_tree != lsm_tree)
+ continue;
+ ++removed;
+ TAILQ_REMOVE(&manager->appqh, current, q);
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+ __wt_spin_unlock(session, &manager->app_lock);
+ /* Clear out the tree from the manager queue */
+ __wt_spin_lock(session, &manager->manager_lock);
+ for (current = TAILQ_FIRST(&manager->managerqh);
+ current != NULL; current = next) {
+ next = TAILQ_NEXT(current, q);
+ if (current->lsm_tree != lsm_tree)
+ continue;
+ ++removed;
+ TAILQ_REMOVE(&manager->managerqh, current, q);
+ __wt_lsm_manager_free_work_unit(session, current);
+ }
+ __wt_spin_unlock(session, &manager->manager_lock);
+ WT_STAT_FAST_CONN_INCRV(session, lsm_work_units_discarded, removed);
+ return (0);
+}
+
+/*
+ * We assume this is only called from __wt_lsm_manager_pop_entry and we
+ * have session, entry and type available to use. If the queue is empty
+ * we may return from the macro.
+ */
+#define LSM_POP_ENTRY(qh, qlock, qlen) do { \
+ if (TAILQ_EMPTY(qh)) \
+ return (0); \
+ __wt_spin_lock(session, qlock); \
+ TAILQ_FOREACH(entry, (qh), q) { \
+ if (FLD_ISSET(type, entry->type)) { \
+ TAILQ_REMOVE(qh, entry, q); \
+ WT_STAT_FAST_CONN_DECR(session, qlen); \
+ break; \
+ } \
+ } \
+ __wt_spin_unlock(session, (qlock)); \
+} while (0)
+
+/*
+ * __wt_lsm_manager_pop_entry --
+ * Retrieve the head of the queue, if it matches the requested work
+ * unit type.
+ */
+int
+__wt_lsm_manager_pop_entry(
+ WT_SESSION_IMPL *session, uint32_t type, WT_LSM_WORK_UNIT **entryp)
+{
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORK_UNIT *entry;
+
+ manager = &S2C(session)->lsm_manager;
+ *entryp = NULL;
+ entry = NULL;
+
+ /*
+ * Pop the entry off the correct queue based on our work type.
+ */
+ if (type == WT_LSM_WORK_SWITCH)
+ LSM_POP_ENTRY(&manager->switchqh,
+ &manager->switch_lock, lsm_work_queue_switch);
+ else if (type == WT_LSM_WORK_MERGE)
+ LSM_POP_ENTRY(&manager->managerqh,
+ &manager->manager_lock, lsm_work_queue_manager);
+ else
+ LSM_POP_ENTRY(&manager->appqh,
+ &manager->app_lock, lsm_work_queue_app);
+ if (entry != NULL)
+ WT_STAT_FAST_CONN_INCR(session, lsm_work_units_done);
+ *entryp = entry;
+ return (0);
+}
+
+/*
+ * Push a work unit onto the appropriate queue. This macro assumes we are
+ * called from __wt_lsm_manager_push_entry and we have session and entry
+ * available for use.
+ */
+#define LSM_PUSH_ENTRY(qh, qlock, qlen) do { \
+ __wt_spin_lock(session, qlock); \
+ TAILQ_INSERT_TAIL((qh), entry, q); \
+ WT_STAT_FAST_CONN_INCR(session, qlen); \
+ __wt_spin_unlock(session, qlock); \
+} while (0)
+
+/*
+ * __wt_lsm_manager_push_entry --
+ * Add an entry to the end of the switch queue.
+ */
+int
+__wt_lsm_manager_push_entry(WT_SESSION_IMPL *session,
+ uint32_t type, uint32_t flags, WT_LSM_TREE *lsm_tree)
+{
+ WT_LSM_MANAGER *manager;
+ WT_LSM_WORK_UNIT *entry;
+
+ manager = &S2C(session)->lsm_manager;
+
+ WT_RET(__wt_epoch(session, &lsm_tree->work_push_ts));
+
+ WT_RET(__wt_calloc_def(session, 1, &entry));
+ entry->type = type;
+ entry->flags = flags;
+ entry->lsm_tree = lsm_tree;
+ (void)WT_ATOMIC_ADD4(lsm_tree->queue_ref, 1);
+ WT_STAT_FAST_CONN_INCR(session, lsm_work_units_created);
+
+ if (type == WT_LSM_WORK_SWITCH)
+ LSM_PUSH_ENTRY(&manager->switchqh,
+ &manager->switch_lock, lsm_work_queue_switch);
+ else if (type == WT_LSM_WORK_MERGE)
+ LSM_PUSH_ENTRY(&manager->managerqh,
+ &manager->manager_lock, lsm_work_queue_manager);
+ else
+ LSM_PUSH_ENTRY(&manager->appqh,
+ &manager->app_lock, lsm_work_queue_app);
+
+ WT_RET(__wt_cond_signal(session, manager->work_cond));
+
+ return (0);
+}