diff options
Diffstat (limited to 'storage/tokudb/tokudb_background.cc')
-rw-r--r-- | storage/tokudb/tokudb_background.cc | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/storage/tokudb/tokudb_background.cc b/storage/tokudb/tokudb_background.cc new file mode 100644 index 00000000000..d8ef54a5972 --- /dev/null +++ b/storage/tokudb/tokudb_background.cc @@ -0,0 +1,253 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "$Id$" +/*====== +This file is part of TokuDB + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + TokuDBis is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + TokuDB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with TokuDB. If not, see <http://www.gnu.org/licenses/>. + +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "tokudb_background.h" +#include "tokudb_sysvars.h" + +namespace tokudb { +namespace background { + + +std::atomic<uint64_t> job_manager_t::job_t::_next_id(1); + +job_manager_t::job_t::job_t(bool user_scheduled) : + _running(false), + _cancelled(false), + _id(_next_id++), + _user_scheduled(user_scheduled), + _scheduled_time(::time(0)), + _started_time(0) { +} +job_manager_t::job_t::~job_t() { +} +void* job_manager_t::operator new(size_t sz) { + return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); +} +void job_manager_t::operator delete(void* p) { + tokudb::memory::free(p); +} +job_manager_t::job_manager_t() : + _sem(0, 65535), + _shutdown(false) { +} +job_manager_t::~job_manager_t() { +} +void job_manager_t::initialize() { + int r = _thread.start(thread_func, this); + assert_always(r == 0); +} +void job_manager_t::destroy() { + assert_always(!_shutdown); + assert_always(_foreground_jobs.size() == 0); + _shutdown = true; + _sem.set_interrupt(); + + while (_background_jobs.size()) { + _mutex.lock(); + job_t* job = _background_jobs.front(); + cancel(job); + _background_jobs.pop_front(); + delete job; + _mutex.unlock(); + } + + void* result; + int r = _thread.join(&result); + assert_always(r == 0); +} +bool job_manager_t::run_job(job_t* newjob, bool background) { + bool ret = false; + const char* jobkey = newjob->key(); + + _mutex.lock(); + assert_always(!_shutdown); + + for (jobs_t::iterator it = _background_jobs.begin(); + it != _background_jobs.end(); + it++) { + job_t* job = *it; + if (!job->cancelled() && strcmp(job->key(), jobkey) == 0) { + // if this is a foreground job being run and + // there is an existing background job of the same type + // and it is not running yet, we can cancel the background job + // and just run this one in the foreground, might have different + // params, but that is up to the user to figure out. + if (!background && !job->running()) { + job->cancel(); + } else { + // can't schedule or run another job on the same key + goto cleanup; + } + } + } + for (jobs_t::iterator it = _foreground_jobs.begin(); + it != _foreground_jobs.end(); + it++) { + job_t* job = *it; + if (strcmp(job->key(), jobkey) == 0) { + // can't schedule or run another job on the same key + // as an existing foreground job + goto cleanup; + } + } + + if (background) { + _background_jobs.push_back(newjob); + _sem.signal(); + ret = true; + } else { + _foreground_jobs.push_back(newjob); + + run(newjob); + + for (jobs_t::iterator it = _foreground_jobs.begin(); + it != _foreground_jobs.end(); + it++) { + job_t* job = *it; + if (job == newjob) { + _foreground_jobs.erase(it); + delete job; + break; + } + } + ret = true; + } + +cleanup: + _mutex.unlock(); + return ret; +} +bool job_manager_t::cancel_job(const char* key) { + bool ret = false; + _mutex.lock(); + + for (jobs_t::iterator it = _background_jobs.begin(); + it != _background_jobs.end(); it++) { + job_t* job = *it; + + if (!job->cancelled() && + strcmp(job->key(), key) == 0) { + + cancel(job); + + ret = true; + } + } + + _mutex.unlock(); + return ret; +} +void job_manager_t::iterate_jobs(pfn_iterate_t callback, void* extra) const { + + char database[256], table[256], type[256], params[256], status[256]; + + _mutex.lock(); + + for (jobs_t::const_iterator it = _background_jobs.begin(); + it != _background_jobs.end(); + it++) { + job_t* job = *it; + if (!job->cancelled()) { + database[0] = table[0] = type[0] = params[0] = status[0] = '\0'; + job->status(database, table, type, params, status); + callback( + job->id(), + database, + table, + type, + params, + status, + job->user_scheduled(), + job->scheduled_time(), + job->started_time(), + extra); + } + } + + _mutex.unlock(); +} +void* job_manager_t::thread_func(void* v) { + return ((tokudb::background::job_manager_t*)v)->real_thread_func(); +} +void* job_manager_t::real_thread_func() { + while (_shutdown == false) { + tokudb::thread::semaphore_t::E_WAIT res = _sem.wait(); + if (res == tokudb::thread::semaphore_t::E_INTERRUPTED || _shutdown) { + break; + } else if (res == tokudb::thread::semaphore_t::E_SIGNALLED) { +#if TOKUDB_DEBUG + if (TOKUDB_UNLIKELY( + tokudb::sysvars::debug_pause_background_job_manager)) { + _sem.signal(); + tokudb::time::sleep_microsec(250000); + continue; + } +#endif // TOKUDB_DEBUG + + _mutex.lock(); + assert_debug(_background_jobs.size() > 0); + job_t* job = _background_jobs.front(); + run(job); + _background_jobs.pop_front(); + _mutex.unlock(); + delete job; + } + } + return NULL; +} +void job_manager_t::run(job_t* job) { + assert_debug(_mutex.is_owned_by_me()); + if (!job->cancelled()) { + _mutex.unlock(); + // do job + job->run(); + // done job + _mutex.lock(); + } + if (!job->cancelled()) { + job->destroy(); + } +} +void job_manager_t::cancel(job_t* job) { + assert_debug(_mutex.is_owned_by_me()); + job->cancel(); +} +job_manager_t* _job_manager = NULL; + +bool initialize() { + assert_always(_job_manager == NULL); + _job_manager = new job_manager_t; + _job_manager->initialize(); + return true; +} +bool destroy() { + _job_manager->destroy(); + delete _job_manager; + _job_manager = NULL; + return true; +} +} // namespace background +} // namespace tokudb |