diff options
author | unknown <andrey@lmy004.> | 2006-07-12 10:37:30 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-12 10:37:30 +0200 |
commit | 628be8a71611bc86f7f0cf809b27d63bdd9b12c8 (patch) | |
tree | 1efc1513a721e18e2fa1fe76b14936f2d9e2d603 /sql | |
parent | 42a8e2c9421854710679a0f6c3ceef6c0777ded4 (diff) | |
download | mariadb-git-628be8a71611bc86f7f0cf809b27d63bdd9b12c8.tar.gz |
WL#3337 (Event scheduler new architecture)
event_scheduler_ng.cc/h is no more
BitKeeper/deleted/.del-event_scheduler_ng.cc~8896b89040dbc4f6:
Delete: sql/event_scheduler_ng.cc
BitKeeper/deleted/.del-event_scheduler_ng.h~1431af5b185376f:
Delete: sql/event_scheduler_ng.h
mysql-test/r/not_embedded_server.result:
fix test
sql/Makefile.am:
event_scheduler_ng.cc/h is no more
sql/event_queue.cc:
event_scheduler_ng.cc/h is no more
sql/event_queue.h:
event_scheduler_ng.cc/h is no more
sql/event_scheduler.cc:
event_scheduler_ng.cc/h is no more
sql/event_scheduler.h:
event_scheduler_ng.cc/h is no more
sql/events.cc:
event_scheduler_ng.cc/h is no more
sql/events.h:
event_scheduler_ng.cc/h is no more
Diffstat (limited to 'sql')
-rw-r--r-- | sql/Makefile.am | 4 | ||||
-rw-r--r-- | sql/event_queue.cc | 4 | ||||
-rw-r--r-- | sql/event_queue.h | 6 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 865 | ||||
-rw-r--r-- | sql/event_scheduler.h | 104 | ||||
-rw-r--r-- | sql/event_scheduler_ng.cc | 881 | ||||
-rw-r--r-- | sql/event_scheduler_ng.h | 123 | ||||
-rw-r--r-- | sql/events.cc | 28 | ||||
-rw-r--r-- | sql/events.h | 4 |
9 files changed, 992 insertions, 1027 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am index 4d76cdb5080..3518903d149 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sql_array.h sql_cursor.h events.h \ sql_plugin.h authors.h sql_partition.h event_data_objects.h \ event_queue.h event_db_repository.h \ - partition_info.h partition_element.h event_scheduler_ng.h \ + partition_info.h partition_element.h event_scheduler.h \ contributors.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -105,7 +105,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ tztime.cc my_time.c my_user.c my_decimal.cc\ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\ - event_scheduler_ng.cc events.cc event_data_objects.cc \ + events.cc event_data_objects.cc \ event_queue.cc event_db_repository.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 63dee303fc2..12eceee8cfb 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -18,7 +18,7 @@ #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" -#include "event_scheduler_ng.h" +#include "event_scheduler.h" #define EVENT_QUEUE_INITIAL_SIZE 30 @@ -123,7 +123,7 @@ Event_queue::deinit_mutexes() */ bool -Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) +Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) { int i= 0; bool ret= FALSE; diff --git a/sql/event_queue.h b/sql/event_queue.h index 15a28b4103d..b7962d14a89 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -22,7 +22,7 @@ class Event_job_data; class Event_queue_element; class THD; -class Event_scheduler_ng; +class Event_scheduler; class Event_queue { @@ -36,7 +36,7 @@ public: deinit_mutexes(); bool - init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched); + init_queue(Event_db_repository *db_repo, Event_scheduler *sched); void deinit_queue(); @@ -109,7 +109,7 @@ protected: void dbug_dump_queue(time_t now); - Event_scheduler_ng *scheduler; + Event_scheduler *scheduler; /* The sorted queue with the Event_job_data objects */ QUEUE queue; diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index f1c7d8394e3..85d357ec11e 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -14,3 +14,868 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "mysql_priv.h" +#include "events.h" +#include "event_data_objects.h" +#include "event_scheduler.h" +#include "event_queue.h" + +#ifdef __GNUC__ +#if __GNUC__ >= 2 +#define SCHED_FUNC __FUNCTION__ +#endif +#else +#define SCHED_FUNC "<unknown>" +#endif + +#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) +#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) +#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__) + +extern pthread_attr_t connection_attrib; + +struct scheduler_param +{ + THD *thd; + Event_scheduler *scheduler; +}; + +struct scheduler_param scheduler_param_value; + + + +static +LEX_STRING scheduler_states_names[] = +{ + { C_STRING_WITH_LEN("INITIALIZED")}, + { C_STRING_WITH_LEN("RUNNING")}, + { C_STRING_WITH_LEN("STOPPING")} +}; + + +/* + Prints the stack of infos, warnings, errors from thd to + the console so it can be fetched by the logs-into-tables and + checked later. + + SYNOPSIS + evex_print_warnings + thd Thread used during the execution of the event + et The event itself +*/ + +static void +evex_print_warnings(THD *thd, Event_job_data *et) +{ + MYSQL_ERROR *err; + DBUG_ENTER("evex_print_warnings"); + if (!thd->warn_list.elements) + DBUG_VOID_RETURN; + + char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; + char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; + String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); + prefix.length(0); + prefix.append("SCHEDULER: ["); + + append_identifier(thd, &prefix, et->definer.str, et->definer.length); + prefix.append("][", 2); + append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); + prefix.append('.'); + append_identifier(thd,&prefix, et->name.str, et->name.length); + prefix.append("] ", 2); + + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + while ((err= it++)) + { + String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); + /* set it to 0 or we start adding at the end. That's the trick ;) */ + err_msg.length(0); + err_msg.append(prefix); + err_msg.append(err->msg, strlen(err->msg), system_charset_info); + err_msg.append("]"); + DBUG_ASSERT(err->level < 3); + (sql_print_message_handlers[err->level])("%*s", err_msg.length(), + err_msg.c_ptr()); + } + DBUG_VOID_RETURN; +} + + +/* + Inits an scheduler thread handler, both the main and a worker + + SYNOPSIS + init_event_thread() + thd - the THD of the thread. Has to be allocated by the caller. + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + + RETURN VALUE + 0 OK + -1 Error +*/ + +static int +init_scheduler_thread(THD* thd) +{ + DBUG_ENTER("init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->master_access= 0; + thd->security_ctx->db_access= 0; + thd->security_ctx->host_or_ip= (char*)my_localhost; + thd->security_ctx->set_user((char*)"event_scheduler"); + my_net_init(&thd->net, NULL); + thd->net.read_timeout= slave_net_timeout; + thd->slave_thread= 0; + thd->options|= OPTION_AUTO_IS_NULL; + thd->client_capabilities|= CLIENT_MULTI_RESULTS; + pthread_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + threads.append(thd); + thread_count++; + thread_running++; + pthread_mutex_unlock(&LOCK_thread_count); + + /* + Guarantees that we will see the thread in SHOW PROCESSLIST though its + vio is NULL. + */ + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + + DBUG_RETURN(0); +} + + +/* + Cleans up the THD and the threaded environment of the thread. + + SYNOPSIS + deinit_event_thread() + thd Thread +*/ + +static void +deinit_event_thread(THD *thd) +{ + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); + DBUG_PRINT("exit", ("Scheduler thread finishing")); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + + my_thread_end(); +} + + +/* + Function that executes the scheduler, + + SYNOPSIS + event_scheduler_thread() + arg Pointer to `struct scheduler_param` + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_scheduler_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd= (THD *)(*(struct scheduler_param *) arg).thd; + + thd->thread_stack= (char *)&thd; // remember where our stack is + DBUG_ENTER("event_scheduler_thread"); + + my_thread_init(); + pthread_detach_this_thread(); + thd->real_id=pthread_self(); + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + goto end; + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + ((struct scheduler_param *) arg)->scheduler->run(thd); + +end: + deinit_event_thread(thd); + + DBUG_RETURN(0); // Against gcc warnings +} + + +/* + Function that executes an event in a child thread. Setups the + environment for the event execution and cleans after that. + + SYNOPSIS + event_worker_thread() + arg The Event_job_data object to be processed + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_worker_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd; + Event_job_data *event= (Event_job_data *)arg; + int ret; + + thd= event->thd; + thd->thread_stack= (char *) &thd; + + + my_thread_init(); + pthread_detach_this_thread(); + thd->real_id=pthread_self(); + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + goto end; + } + + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals)); +#endif + thd->init_for_queries(); + + DBUG_ENTER("event_worker_thread"); + DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." + "THD=0x%lx", time(NULL), thd)); + + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id); + + thd->enable_slow_log= TRUE; + + ret= event->execute(thd); + + evex_print_warnings(thd, event); + + sql_print_information("SCHEDULER: [%s.%s of %s] executed " + " in thread thread %lu. RetCode=%d", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + event->dbname.str, event->name.str, + event->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) + sql_print_information("SCHEDULER: MICROSECOND is not supported"); + +end: + DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, + event->name.str)); + delete event; + + deinit_event_thread(thd); + + DBUG_RETURN(0); // Against gcc warnings +} + + +/* + Performs initialization of the scheduler data, outside of the + threading primitives. + + SYNOPSIS + Event_scheduler::init_scheduler() +*/ + +bool +Event_scheduler::init_scheduler(Event_queue *q) +{ + LOCK_SCHEDULER_DATA(); + thread_id= 0; + state= INITIALIZED; + queue= q; + started_events= 0; + UNLOCK_SCHEDULER_DATA(); + + return FALSE; +} + + +void +Event_scheduler::deinit_scheduler() {} + + +/* + Inits scheduler's threading primitives. + + SYNOPSIS + Event_scheduler::init_mutexes() +*/ + +void +Event_scheduler::init_mutexes() +{ + pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_state, NULL); +} + + +/* + Deinits scheduler's threading primitives. + + SYNOPSIS + Event_scheduler::deinit_mutexes() +*/ + +void +Event_scheduler::deinit_mutexes() +{ + pthread_mutex_destroy(&LOCK_scheduler_state); + pthread_cond_destroy(&COND_state); +} + + +/* + Starts the scheduler (again). Creates a new THD and passes it to + a forked thread. Does not wait for acknowledgement from the new + thread that it has started. Asynchronous starting. Most of the + needed initializations are done in the current thread to minimize + the chance of failure in the spawned thread. + + SYNOPSIS + Event_scheduler::start() + + RETURN VALUE + FALSE OK + TRUE Error (not reported) +*/ + +bool +Event_scheduler::start() +{ + THD *new_thd= NULL; + bool ret= FALSE; + pthread_t th; + DBUG_ENTER("Event_scheduler::start"); + + LOCK_SCHEDULER_DATA(); + DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); + if (state > INITIALIZED) + goto end; + + if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) + { + sql_print_error("SCHEDULER: Cannot init manager event thread."); + ret= TRUE; + goto end; + } + new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER; + new_thd->command= COM_DAEMON; + + scheduler_param_value.thd= new_thd; + scheduler_param_value.scheduler= this; + + DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); + if (pthread_create(&th, &connection_attrib, event_scheduler_thread, + (void*)&scheduler_param_value)) + { + DBUG_PRINT("error", ("cannot create a new thread")); + state= INITIALIZED; + ret= TRUE; + } + DBUG_PRINT("info", ("Setting state go RUNNING")); + state= RUNNING; +end: + UNLOCK_SCHEDULER_DATA(); + + if (ret && new_thd) + { + DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); + new_thd->proc_info= "Clearing"; + DBUG_ASSERT(new_thd->net.buff != 0); + net_end(&new_thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete new_thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + DBUG_RETURN(ret); +} + + +/* + Stops the scheduler (again). Waits for acknowledgement from the + scheduler that it has stopped - synchronous stopping. + + SYNOPSIS + Event_scheduler::stop() + + RETURN VALUE + FALSE OK + TRUE Error (not reported) +*/ + +bool +Event_scheduler::stop() +{ + THD *thd= current_thd; + DBUG_ENTER("Event_scheduler::stop"); + DBUG_PRINT("enter", ("thd=0x%lx", current_thd)); + + LOCK_SCHEDULER_DATA(); + DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); + if (state != RUNNING) + goto end; + + state= STOPPING; + + DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); + sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); + + pthread_cond_signal(&COND_state); + + /* Guarantee we don't catch spurious signals */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); + do { + DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " + "thread. Current value of state is %s . " + "workers count=%d", scheduler_states_names[state].str, + workers_count())); + /* thd could be 0x0, when shutting down */ + COND_STATE_WAIT(NULL); + } while (state == STOPPING); + DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); + + thread_id= 0; +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(FALSE); +} + + +/* + The main loop of the scheduler. + + SYNOPSIS + Event_scheduler::run() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Error (Serious error) +*/ + +bool +Event_scheduler::run(THD *thd) +{ + int res; + struct timespec abstime; + Event_job_data *job_data; + DBUG_ENTER("Event_scheduler::run"); + + LOCK_SCHEDULER_DATA(); + + thread_id= thd->thread_id; + sql_print_information("SCHEDULER: Manager thread started with id %lu", + thread_id); + /* + Recalculate the values in the queue because there could have been stops + in executions of the scheduler and some times could have passed by. + */ + queue->recalculate_activation_times(thd); + while (state == RUNNING) + { + thd->end_time(); + /* Gets a minimized version */ + if (queue->get_top_for_execution_if_time(thd, thd->query_start(), + &job_data, &abstime)) + { + sql_print_information("SCHEDULER: Serious error during getting next" + " event to execute. Stopping."); + break; + } + + DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " + "abs_time.tv_sec=%d", + job_data, thd->query_start(), abstime.tv_sec)); + if (!job_data && !abstime.tv_sec) + { + DBUG_PRINT("info", ("The queue is empty. Going to sleep")); + thd->enter_cond(&COND_state, &LOCK_scheduler_state, + "Waiting on empty queue"); + COND_STATE_WAIT(NULL); + thd->exit_cond(""); + DBUG_PRINT("info", ("Woke up. Got COND_state")); + LOCK_SCHEDULER_DATA(); + } + else if (abstime.tv_sec) + { + DBUG_PRINT("info", ("Have to sleep some time %u till", + abstime.tv_sec - thd->query_start(), abstime.tv_sec)); + + thd->enter_cond(&COND_state, &LOCK_scheduler_state, + "Waiting for next activation"); + COND_STATE_WAIT(&abstime); + /* + If we get signal we should recalculate the whether it's the right time + because there could be : + 1. Spurious wake-up + 2. The top of the queue was changed (new one becase of create/update) + */ + /* This will do implicit UNLOCK_SCHEDULER_DATA() */ + thd->exit_cond(""); + DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); + LOCK_SCHEDULER_DATA(); + } + else + { + UNLOCK_SCHEDULER_DATA(); + res= execute_top(thd, job_data); + LOCK_SCHEDULER_DATA(); + if (res) + break; + ++started_events; + } + DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); + } + DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); + pthread_cond_signal(&COND_state); +error: + state= INITIALIZED; + UNLOCK_SCHEDULER_DATA(); + sql_print_information("SCHEDULER: Stopped"); + + DBUG_RETURN(res); +} + + +/* + Creates a new THD instance and then forks a new thread, while passing + the THD pointer and job_data to it. + + SYNOPSIS + Event_scheduler::execute_top() + + RETURN VALUE + FALSE OK + TRUE Error (Serious error) +*/ + +bool +Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) +{ + THD *new_thd; + pthread_t th; + int res= 0; + DBUG_ENTER("Event_scheduler::execute_top"); + if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) + goto error; + + new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; + job_data->thd= new_thd; + DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition", + job_data->dbname.str, job_data->name.str)); + + /* Major failure */ + if ((res= pthread_create(&th, &connection_attrib, event_worker_thread, + job_data))) + goto error; + + DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); + DBUG_RETURN(FALSE); + +error: + DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res)); + if (new_thd) + { + new_thd->proc_info= "Clearing"; + DBUG_ASSERT(new_thd->net.buff != 0); + net_end(&new_thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete new_thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + delete job_data; + DBUG_RETURN(TRUE); +} + + +/* + Returns the current state of the scheduler + + SYNOPSIS + Event_scheduler::get_state() + + RETURN VALUE + The state of the scheduler (INITIALIZED | RUNNING | STOPPING) +*/ + +enum Event_scheduler::enum_state +Event_scheduler::get_state() +{ + enum Event_scheduler::enum_state ret; + LOCK_SCHEDULER_DATA(); + ret= state; + UNLOCK_SCHEDULER_DATA(); + return ret; +} + + +/* + Returns the number of living event worker threads. + + SYNOPSIS + Event_scheduler::workers_count() +*/ + +uint +Event_scheduler::workers_count() +{ + THD *tmp; + uint count= 0; + + DBUG_ENTER("Event_scheduler::workers_count"); + pthread_mutex_lock(&LOCK_thread_count); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + ++count; + } + pthread_mutex_unlock(&LOCK_thread_count); + DBUG_PRINT("exit", ("%d", count)); + DBUG_RETURN(count); +} + + +/* + Signals the main scheduler thread that the queue has changed + its state. + + SYNOPSIS + Event_scheduler::queue_changed() +*/ + +void +Event_scheduler::queue_changed() +{ + DBUG_ENTER("Event_scheduler::queue_changed"); + DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", + scheduler_states_names[state].str)); + pthread_cond_signal(&COND_state); + DBUG_VOID_RETURN; +} + + +/* + Auxiliary function for locking LOCK_scheduler_state. Used + by the LOCK_SCHEDULER_DATA macro. + + SYNOPSIS + Event_scheduler::lock_data() + func Which function is requesting mutex lock + line On which line mutex lock is requested +*/ + +void +Event_scheduler::lock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::lock_data"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); + pthread_mutex_lock(&LOCK_scheduler_state); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_scheduler_data_locked= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Auxiliary function for unlocking LOCK_scheduler_state. Used + by the UNLOCK_SCHEDULER_DATA macro. + + SYNOPSIS + Event_scheduler::unlock_data() + func Which function is requesting mutex unlock + line On which line mutex unlock is requested +*/ + +void +Event_scheduler::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::unlock_data"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); + mutex_last_unlocked_at_line= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + pthread_mutex_unlock(&LOCK_scheduler_state); + DBUG_VOID_RETURN; +} + + +/* + Wrapper for pthread_cond_wait/timedwait + + SYNOPSIS + Event_scheduler::cond_wait() + cond Conditional to wait for + mutex Mutex of the conditional + + RETURN VALUE + Error code of pthread_cond_wait() +*/ + +void +Event_scheduler::cond_wait(struct timespec *abstime, + const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::cond_wait"); + waiting_on_cond= TRUE; + mutex_last_unlocked_at_line= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + + if (abstime) + pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime); + else + pthread_cond_wait(&COND_state, &LOCK_scheduler_state); + + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_scheduler_data_locked= TRUE; + waiting_on_cond= FALSE; + + DBUG_VOID_RETURN; +} + + +/* + Dumps the internal status of the scheduler + + SYNOPSIS + Event_scheduler::dump_internal_status() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_scheduler::dump_internal_status(THD *thd) +{ + int ret= 0; + DBUG_ENTER("Event_scheduler::dump_internal_status"); + +#ifndef DBUG_OFF + CHARSET_INFO *scs= system_charset_info; + Protocol *protocol= thd->protocol; + char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + int_string.length(0); + + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler state"), scs); + protocol->store(scheduler_states_names[state].str, + scheduler_states_names[state].length, scs); + + if ((ret= protocol->write())) + goto end; + + /* thread_id */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("thread_id"), scs); + if (thread_id) + { + int_string.set((longlong) thread_id, scs); + protocol->store(&int_string); + } + else + protocol->store_null(); + if ((ret= protocol->write())) + goto end; + + /* last locked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_locked_in_func, + mutex_last_locked_at_line)); + protocol->store(&tmp_string); + if ((ret= protocol->write())) + goto end; + + /* last unlocked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_unlocked_in_func, + mutex_last_unlocked_at_line)); + protocol->store(&tmp_string); + if ((ret= protocol->write())) + goto end; + + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs); + int_string.set((longlong) waiting_on_cond, scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + goto end; + + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler workers count"), scs); + int_string.set((longlong) workers_count(), scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + goto end; + + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler executed events"), scs); + int_string.set((longlong) started_events, scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + goto end; + + /* scheduler_data_locked */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); + int_string.set((longlong) mutex_scheduler_data_locked, scs); + protocol->store(&int_string); + ret= protocol->write(); +end: +#endif + + DBUG_RETURN(ret); +} diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index acd0debe391..bf3e8e63e11 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -16,4 +16,108 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +class Event_queue; +class Event_job_data; + +class Event_scheduler +{ +public: + Event_scheduler(){} + ~Event_scheduler(){} + + enum enum_state + { + INITIALIZED = 0, + RUNNING, + STOPPING + }; + + /* State changing methods follow */ + + bool + start(); + + bool + stop(); + + /* + Need to be public because has to be called from the function + passed to pthread_create. + */ + bool + run(THD *thd); + + bool + init_scheduler(Event_queue *queue); + + void + deinit_scheduler(); + + void + init_mutexes(); + + void + deinit_mutexes(); + + /* Information retrieving methods follow */ + + enum enum_state + get_state(); + + void + queue_changed(); + + bool + dump_internal_status(THD *thd); + +private: + uint + workers_count(); + + /* helper functions */ + bool + execute_top(THD *thd, Event_job_data *job_data); + + /* helper functions for working with mutexes & conditionals */ + void + lock_data(const char *func, uint line); + + void + unlock_data(const char *func, uint line); + + void + cond_wait(struct timespec *abstime, const char *func, uint line); + + pthread_mutex_t LOCK_scheduler_state; + + /* This is the current status of the life-cycle of the scheduler. */ + enum enum_state state; + + /* + Holds the thread id of the executor thread or 0 if the scheduler is not + running. It is used by ::shutdown() to know which thread to kill with + kill_one_thread(). The latter wake ups a thread if it is waiting on a + conditional variable and sets thd->killed to non-zero. + */ + ulong thread_id; + + pthread_cond_t COND_state; + + Event_queue *queue; + + uint mutex_last_locked_at_line; + uint mutex_last_unlocked_at_line; + const char* mutex_last_locked_in_func; + const char* mutex_last_unlocked_in_func; + bool mutex_scheduler_data_locked; + bool waiting_on_cond; + + ulonglong started_events; + +private: + /* Prevent use of these */ + Event_scheduler(const Event_scheduler &); + void operator=(Event_scheduler &); +}; + #endif /* _EVENT_SCHEDULER_H_ */ diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc deleted file mode 100644 index 1f004d0b05e..00000000000 --- a/sql/event_scheduler_ng.cc +++ /dev/null @@ -1,881 +0,0 @@ -/* Copyright (C) 2004-2006 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "mysql_priv.h" -#include "events.h" -#include "event_data_objects.h" -#include "event_scheduler_ng.h" -#include "event_queue.h" - -#ifdef __GNUC__ -#if __GNUC__ >= 2 -#define SCHED_FUNC __FUNCTION__ -#endif -#else -#define SCHED_FUNC "<unknown>" -#endif - -#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) -#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) -#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__) - -extern pthread_attr_t connection_attrib; - -struct scheduler_param -{ - THD *thd; - Event_scheduler_ng *scheduler; -}; - -struct scheduler_param scheduler_param_value; - - - -static -LEX_STRING scheduler_states_names[] = -{ - { C_STRING_WITH_LEN("INITIALIZED")}, - { C_STRING_WITH_LEN("RUNNING")}, - { C_STRING_WITH_LEN("STOPPING")} -}; - - -/* - Prints the stack of infos, warnings, errors from thd to - the console so it can be fetched by the logs-into-tables and - checked later. - - SYNOPSIS - evex_print_warnings - thd Thread used during the execution of the event - et The event itself -*/ - -static void -evex_print_warnings(THD *thd, Event_job_data *et) -{ - MYSQL_ERROR *err; - DBUG_ENTER("evex_print_warnings"); - if (!thd->warn_list.elements) - DBUG_VOID_RETURN; - - char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; - char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; - String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); - prefix.length(0); - prefix.append("SCHEDULER: ["); - - append_identifier(thd, &prefix, et->definer.str, et->definer.length); - prefix.append("][", 2); - append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); - prefix.append('.'); - append_identifier(thd,&prefix, et->name.str, et->name.length); - prefix.append("] ", 2); - - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); - while ((err= it++)) - { - String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); - /* set it to 0 or we start adding at the end. That's the trick ;) */ - err_msg.length(0); - err_msg.append(prefix); - err_msg.append(err->msg, strlen(err->msg), system_charset_info); - err_msg.append("]"); - DBUG_ASSERT(err->level < 3); - (sql_print_message_handlers[err->level])("%*s", err_msg.length(), - err_msg.c_ptr()); - } - DBUG_VOID_RETURN; -} - - -/* - Inits an scheduler thread handler, both the main and a worker - - SYNOPSIS - init_event_thread() - thd - the THD of the thread. Has to be allocated by the caller. - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. - - RETURN VALUE - 0 OK - -1 Error -*/ - -static int -init_scheduler_thread(THD* thd) -{ - DBUG_ENTER("init_event_thread"); - thd->client_capabilities= 0; - thd->security_ctx->master_access= 0; - thd->security_ctx->db_access= 0; - thd->security_ctx->host_or_ip= (char*)my_localhost; - thd->security_ctx->set_user((char*)"event_scheduler"); - my_net_init(&thd->net, NULL); - thd->net.read_timeout= slave_net_timeout; - thd->slave_thread= 0; - thd->options|= OPTION_AUTO_IS_NULL; - thd->client_capabilities|= CLIENT_MULTI_RESULTS; - pthread_mutex_lock(&LOCK_thread_count); - thd->thread_id= thread_id++; - threads.append(thd); - thread_count++; - thread_running++; - pthread_mutex_unlock(&LOCK_thread_count); - - /* - Guarantees that we will see the thread in SHOW PROCESSLIST though its - vio is NULL. - */ - - thd->proc_info= "Initialized"; - thd->version= refresh_version; - thd->set_time(); - - DBUG_RETURN(0); -} - - -/* - Cleans up the THD and the threaded environment of the thread. - - SYNOPSIS - deinit_event_thread() - thd Thread -*/ - -static void -deinit_event_thread(THD *thd) -{ - thd->proc_info= "Clearing"; - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); - DBUG_PRINT("exit", ("Scheduler thread finishing")); - pthread_mutex_lock(&LOCK_thread_count); - thread_count--; - thread_running--; - delete thd; - pthread_mutex_unlock(&LOCK_thread_count); - - my_thread_end(); -} - - -/* - Function that executes the scheduler, - - SYNOPSIS - event_scheduler_ng_thread() - arg Pointer to `struct scheduler_param` - - RETURN VALUE - 0 OK -*/ - -pthread_handler_t -event_scheduler_ng_thread(void *arg) -{ - /* needs to be first for thread_stack */ - THD *thd= (THD *)(*(struct scheduler_param *) arg).thd; - - thd->thread_stack= (char *)&thd; // remember where our stack is - DBUG_ENTER("event_scheduler_ng_thread"); - - my_thread_init(); - pthread_detach_this_thread(); - thd->real_id=pthread_self(); - if (init_thr_lock() || thd->store_globals()) - { - thd->cleanup(); - goto end; - } - -#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) - sigset_t set; - VOID(sigemptyset(&set)); // Get mask in use - VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); -#endif - - ((struct scheduler_param *) arg)->scheduler->run(thd); - -end: - deinit_event_thread(thd); - - DBUG_RETURN(0); // Against gcc warnings -} - - -/* - Function that executes an event in a child thread. Setups the - environment for the event execution and cleans after that. - - SYNOPSIS - event_worker_ng_thread() - arg The Event_job_data object to be processed - - RETURN VALUE - 0 OK -*/ - -pthread_handler_t -event_worker_ng_thread(void *arg) -{ - /* needs to be first for thread_stack */ - THD *thd; - Event_job_data *event= (Event_job_data *)arg; - int ret; - - thd= event->thd; - thd->thread_stack= (char *) &thd; - - - my_thread_init(); - pthread_detach_this_thread(); - thd->real_id=pthread_self(); - if (init_thr_lock() || thd->store_globals()) - { - thd->cleanup(); - goto end; - } - - -#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) - sigset_t set; - VOID(sigemptyset(&set)); // Get mask in use - VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals)); -#endif - thd->init_for_queries(); - - DBUG_ENTER("event_worker_ng_thread"); - DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." - "THD=0x%lx", time(NULL), thd)); - - sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id); - - thd->enable_slow_log= TRUE; - - ret= event->execute(thd); - - evex_print_warnings(thd, event); - - sql_print_information("SCHEDULER: [%s.%s of %s] executed " - " in thread thread %lu. RetCode=%d", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id, ret); - if (ret == EVEX_COMPILE_ERROR) - sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", - event->dbname.str, event->name.str, - event->definer.str); - else if (ret == EVEX_MICROSECOND_UNSUP) - sql_print_information("SCHEDULER: MICROSECOND is not supported"); - -end: - DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, - event->name.str)); - delete event; - - deinit_event_thread(thd); - - DBUG_RETURN(0); // Against gcc warnings -} - - -/* - Performs initialization of the scheduler data, outside of the - threading primitives. - - SYNOPSIS - Event_scheduler_ng::init_scheduler() -*/ - -bool -Event_scheduler_ng::init_scheduler(Event_queue *q) -{ - LOCK_SCHEDULER_DATA(); - thread_id= 0; - state= INITIALIZED; - queue= q; - started_events= 0; - UNLOCK_SCHEDULER_DATA(); - - return FALSE; -} - - -void -Event_scheduler_ng::deinit_scheduler() {} - - -/* - Inits scheduler's threading primitives. - - SYNOPSIS - Event_scheduler_ng::init_mutexes() -*/ - -void -Event_scheduler_ng::init_mutexes() -{ - pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST); - pthread_cond_init(&COND_state, NULL); -} - - -/* - Deinits scheduler's threading primitives. - - SYNOPSIS - Event_scheduler_ng::deinit_mutexes() -*/ - -void -Event_scheduler_ng::deinit_mutexes() -{ - pthread_mutex_destroy(&LOCK_scheduler_state); - pthread_cond_destroy(&COND_state); -} - - -/* - Starts the scheduler (again). Creates a new THD and passes it to - a forked thread. Does not wait for acknowledgement from the new - thread that it has started. Asynchronous starting. Most of the - needed initializations are done in the current thread to minimize - the chance of failure in the spawned thread. - - SYNOPSIS - Event_scheduler_ng::start() - - RETURN VALUE - FALSE OK - TRUE Error (not reported) -*/ - -bool -Event_scheduler_ng::start() -{ - THD *new_thd= NULL; - bool ret= FALSE; - pthread_t th; - DBUG_ENTER("Event_scheduler_ng::start"); - - LOCK_SCHEDULER_DATA(); - DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); - if (state > INITIALIZED) - goto end; - - if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) - { - sql_print_error("SCHEDULER: Cannot init manager event thread."); - ret= TRUE; - goto end; - } - new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER; - new_thd->command= COM_DAEMON; - - scheduler_param_value.thd= new_thd; - scheduler_param_value.scheduler= this; - - DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); - if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread, - (void*)&scheduler_param_value)) - { - DBUG_PRINT("error", ("cannot create a new thread")); - state= INITIALIZED; - ret= TRUE; - } - DBUG_PRINT("info", ("Setting state go RUNNING")); - state= RUNNING; -end: - UNLOCK_SCHEDULER_DATA(); - - if (ret && new_thd) - { - DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); - new_thd->proc_info= "Clearing"; - DBUG_ASSERT(new_thd->net.buff != 0); - net_end(&new_thd->net); - pthread_mutex_lock(&LOCK_thread_count); - thread_count--; - thread_running--; - delete new_thd; - pthread_mutex_unlock(&LOCK_thread_count); - } - DBUG_RETURN(ret); -} - - -/* - Stops the scheduler (again). Waits for acknowledgement from the - scheduler that it has stopped - synchronous stopping. - - SYNOPSIS - Event_scheduler_ng::stop() - - RETURN VALUE - FALSE OK - TRUE Error (not reported) -*/ - -bool -Event_scheduler_ng::stop() -{ - THD *thd= current_thd; - DBUG_ENTER("Event_scheduler_ng::stop"); - DBUG_PRINT("enter", ("thd=0x%lx", current_thd)); - - LOCK_SCHEDULER_DATA(); - DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); - if (state != RUNNING) - goto end; - - state= STOPPING; - - DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); - sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); - - pthread_cond_signal(&COND_state); - - /* Guarantee we don't catch spurious signals */ - sql_print_information("SCHEDULER: Waiting the manager thread to reply"); - do { - DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " - "thread. Current value of state is %s . " - "workers count=%d", scheduler_states_names[state].str, - workers_count())); - /* thd could be 0x0, when shutting down */ - COND_STATE_WAIT(NULL); - } while (state == STOPPING); - DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); - - thread_id= 0; -end: - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(FALSE); -} - - -/* - The main loop of the scheduler. - - SYNOPSIS - Event_scheduler_ng::run() - thd Thread - - RETURN VALUE - FALSE OK - TRUE Error (Serious error) -*/ - -bool -Event_scheduler_ng::run(THD *thd) -{ - int res; - struct timespec abstime; - Event_job_data *job_data; - DBUG_ENTER("Event_scheduler_ng::run"); - - LOCK_SCHEDULER_DATA(); - - thread_id= thd->thread_id; - sql_print_information("SCHEDULER: Manager thread started with id %lu", - thread_id); - /* - Recalculate the values in the queue because there could have been stops - in executions of the scheduler and some times could have passed by. - */ - queue->recalculate_activation_times(thd); - while (state == RUNNING) - { - thd->end_time(); - /* Gets a minimized version */ - if (queue->get_top_for_execution_if_time(thd, thd->query_start(), - &job_data, &abstime)) - { - sql_print_information("SCHEDULER: Serious error during getting next" - " event to execute. Stopping."); - break; - } - - DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " - "abs_time.tv_sec=%d", - job_data, thd->query_start(), abstime.tv_sec)); - if (!job_data && !abstime.tv_sec) - { - DBUG_PRINT("info", ("The queue is empty. Going to sleep")); - thd->enter_cond(&COND_state, &LOCK_scheduler_state, - "Waiting on empty queue"); - COND_STATE_WAIT(NULL); - thd->exit_cond(""); - DBUG_PRINT("info", ("Woke up. Got COND_state")); - LOCK_SCHEDULER_DATA(); - } - else if (abstime.tv_sec) - { - DBUG_PRINT("info", ("Have to sleep some time %u till", - abstime.tv_sec - thd->query_start(), abstime.tv_sec)); - - thd->enter_cond(&COND_state, &LOCK_scheduler_state, - "Waiting for next activation"); - COND_STATE_WAIT(&abstime); - /* - If we get signal we should recalculate the whether it's the right time - because there could be : - 1. Spurious wake-up - 2. The top of the queue was changed (new one becase of create/update) - */ - /* This will do implicit UNLOCK_SCHEDULER_DATA() */ - thd->exit_cond(""); - DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); - LOCK_SCHEDULER_DATA(); - } - else - { - UNLOCK_SCHEDULER_DATA(); - res= execute_top(thd, job_data); - LOCK_SCHEDULER_DATA(); - if (res) - break; - ++started_events; - } - DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); - } - DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); - pthread_cond_signal(&COND_state); -error: - state= INITIALIZED; - UNLOCK_SCHEDULER_DATA(); - sql_print_information("SCHEDULER: Stopped"); - - DBUG_RETURN(res); -} - - -/* - Creates a new THD instance and then forks a new thread, while passing - the THD pointer and job_data to it. - - SYNOPSIS - Event_scheduler_ng::execute_top() - - RETURN VALUE - FALSE OK - TRUE Error (Serious error) -*/ - -bool -Event_scheduler_ng::execute_top(THD *thd, Event_job_data *job_data) -{ - THD *new_thd; - pthread_t th; - int res= 0; - DBUG_ENTER("Event_scheduler_ng::execute_top"); - if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) - goto error; - - new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; - job_data->thd= new_thd; - DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition", - job_data->dbname.str, job_data->name.str)); - - /* Major failure */ - if ((res= pthread_create(&th, &connection_attrib, event_worker_ng_thread, - job_data))) - goto error; - - DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); - DBUG_RETURN(FALSE); - -error: - DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res)); - if (new_thd) - { - new_thd->proc_info= "Clearing"; - DBUG_ASSERT(new_thd->net.buff != 0); - net_end(&new_thd->net); - pthread_mutex_lock(&LOCK_thread_count); - thread_count--; - thread_running--; - delete new_thd; - pthread_mutex_unlock(&LOCK_thread_count); - } - delete job_data; - DBUG_RETURN(TRUE); -} - - -/* - Returns the current state of the scheduler - - SYNOPSIS - Event_scheduler_ng::get_state() - - RETURN VALUE - The state of the scheduler (INITIALIZED | RUNNING | STOPPING) -*/ - -enum Event_scheduler_ng::enum_state -Event_scheduler_ng::get_state() -{ - enum Event_scheduler_ng::enum_state ret; - LOCK_SCHEDULER_DATA(); - ret= state; - UNLOCK_SCHEDULER_DATA(); - return ret; -} - - -/* - Returns the number of living event worker threads. - - SYNOPSIS - Event_scheduler_ng::workers_count() -*/ - -uint -Event_scheduler_ng::workers_count() -{ - THD *tmp; - uint count= 0; - - DBUG_ENTER("Event_scheduler_ng::workers_count"); - pthread_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; - if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) - ++count; - } - pthread_mutex_unlock(&LOCK_thread_count); - DBUG_PRINT("exit", ("%d", count)); - DBUG_RETURN(count); -} - - -/* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_scheduler_ng::queue_changed() -*/ - -void -Event_scheduler_ng::queue_changed() -{ - DBUG_ENTER("Event_scheduler_ng::queue_changed"); - DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", - scheduler_states_names[state].str)); - pthread_cond_signal(&COND_state); - DBUG_VOID_RETURN; -} - - -/* - Auxiliary function for locking LOCK_scheduler_state. Used - by the LOCK_SCHEDULER_DATA macro. - - SYNOPSIS - Event_scheduler_ng::lock_data() - func Which function is requesting mutex lock - line On which line mutex lock is requested -*/ - -void -Event_scheduler_ng::lock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler_ng::lock_data"); - DBUG_PRINT("enter", ("func=%s line=%u", func, line)); - pthread_mutex_lock(&LOCK_scheduler_state); - mutex_last_locked_in_func= func; - mutex_last_locked_at_line= line; - mutex_scheduler_data_locked= TRUE; - DBUG_VOID_RETURN; -} - - -/* - Auxiliary function for unlocking LOCK_scheduler_state. Used - by the UNLOCK_SCHEDULER_DATA macro. - - SYNOPSIS - Event_scheduler_ng::unlock_data() - func Which function is requesting mutex unlock - line On which line mutex unlock is requested -*/ - -void -Event_scheduler_ng::unlock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler_ng::unlock_data"); - DBUG_PRINT("enter", ("func=%s line=%u", func, line)); - mutex_last_unlocked_at_line= line; - mutex_scheduler_data_locked= FALSE; - mutex_last_unlocked_in_func= func; - pthread_mutex_unlock(&LOCK_scheduler_state); - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_cond_wait/timedwait - - SYNOPSIS - Event_scheduler_ng::cond_wait() - cond Conditional to wait for - mutex Mutex of the conditional - - RETURN VALUE - Error code of pthread_cond_wait() -*/ - -void -Event_scheduler_ng::cond_wait(struct timespec *abstime, - const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler_ng::cond_wait"); - waiting_on_cond= TRUE; - mutex_last_unlocked_at_line= line; - mutex_scheduler_data_locked= FALSE; - mutex_last_unlocked_in_func= func; - - if (abstime) - pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime); - else - pthread_cond_wait(&COND_state, &LOCK_scheduler_state); - - mutex_last_locked_in_func= func; - mutex_last_locked_at_line= line; - mutex_scheduler_data_locked= TRUE; - waiting_on_cond= FALSE; - - DBUG_VOID_RETURN; -} - - -/* - Dumps the internal status of the scheduler - - SYNOPSIS - Event_scheduler_ng::dump_internal_status() - thd Thread - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -bool -Event_scheduler_ng::dump_internal_status(THD *thd) -{ - int ret= 0; - DBUG_ENTER("Event_scheduler_ng::dump_internal_status"); - -#ifndef DBUG_OFF - CHARSET_INFO *scs= system_charset_info; - Protocol *protocol= thd->protocol; - char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; - char int_buff[STRING_BUFFER_USUAL_SIZE]; - String tmp_string(tmp_buff, sizeof(tmp_buff), scs); - String int_string(int_buff, sizeof(int_buff), scs); - tmp_string.length(0); - int_string.length(0); - - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler state"), scs); - protocol->store(scheduler_states_names[state].str, - scheduler_states_names[state].length, scs); - - if ((ret= protocol->write())) - goto end; - - /* thread_id */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("thread_id"), scs); - if (thread_id) - { - int_string.set((longlong) thread_id, scs); - protocol->store(&int_string); - } - else - protocol->store_null(); - if ((ret= protocol->write())) - goto end; - - /* last locked at*/ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - mutex_last_locked_in_func, - mutex_last_locked_at_line)); - protocol->store(&tmp_string); - if ((ret= protocol->write())) - goto end; - - /* last unlocked at*/ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - mutex_last_unlocked_in_func, - mutex_last_unlocked_at_line)); - protocol->store(&tmp_string); - if ((ret= protocol->write())) - goto end; - - /* waiting on */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs); - int_string.set((longlong) waiting_on_cond, scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; - - /* workers_count */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler workers count"), scs); - int_string.set((longlong) workers_count(), scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; - - /* workers_count */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler executed events"), scs); - int_string.set((longlong) started_events, scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; - - /* scheduler_data_locked */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); - int_string.set((longlong) mutex_scheduler_data_locked, scs); - protocol->store(&int_string); - ret= protocol->write(); -end: -#endif - - DBUG_RETURN(ret); -} diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h deleted file mode 100644 index e4f3f0588f9..00000000000 --- a/sql/event_scheduler_ng.h +++ /dev/null @@ -1,123 +0,0 @@ -#ifndef _EVENT_SCHEDULER_NG_H_ -#define _EVENT_SCHEDULER_NG_H_ -/* Copyright (C) 2004-2006 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -class Event_queue; -class Event_job_data; - -class Event_scheduler_ng -{ -public: - Event_scheduler_ng(){} - ~Event_scheduler_ng(){} - - enum enum_state - { - INITIALIZED = 0, - RUNNING, - STOPPING - }; - - /* State changing methods follow */ - - bool - start(); - - bool - stop(); - - /* - Need to be public because has to be called from the function - passed to pthread_create. - */ - bool - run(THD *thd); - - bool - init_scheduler(Event_queue *queue); - - void - deinit_scheduler(); - - void - init_mutexes(); - - void - deinit_mutexes(); - - /* Information retrieving methods follow */ - - enum enum_state - get_state(); - - void - queue_changed(); - - bool - dump_internal_status(THD *thd); - -private: - uint - workers_count(); - - /* helper functions */ - bool - execute_top(THD *thd, Event_job_data *job_data); - - /* helper functions for working with mutexes & conditionals */ - void - lock_data(const char *func, uint line); - - void - unlock_data(const char *func, uint line); - - void - cond_wait(struct timespec *abstime, const char *func, uint line); - - pthread_mutex_t LOCK_scheduler_state; - - /* This is the current status of the life-cycle of the scheduler. */ - enum enum_state state; - - /* - Holds the thread id of the executor thread or 0 if the scheduler is not - running. It is used by ::shutdown() to know which thread to kill with - kill_one_thread(). The latter wake ups a thread if it is waiting on a - conditional variable and sets thd->killed to non-zero. - */ - ulong thread_id; - - pthread_cond_t COND_state; - - Event_queue *queue; - - uint mutex_last_locked_at_line; - uint mutex_last_unlocked_at_line; - const char* mutex_last_locked_in_func; - const char* mutex_last_unlocked_in_func; - bool mutex_scheduler_data_locked; - bool waiting_on_cond; - - ulonglong started_events; - -private: - /* Prevent use of these */ - Event_scheduler_ng(const Event_scheduler_ng &); - void operator=(Event_scheduler_ng &); -}; - -#endif /* _EVENT_SCHEDULER_NG_H_ */ diff --git a/sql/events.cc b/sql/events.cc index cfb7a067d63..2a4ccbaf5ef 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -19,7 +19,7 @@ #include "event_data_objects.h" #include "event_db_repository.h" #include "event_queue.h" -#include "event_scheduler_ng.h" +#include "event_scheduler.h" #include "sp_head.h" /* @@ -560,15 +560,15 @@ int Events::init() { DBUG_ENTER("Events::init"); - event_queue->init_queue(db_repository, scheduler_ng); - scheduler_ng->init_scheduler(event_queue); + event_queue->init_queue(db_repository, scheduler); + scheduler->init_scheduler(event_queue); /* it should be an assignment! */ if (opt_event_scheduler) { DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2); if (opt_event_scheduler == 1) - DBUG_RETURN(scheduler_ng->start()); + DBUG_RETURN(scheduler->start()); } DBUG_RETURN(0); @@ -590,8 +590,8 @@ Events::deinit() { DBUG_ENTER("Events::deinit"); - scheduler_ng->stop(); - scheduler_ng->deinit_scheduler(); + scheduler->stop(); + scheduler->deinit_scheduler(); event_queue->deinit_queue(); @@ -617,8 +617,8 @@ Events::init_mutexes() event_queue= new Event_queue; event_queue->init_mutexes(); - scheduler_ng= new Event_scheduler_ng(); - scheduler_ng->init_mutexes(); + scheduler= new Event_scheduler(); + scheduler->init_mutexes(); } @@ -633,9 +633,9 @@ void Events::destroy_mutexes() { event_queue->deinit_mutexes(); - scheduler_ng->deinit_mutexes(); + scheduler->deinit_mutexes(); - delete scheduler_ng; + delete scheduler; delete db_repository; pthread_mutex_destroy(&LOCK_event_metadata); @@ -670,7 +670,7 @@ Events::dump_internal_status(THD *thd) Protocol::SEND_EOF)) DBUG_RETURN(TRUE); - if (scheduler_ng->dump_internal_status(thd) || + if (scheduler->dump_internal_status(thd) || event_queue->dump_internal_status(thd)) DBUG_RETURN(TRUE); @@ -694,7 +694,7 @@ bool Events::start_execution_of_events() { DBUG_ENTER("Events::start_execution_of_events"); - DBUG_RETURN(scheduler_ng->start()); + DBUG_RETURN(scheduler->start()); } @@ -715,7 +715,7 @@ bool Events::stop_execution_of_events() { DBUG_ENTER("Events::stop_execution_of_events"); - DBUG_RETURN(scheduler_ng->stop()); + DBUG_RETURN(scheduler->stop()); } @@ -734,5 +734,5 @@ bool Events::is_started() { DBUG_ENTER("Events::is_started"); - DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING); + DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING); } diff --git a/sql/events.h b/sql/events.h index 58a1081caa9..5f46c7bb7c5 100644 --- a/sql/events.h +++ b/sql/events.h @@ -21,7 +21,7 @@ class Event_parse_data; class Event_db_repository; class Event_queue; class Event_queue_element; -class Event_scheduler_ng; +class Event_scheduler; /* Return codes */ enum enum_events_error_code @@ -117,7 +117,7 @@ private: static Events singleton; Event_queue *event_queue; - Event_scheduler_ng *scheduler_ng; + Event_scheduler *scheduler; Event_db_repository *db_repository; pthread_mutex_t LOCK_event_metadata; |