summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-12 10:37:30 +0200
committerunknown <andrey@lmy004.>2006-07-12 10:37:30 +0200
commit628be8a71611bc86f7f0cf809b27d63bdd9b12c8 (patch)
tree1efc1513a721e18e2fa1fe76b14936f2d9e2d603 /sql
parent42a8e2c9421854710679a0f6c3ceef6c0777ded4 (diff)
downloadmariadb-git-628be8a71611bc86f7f0cf809b27d63bdd9b12c8.tar.gz
WL#3337 (Event scheduler new architecture)
event_scheduler_ng.cc/h is no more BitKeeper/deleted/.del-event_scheduler_ng.cc~8896b89040dbc4f6: Delete: sql/event_scheduler_ng.cc BitKeeper/deleted/.del-event_scheduler_ng.h~1431af5b185376f: Delete: sql/event_scheduler_ng.h mysql-test/r/not_embedded_server.result: fix test sql/Makefile.am: event_scheduler_ng.cc/h is no more sql/event_queue.cc: event_scheduler_ng.cc/h is no more sql/event_queue.h: event_scheduler_ng.cc/h is no more sql/event_scheduler.cc: event_scheduler_ng.cc/h is no more sql/event_scheduler.h: event_scheduler_ng.cc/h is no more sql/events.cc: event_scheduler_ng.cc/h is no more sql/events.h: event_scheduler_ng.cc/h is no more
Diffstat (limited to 'sql')
-rw-r--r--sql/Makefile.am4
-rw-r--r--sql/event_queue.cc4
-rw-r--r--sql/event_queue.h6
-rw-r--r--sql/event_scheduler.cc865
-rw-r--r--sql/event_scheduler.h104
-rw-r--r--sql/event_scheduler_ng.cc881
-rw-r--r--sql/event_scheduler_ng.h123
-rw-r--r--sql/events.cc28
-rw-r--r--sql/events.h4
9 files changed, 992 insertions, 1027 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am
index 4d76cdb5080..3518903d149 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_array.h sql_cursor.h events.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \
event_queue.h event_db_repository.h \
- partition_info.h partition_element.h event_scheduler_ng.h \
+ partition_info.h partition_element.h event_scheduler.h \
contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -105,7 +105,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\
- event_scheduler_ng.cc events.cc event_data_objects.cc \
+ events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 63dee303fc2..12eceee8cfb 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -18,7 +18,7 @@
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
-#include "event_scheduler_ng.h"
+#include "event_scheduler.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
@@ -123,7 +123,7 @@ Event_queue::deinit_mutexes()
*/
bool
-Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
+Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{
int i= 0;
bool ret= FALSE;
diff --git a/sql/event_queue.h b/sql/event_queue.h
index 15a28b4103d..b7962d14a89 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -22,7 +22,7 @@ class Event_job_data;
class Event_queue_element;
class THD;
-class Event_scheduler_ng;
+class Event_scheduler;
class Event_queue
{
@@ -36,7 +36,7 @@ public:
deinit_mutexes();
bool
- init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched);
+ init_queue(Event_db_repository *db_repo, Event_scheduler *sched);
void
deinit_queue();
@@ -109,7 +109,7 @@ protected:
void
dbug_dump_queue(time_t now);
- Event_scheduler_ng *scheduler;
+ Event_scheduler *scheduler;
/* The sorted queue with the Event_job_data objects */
QUEUE queue;
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index f1c7d8394e3..85d357ec11e 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -14,3 +14,868 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "mysql_priv.h"
+#include "events.h"
+#include "event_data_objects.h"
+#include "event_scheduler.h"
+#include "event_queue.h"
+
+#ifdef __GNUC__
+#if __GNUC__ >= 2
+#define SCHED_FUNC __FUNCTION__
+#endif
+#else
+#define SCHED_FUNC "<unknown>"
+#endif
+
+#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
+#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
+#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
+
+extern pthread_attr_t connection_attrib;
+
+struct scheduler_param
+{
+ THD *thd;
+ Event_scheduler *scheduler;
+};
+
+struct scheduler_param scheduler_param_value;
+
+
+
+static
+LEX_STRING scheduler_states_names[] =
+{
+ { C_STRING_WITH_LEN("INITIALIZED")},
+ { C_STRING_WITH_LEN("RUNNING")},
+ { C_STRING_WITH_LEN("STOPPING")}
+};
+
+
+/*
+ Prints the stack of infos, warnings, errors from thd to
+ the console so it can be fetched by the logs-into-tables and
+ checked later.
+
+ SYNOPSIS
+ evex_print_warnings
+ thd Thread used during the execution of the event
+ et The event itself
+*/
+
+static void
+evex_print_warnings(THD *thd, Event_job_data *et)
+{
+ MYSQL_ERROR *err;
+ DBUG_ENTER("evex_print_warnings");
+ if (!thd->warn_list.elements)
+ DBUG_VOID_RETURN;
+
+ char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
+ char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
+ String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
+ prefix.length(0);
+ prefix.append("SCHEDULER: [");
+
+ append_identifier(thd, &prefix, et->definer.str, et->definer.length);
+ prefix.append("][", 2);
+ append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
+ prefix.append('.');
+ append_identifier(thd,&prefix, et->name.str, et->name.length);
+ prefix.append("] ", 2);
+
+ List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ while ((err= it++))
+ {
+ String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
+ /* set it to 0 or we start adding at the end. That's the trick ;) */
+ err_msg.length(0);
+ err_msg.append(prefix);
+ err_msg.append(err->msg, strlen(err->msg), system_charset_info);
+ err_msg.append("]");
+ DBUG_ASSERT(err->level < 3);
+ (sql_print_message_handlers[err->level])("%*s", err_msg.length(),
+ err_msg.c_ptr());
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Inits an scheduler thread handler, both the main and a worker
+
+ SYNOPSIS
+ init_event_thread()
+ thd - the THD of the thread. Has to be allocated by the caller.
+
+ NOTES
+ 1. The host of the thead is my_localhost
+ 2. thd->net is initted with NULL - no communication.
+
+ RETURN VALUE
+ 0 OK
+ -1 Error
+*/
+
+static int
+init_scheduler_thread(THD* thd)
+{
+ DBUG_ENTER("init_event_thread");
+ thd->client_capabilities= 0;
+ thd->security_ctx->master_access= 0;
+ thd->security_ctx->db_access= 0;
+ thd->security_ctx->host_or_ip= (char*)my_localhost;
+ thd->security_ctx->set_user((char*)"event_scheduler");
+ my_net_init(&thd->net, NULL);
+ thd->net.read_timeout= slave_net_timeout;
+ thd->slave_thread= 0;
+ thd->options|= OPTION_AUTO_IS_NULL;
+ thd->client_capabilities|= CLIENT_MULTI_RESULTS;
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id= thread_id++;
+ threads.append(thd);
+ thread_count++;
+ thread_running++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ /*
+ Guarantees that we will see the thread in SHOW PROCESSLIST though its
+ vio is NULL.
+ */
+
+ thd->proc_info= "Initialized";
+ thd->version= refresh_version;
+ thd->set_time();
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Cleans up the THD and the threaded environment of the thread.
+
+ SYNOPSIS
+ deinit_event_thread()
+ thd Thread
+*/
+
+static void
+deinit_event_thread(THD *thd)
+{
+ thd->proc_info= "Clearing";
+ DBUG_ASSERT(thd->net.buff != 0);
+ net_end(&thd->net);
+ DBUG_PRINT("exit", ("Scheduler thread finishing"));
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ my_thread_end();
+}
+
+
+/*
+ Function that executes the scheduler,
+
+ SYNOPSIS
+ event_scheduler_thread()
+ arg Pointer to `struct scheduler_param`
+
+ RETURN VALUE
+ 0 OK
+*/
+
+pthread_handler_t
+event_scheduler_thread(void *arg)
+{
+ /* needs to be first for thread_stack */
+ THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
+
+ thd->thread_stack= (char *)&thd; // remember where our stack is
+ DBUG_ENTER("event_scheduler_thread");
+
+ my_thread_init();
+ pthread_detach_this_thread();
+ thd->real_id=pthread_self();
+ if (init_thr_lock() || thd->store_globals())
+ {
+ thd->cleanup();
+ goto end;
+ }
+
+#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
+#endif
+
+ ((struct scheduler_param *) arg)->scheduler->run(thd);
+
+end:
+ deinit_event_thread(thd);
+
+ DBUG_RETURN(0); // Against gcc warnings
+}
+
+
+/*
+ Function that executes an event in a child thread. Setups the
+ environment for the event execution and cleans after that.
+
+ SYNOPSIS
+ event_worker_thread()
+ arg The Event_job_data object to be processed
+
+ RETURN VALUE
+ 0 OK
+*/
+
+pthread_handler_t
+event_worker_thread(void *arg)
+{
+ /* needs to be first for thread_stack */
+ THD *thd;
+ Event_job_data *event= (Event_job_data *)arg;
+ int ret;
+
+ thd= event->thd;
+ thd->thread_stack= (char *) &thd;
+
+
+ my_thread_init();
+ pthread_detach_this_thread();
+ thd->real_id=pthread_self();
+ if (init_thr_lock() || thd->store_globals())
+ {
+ thd->cleanup();
+ goto end;
+ }
+
+
+#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
+#endif
+ thd->init_for_queries();
+
+ DBUG_ENTER("event_worker_thread");
+ DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
+ "THD=0x%lx", time(NULL), thd));
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
+ event->dbname.str, event->name.str,
+ event->definer.str, thd->thread_id);
+
+ thd->enable_slow_log= TRUE;
+
+ ret= event->execute(thd);
+
+ evex_print_warnings(thd, event);
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed "
+ " in thread thread %lu. RetCode=%d",
+ event->dbname.str, event->name.str,
+ event->definer.str, thd->thread_id, ret);
+ if (ret == EVEX_COMPILE_ERROR)
+ sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ event->dbname.str, event->name.str,
+ event->definer.str);
+ else if (ret == EVEX_MICROSECOND_UNSUP)
+ sql_print_information("SCHEDULER: MICROSECOND is not supported");
+
+end:
+ DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
+ event->name.str));
+ delete event;
+
+ deinit_event_thread(thd);
+
+ DBUG_RETURN(0); // Against gcc warnings
+}
+
+
+/*
+ Performs initialization of the scheduler data, outside of the
+ threading primitives.
+
+ SYNOPSIS
+ Event_scheduler::init_scheduler()
+*/
+
+bool
+Event_scheduler::init_scheduler(Event_queue *q)
+{
+ LOCK_SCHEDULER_DATA();
+ thread_id= 0;
+ state= INITIALIZED;
+ queue= q;
+ started_events= 0;
+ UNLOCK_SCHEDULER_DATA();
+
+ return FALSE;
+}
+
+
+void
+Event_scheduler::deinit_scheduler() {}
+
+
+/*
+ Inits scheduler's threading primitives.
+
+ SYNOPSIS
+ Event_scheduler::init_mutexes()
+*/
+
+void
+Event_scheduler::init_mutexes()
+{
+ pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_state, NULL);
+}
+
+
+/*
+ Deinits scheduler's threading primitives.
+
+ SYNOPSIS
+ Event_scheduler::deinit_mutexes()
+*/
+
+void
+Event_scheduler::deinit_mutexes()
+{
+ pthread_mutex_destroy(&LOCK_scheduler_state);
+ pthread_cond_destroy(&COND_state);
+}
+
+
+/*
+ Starts the scheduler (again). Creates a new THD and passes it to
+ a forked thread. Does not wait for acknowledgement from the new
+ thread that it has started. Asynchronous starting. Most of the
+ needed initializations are done in the current thread to minimize
+ the chance of failure in the spawned thread.
+
+ SYNOPSIS
+ Event_scheduler::start()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (not reported)
+*/
+
+bool
+Event_scheduler::start()
+{
+ THD *new_thd= NULL;
+ bool ret= FALSE;
+ pthread_t th;
+ DBUG_ENTER("Event_scheduler::start");
+
+ LOCK_SCHEDULER_DATA();
+ DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
+ if (state > INITIALIZED)
+ goto end;
+
+ if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
+ {
+ sql_print_error("SCHEDULER: Cannot init manager event thread.");
+ ret= TRUE;
+ goto end;
+ }
+ new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
+ new_thd->command= COM_DAEMON;
+
+ scheduler_param_value.thd= new_thd;
+ scheduler_param_value.scheduler= this;
+
+ DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
+ if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
+ (void*)&scheduler_param_value))
+ {
+ DBUG_PRINT("error", ("cannot create a new thread"));
+ state= INITIALIZED;
+ ret= TRUE;
+ }
+ DBUG_PRINT("info", ("Setting state go RUNNING"));
+ state= RUNNING;
+end:
+ UNLOCK_SCHEDULER_DATA();
+
+ if (ret && new_thd)
+ {
+ DBUG_PRINT("info", ("There was an error during THD creation. Clean up"));
+ new_thd->proc_info= "Clearing";
+ DBUG_ASSERT(new_thd->net.buff != 0);
+ net_end(&new_thd->net);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete new_thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ }
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Stops the scheduler (again). Waits for acknowledgement from the
+ scheduler that it has stopped - synchronous stopping.
+
+ SYNOPSIS
+ Event_scheduler::stop()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (not reported)
+*/
+
+bool
+Event_scheduler::stop()
+{
+ THD *thd= current_thd;
+ DBUG_ENTER("Event_scheduler::stop");
+ DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
+
+ LOCK_SCHEDULER_DATA();
+ DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
+ if (state != RUNNING)
+ goto end;
+
+ state= STOPPING;
+
+ DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
+ sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
+
+ pthread_cond_signal(&COND_state);
+
+ /* Guarantee we don't catch spurious signals */
+ sql_print_information("SCHEDULER: Waiting the manager thread to reply");
+ do {
+ DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
+ "thread. Current value of state is %s . "
+ "workers count=%d", scheduler_states_names[state].str,
+ workers_count()));
+ /* thd could be 0x0, when shutting down */
+ COND_STATE_WAIT(NULL);
+ } while (state == STOPPING);
+ DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
+
+ thread_id= 0;
+end:
+ UNLOCK_SCHEDULER_DATA();
+ DBUG_RETURN(FALSE);
+}
+
+
+/*
+ The main loop of the scheduler.
+
+ SYNOPSIS
+ Event_scheduler::run()
+ thd Thread
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (Serious error)
+*/
+
+bool
+Event_scheduler::run(THD *thd)
+{
+ int res;
+ struct timespec abstime;
+ Event_job_data *job_data;
+ DBUG_ENTER("Event_scheduler::run");
+
+ LOCK_SCHEDULER_DATA();
+
+ thread_id= thd->thread_id;
+ sql_print_information("SCHEDULER: Manager thread started with id %lu",
+ thread_id);
+ /*
+ Recalculate the values in the queue because there could have been stops
+ in executions of the scheduler and some times could have passed by.
+ */
+ queue->recalculate_activation_times(thd);
+ while (state == RUNNING)
+ {
+ thd->end_time();
+ /* Gets a minimized version */
+ if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
+ &job_data, &abstime))
+ {
+ sql_print_information("SCHEDULER: Serious error during getting next"
+ " event to execute. Stopping.");
+ break;
+ }
+
+ DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
+ "abs_time.tv_sec=%d",
+ job_data, thd->query_start(), abstime.tv_sec));
+ if (!job_data && !abstime.tv_sec)
+ {
+ DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
+ thd->enter_cond(&COND_state, &LOCK_scheduler_state,
+ "Waiting on empty queue");
+ COND_STATE_WAIT(NULL);
+ thd->exit_cond("");
+ DBUG_PRINT("info", ("Woke up. Got COND_state"));
+ LOCK_SCHEDULER_DATA();
+ }
+ else if (abstime.tv_sec)
+ {
+ DBUG_PRINT("info", ("Have to sleep some time %u till",
+ abstime.tv_sec - thd->query_start(), abstime.tv_sec));
+
+ thd->enter_cond(&COND_state, &LOCK_scheduler_state,
+ "Waiting for next activation");
+ COND_STATE_WAIT(&abstime);
+ /*
+ If we get signal we should recalculate the whether it's the right time
+ because there could be :
+ 1. Spurious wake-up
+ 2. The top of the queue was changed (new one becase of create/update)
+ */
+ /* This will do implicit UNLOCK_SCHEDULER_DATA() */
+ thd->exit_cond("");
+ DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
+ LOCK_SCHEDULER_DATA();
+ }
+ else
+ {
+ UNLOCK_SCHEDULER_DATA();
+ res= execute_top(thd, job_data);
+ LOCK_SCHEDULER_DATA();
+ if (res)
+ break;
+ ++started_events;
+ }
+ DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
+ }
+ DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
+ pthread_cond_signal(&COND_state);
+error:
+ state= INITIALIZED;
+ UNLOCK_SCHEDULER_DATA();
+ sql_print_information("SCHEDULER: Stopped");
+
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Creates a new THD instance and then forks a new thread, while passing
+ the THD pointer and job_data to it.
+
+ SYNOPSIS
+ Event_scheduler::execute_top()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (Serious error)
+*/
+
+bool
+Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
+{
+ THD *new_thd;
+ pthread_t th;
+ int res= 0;
+ DBUG_ENTER("Event_scheduler::execute_top");
+ if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
+ goto error;
+
+ new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
+ job_data->thd= new_thd;
+ DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
+ job_data->dbname.str, job_data->name.str));
+
+ /* Major failure */
+ if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
+ job_data)))
+ goto error;
+
+ DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd));
+ DBUG_RETURN(FALSE);
+
+error:
+ DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res));
+ if (new_thd)
+ {
+ new_thd->proc_info= "Clearing";
+ DBUG_ASSERT(new_thd->net.buff != 0);
+ net_end(&new_thd->net);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete new_thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ }
+ delete job_data;
+ DBUG_RETURN(TRUE);
+}
+
+
+/*
+ Returns the current state of the scheduler
+
+ SYNOPSIS
+ Event_scheduler::get_state()
+
+ RETURN VALUE
+ The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
+*/
+
+enum Event_scheduler::enum_state
+Event_scheduler::get_state()
+{
+ enum Event_scheduler::enum_state ret;
+ LOCK_SCHEDULER_DATA();
+ ret= state;
+ UNLOCK_SCHEDULER_DATA();
+ return ret;
+}
+
+
+/*
+ Returns the number of living event worker threads.
+
+ SYNOPSIS
+ Event_scheduler::workers_count()
+*/
+
+uint
+Event_scheduler::workers_count()
+{
+ THD *tmp;
+ uint count= 0;
+
+ DBUG_ENTER("Event_scheduler::workers_count");
+ pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (tmp->command == COM_DAEMON)
+ continue;
+ if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
+ ++count;
+ }
+ pthread_mutex_unlock(&LOCK_thread_count);
+ DBUG_PRINT("exit", ("%d", count));
+ DBUG_RETURN(count);
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_scheduler::queue_changed()
+*/
+
+void
+Event_scheduler::queue_changed()
+{
+ DBUG_ENTER("Event_scheduler::queue_changed");
+ DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ",
+ scheduler_states_names[state].str));
+ pthread_cond_signal(&COND_state);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Auxiliary function for locking LOCK_scheduler_state. Used
+ by the LOCK_SCHEDULER_DATA macro.
+
+ SYNOPSIS
+ Event_scheduler::lock_data()
+ func Which function is requesting mutex lock
+ line On which line mutex lock is requested
+*/
+
+void
+Event_scheduler::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::lock_data");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
+ pthread_mutex_lock(&LOCK_scheduler_state);
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_scheduler_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Auxiliary function for unlocking LOCK_scheduler_state. Used
+ by the UNLOCK_SCHEDULER_DATA macro.
+
+ SYNOPSIS
+ Event_scheduler::unlock_data()
+ func Which function is requesting mutex unlock
+ line On which line mutex unlock is requested
+*/
+
+void
+Event_scheduler::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::unlock_data");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
+ mutex_last_unlocked_at_line= line;
+ mutex_scheduler_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+ pthread_mutex_unlock(&LOCK_scheduler_state);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_cond_wait/timedwait
+
+ SYNOPSIS
+ Event_scheduler::cond_wait()
+ cond Conditional to wait for
+ mutex Mutex of the conditional
+
+ RETURN VALUE
+ Error code of pthread_cond_wait()
+*/
+
+void
+Event_scheduler::cond_wait(struct timespec *abstime,
+ const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::cond_wait");
+ waiting_on_cond= TRUE;
+ mutex_last_unlocked_at_line= line;
+ mutex_scheduler_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+
+ if (abstime)
+ pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
+ else
+ pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
+
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_scheduler_data_locked= TRUE;
+ waiting_on_cond= FALSE;
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Dumps the internal status of the scheduler
+
+ SYNOPSIS
+ Event_scheduler::dump_internal_status()
+ thd Thread
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error
+*/
+
+bool
+Event_scheduler::dump_internal_status(THD *thd)
+{
+ int ret= 0;
+ DBUG_ENTER("Event_scheduler::dump_internal_status");
+
+#ifndef DBUG_OFF
+ CHARSET_INFO *scs= system_charset_info;
+ Protocol *protocol= thd->protocol;
+ char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
+ char int_buff[STRING_BUFFER_USUAL_SIZE];
+ String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
+ String int_string(int_buff, sizeof(int_buff), scs);
+ tmp_string.length(0);
+ int_string.length(0);
+
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler state"), scs);
+ protocol->store(scheduler_states_names[state].str,
+ scheduler_states_names[state].length, scs);
+
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* thread_id */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("thread_id"), scs);
+ if (thread_id)
+ {
+ int_string.set((longlong) thread_id, scs);
+ protocol->store(&int_string);
+ }
+ else
+ protocol->store_null();
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* last locked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_locked_in_func,
+ mutex_last_locked_at_line));
+ protocol->store(&tmp_string);
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* last unlocked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_unlocked_in_func,
+ mutex_last_unlocked_at_line));
+ protocol->store(&tmp_string);
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* waiting on */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
+ int_string.set((longlong) waiting_on_cond, scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* workers_count */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
+ int_string.set((longlong) workers_count(), scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* workers_count */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
+ int_string.set((longlong) started_events, scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ goto end;
+
+ /* scheduler_data_locked */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
+ int_string.set((longlong) mutex_scheduler_data_locked, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+end:
+#endif
+
+ DBUG_RETURN(ret);
+}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index acd0debe391..bf3e8e63e11 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -16,4 +16,108 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+class Event_queue;
+class Event_job_data;
+
+class Event_scheduler
+{
+public:
+ Event_scheduler(){}
+ ~Event_scheduler(){}
+
+ enum enum_state
+ {
+ INITIALIZED = 0,
+ RUNNING,
+ STOPPING
+ };
+
+ /* State changing methods follow */
+
+ bool
+ start();
+
+ bool
+ stop();
+
+ /*
+ Need to be public because has to be called from the function
+ passed to pthread_create.
+ */
+ bool
+ run(THD *thd);
+
+ bool
+ init_scheduler(Event_queue *queue);
+
+ void
+ deinit_scheduler();
+
+ void
+ init_mutexes();
+
+ void
+ deinit_mutexes();
+
+ /* Information retrieving methods follow */
+
+ enum enum_state
+ get_state();
+
+ void
+ queue_changed();
+
+ bool
+ dump_internal_status(THD *thd);
+
+private:
+ uint
+ workers_count();
+
+ /* helper functions */
+ bool
+ execute_top(THD *thd, Event_job_data *job_data);
+
+ /* helper functions for working with mutexes & conditionals */
+ void
+ lock_data(const char *func, uint line);
+
+ void
+ unlock_data(const char *func, uint line);
+
+ void
+ cond_wait(struct timespec *abstime, const char *func, uint line);
+
+ pthread_mutex_t LOCK_scheduler_state;
+
+ /* This is the current status of the life-cycle of the scheduler. */
+ enum enum_state state;
+
+ /*
+ Holds the thread id of the executor thread or 0 if the scheduler is not
+ running. It is used by ::shutdown() to know which thread to kill with
+ kill_one_thread(). The latter wake ups a thread if it is waiting on a
+ conditional variable and sets thd->killed to non-zero.
+ */
+ ulong thread_id;
+
+ pthread_cond_t COND_state;
+
+ Event_queue *queue;
+
+ uint mutex_last_locked_at_line;
+ uint mutex_last_unlocked_at_line;
+ const char* mutex_last_locked_in_func;
+ const char* mutex_last_unlocked_in_func;
+ bool mutex_scheduler_data_locked;
+ bool waiting_on_cond;
+
+ ulonglong started_events;
+
+private:
+ /* Prevent use of these */
+ Event_scheduler(const Event_scheduler &);
+ void operator=(Event_scheduler &);
+};
+
#endif /* _EVENT_SCHEDULER_H_ */
diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc
deleted file mode 100644
index 1f004d0b05e..00000000000
--- a/sql/event_scheduler_ng.cc
+++ /dev/null
@@ -1,881 +0,0 @@
-/* Copyright (C) 2004-2006 MySQL AB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-#include "mysql_priv.h"
-#include "events.h"
-#include "event_data_objects.h"
-#include "event_scheduler_ng.h"
-#include "event_queue.h"
-
-#ifdef __GNUC__
-#if __GNUC__ >= 2
-#define SCHED_FUNC __FUNCTION__
-#endif
-#else
-#define SCHED_FUNC "<unknown>"
-#endif
-
-#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
-#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
-#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
-
-extern pthread_attr_t connection_attrib;
-
-struct scheduler_param
-{
- THD *thd;
- Event_scheduler_ng *scheduler;
-};
-
-struct scheduler_param scheduler_param_value;
-
-
-
-static
-LEX_STRING scheduler_states_names[] =
-{
- { C_STRING_WITH_LEN("INITIALIZED")},
- { C_STRING_WITH_LEN("RUNNING")},
- { C_STRING_WITH_LEN("STOPPING")}
-};
-
-
-/*
- Prints the stack of infos, warnings, errors from thd to
- the console so it can be fetched by the logs-into-tables and
- checked later.
-
- SYNOPSIS
- evex_print_warnings
- thd Thread used during the execution of the event
- et The event itself
-*/
-
-static void
-evex_print_warnings(THD *thd, Event_job_data *et)
-{
- MYSQL_ERROR *err;
- DBUG_ENTER("evex_print_warnings");
- if (!thd->warn_list.elements)
- DBUG_VOID_RETURN;
-
- char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
- char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
- String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
- prefix.length(0);
- prefix.append("SCHEDULER: [");
-
- append_identifier(thd, &prefix, et->definer.str, et->definer.length);
- prefix.append("][", 2);
- append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
- prefix.append('.');
- append_identifier(thd,&prefix, et->name.str, et->name.length);
- prefix.append("] ", 2);
-
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
- while ((err= it++))
- {
- String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
- /* set it to 0 or we start adding at the end. That's the trick ;) */
- err_msg.length(0);
- err_msg.append(prefix);
- err_msg.append(err->msg, strlen(err->msg), system_charset_info);
- err_msg.append("]");
- DBUG_ASSERT(err->level < 3);
- (sql_print_message_handlers[err->level])("%*s", err_msg.length(),
- err_msg.c_ptr());
- }
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Inits an scheduler thread handler, both the main and a worker
-
- SYNOPSIS
- init_event_thread()
- thd - the THD of the thread. Has to be allocated by the caller.
-
- NOTES
- 1. The host of the thead is my_localhost
- 2. thd->net is initted with NULL - no communication.
-
- RETURN VALUE
- 0 OK
- -1 Error
-*/
-
-static int
-init_scheduler_thread(THD* thd)
-{
- DBUG_ENTER("init_event_thread");
- thd->client_capabilities= 0;
- thd->security_ctx->master_access= 0;
- thd->security_ctx->db_access= 0;
- thd->security_ctx->host_or_ip= (char*)my_localhost;
- thd->security_ctx->set_user((char*)"event_scheduler");
- my_net_init(&thd->net, NULL);
- thd->net.read_timeout= slave_net_timeout;
- thd->slave_thread= 0;
- thd->options|= OPTION_AUTO_IS_NULL;
- thd->client_capabilities|= CLIENT_MULTI_RESULTS;
- pthread_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thread_id++;
- threads.append(thd);
- thread_count++;
- thread_running++;
- pthread_mutex_unlock(&LOCK_thread_count);
-
- /*
- Guarantees that we will see the thread in SHOW PROCESSLIST though its
- vio is NULL.
- */
-
- thd->proc_info= "Initialized";
- thd->version= refresh_version;
- thd->set_time();
-
- DBUG_RETURN(0);
-}
-
-
-/*
- Cleans up the THD and the threaded environment of the thread.
-
- SYNOPSIS
- deinit_event_thread()
- thd Thread
-*/
-
-static void
-deinit_event_thread(THD *thd)
-{
- thd->proc_info= "Clearing";
- DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net);
- DBUG_PRINT("exit", ("Scheduler thread finishing"));
- pthread_mutex_lock(&LOCK_thread_count);
- thread_count--;
- thread_running--;
- delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
-
- my_thread_end();
-}
-
-
-/*
- Function that executes the scheduler,
-
- SYNOPSIS
- event_scheduler_ng_thread()
- arg Pointer to `struct scheduler_param`
-
- RETURN VALUE
- 0 OK
-*/
-
-pthread_handler_t
-event_scheduler_ng_thread(void *arg)
-{
- /* needs to be first for thread_stack */
- THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
-
- thd->thread_stack= (char *)&thd; // remember where our stack is
- DBUG_ENTER("event_scheduler_ng_thread");
-
- my_thread_init();
- pthread_detach_this_thread();
- thd->real_id=pthread_self();
- if (init_thr_lock() || thd->store_globals())
- {
- thd->cleanup();
- goto end;
- }
-
-#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
- sigset_t set;
- VOID(sigemptyset(&set)); // Get mask in use
- VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
-#endif
-
- ((struct scheduler_param *) arg)->scheduler->run(thd);
-
-end:
- deinit_event_thread(thd);
-
- DBUG_RETURN(0); // Against gcc warnings
-}
-
-
-/*
- Function that executes an event in a child thread. Setups the
- environment for the event execution and cleans after that.
-
- SYNOPSIS
- event_worker_ng_thread()
- arg The Event_job_data object to be processed
-
- RETURN VALUE
- 0 OK
-*/
-
-pthread_handler_t
-event_worker_ng_thread(void *arg)
-{
- /* needs to be first for thread_stack */
- THD *thd;
- Event_job_data *event= (Event_job_data *)arg;
- int ret;
-
- thd= event->thd;
- thd->thread_stack= (char *) &thd;
-
-
- my_thread_init();
- pthread_detach_this_thread();
- thd->real_id=pthread_self();
- if (init_thr_lock() || thd->store_globals())
- {
- thd->cleanup();
- goto end;
- }
-
-
-#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
- sigset_t set;
- VOID(sigemptyset(&set)); // Get mask in use
- VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
-#endif
- thd->init_for_queries();
-
- DBUG_ENTER("event_worker_ng_thread");
- DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
- "THD=0x%lx", time(NULL), thd));
-
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id);
-
- thd->enable_slow_log= TRUE;
-
- ret= event->execute(thd);
-
- evex_print_warnings(thd, event);
-
- sql_print_information("SCHEDULER: [%s.%s of %s] executed "
- " in thread thread %lu. RetCode=%d",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id, ret);
- if (ret == EVEX_COMPILE_ERROR)
- sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
- event->dbname.str, event->name.str,
- event->definer.str);
- else if (ret == EVEX_MICROSECOND_UNSUP)
- sql_print_information("SCHEDULER: MICROSECOND is not supported");
-
-end:
- DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
- event->name.str));
- delete event;
-
- deinit_event_thread(thd);
-
- DBUG_RETURN(0); // Against gcc warnings
-}
-
-
-/*
- Performs initialization of the scheduler data, outside of the
- threading primitives.
-
- SYNOPSIS
- Event_scheduler_ng::init_scheduler()
-*/
-
-bool
-Event_scheduler_ng::init_scheduler(Event_queue *q)
-{
- LOCK_SCHEDULER_DATA();
- thread_id= 0;
- state= INITIALIZED;
- queue= q;
- started_events= 0;
- UNLOCK_SCHEDULER_DATA();
-
- return FALSE;
-}
-
-
-void
-Event_scheduler_ng::deinit_scheduler() {}
-
-
-/*
- Inits scheduler's threading primitives.
-
- SYNOPSIS
- Event_scheduler_ng::init_mutexes()
-*/
-
-void
-Event_scheduler_ng::init_mutexes()
-{
- pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&COND_state, NULL);
-}
-
-
-/*
- Deinits scheduler's threading primitives.
-
- SYNOPSIS
- Event_scheduler_ng::deinit_mutexes()
-*/
-
-void
-Event_scheduler_ng::deinit_mutexes()
-{
- pthread_mutex_destroy(&LOCK_scheduler_state);
- pthread_cond_destroy(&COND_state);
-}
-
-
-/*
- Starts the scheduler (again). Creates a new THD and passes it to
- a forked thread. Does not wait for acknowledgement from the new
- thread that it has started. Asynchronous starting. Most of the
- needed initializations are done in the current thread to minimize
- the chance of failure in the spawned thread.
-
- SYNOPSIS
- Event_scheduler_ng::start()
-
- RETURN VALUE
- FALSE OK
- TRUE Error (not reported)
-*/
-
-bool
-Event_scheduler_ng::start()
-{
- THD *new_thd= NULL;
- bool ret= FALSE;
- pthread_t th;
- DBUG_ENTER("Event_scheduler_ng::start");
-
- LOCK_SCHEDULER_DATA();
- DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
- if (state > INITIALIZED)
- goto end;
-
- if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
- {
- sql_print_error("SCHEDULER: Cannot init manager event thread.");
- ret= TRUE;
- goto end;
- }
- new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
- new_thd->command= COM_DAEMON;
-
- scheduler_param_value.thd= new_thd;
- scheduler_param_value.scheduler= this;
-
- DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
- if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread,
- (void*)&scheduler_param_value))
- {
- DBUG_PRINT("error", ("cannot create a new thread"));
- state= INITIALIZED;
- ret= TRUE;
- }
- DBUG_PRINT("info", ("Setting state go RUNNING"));
- state= RUNNING;
-end:
- UNLOCK_SCHEDULER_DATA();
-
- if (ret && new_thd)
- {
- DBUG_PRINT("info", ("There was an error during THD creation. Clean up"));
- new_thd->proc_info= "Clearing";
- DBUG_ASSERT(new_thd->net.buff != 0);
- net_end(&new_thd->net);
- pthread_mutex_lock(&LOCK_thread_count);
- thread_count--;
- thread_running--;
- delete new_thd;
- pthread_mutex_unlock(&LOCK_thread_count);
- }
- DBUG_RETURN(ret);
-}
-
-
-/*
- Stops the scheduler (again). Waits for acknowledgement from the
- scheduler that it has stopped - synchronous stopping.
-
- SYNOPSIS
- Event_scheduler_ng::stop()
-
- RETURN VALUE
- FALSE OK
- TRUE Error (not reported)
-*/
-
-bool
-Event_scheduler_ng::stop()
-{
- THD *thd= current_thd;
- DBUG_ENTER("Event_scheduler_ng::stop");
- DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
-
- LOCK_SCHEDULER_DATA();
- DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
- if (state != RUNNING)
- goto end;
-
- state= STOPPING;
-
- DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
- sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
-
- pthread_cond_signal(&COND_state);
-
- /* Guarantee we don't catch spurious signals */
- sql_print_information("SCHEDULER: Waiting the manager thread to reply");
- do {
- DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
- "thread. Current value of state is %s . "
- "workers count=%d", scheduler_states_names[state].str,
- workers_count()));
- /* thd could be 0x0, when shutting down */
- COND_STATE_WAIT(NULL);
- } while (state == STOPPING);
- DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
-
- thread_id= 0;
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(FALSE);
-}
-
-
-/*
- The main loop of the scheduler.
-
- SYNOPSIS
- Event_scheduler_ng::run()
- thd Thread
-
- RETURN VALUE
- FALSE OK
- TRUE Error (Serious error)
-*/
-
-bool
-Event_scheduler_ng::run(THD *thd)
-{
- int res;
- struct timespec abstime;
- Event_job_data *job_data;
- DBUG_ENTER("Event_scheduler_ng::run");
-
- LOCK_SCHEDULER_DATA();
-
- thread_id= thd->thread_id;
- sql_print_information("SCHEDULER: Manager thread started with id %lu",
- thread_id);
- /*
- Recalculate the values in the queue because there could have been stops
- in executions of the scheduler and some times could have passed by.
- */
- queue->recalculate_activation_times(thd);
- while (state == RUNNING)
- {
- thd->end_time();
- /* Gets a minimized version */
- if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
- &job_data, &abstime))
- {
- sql_print_information("SCHEDULER: Serious error during getting next"
- " event to execute. Stopping.");
- break;
- }
-
- DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
- "abs_time.tv_sec=%d",
- job_data, thd->query_start(), abstime.tv_sec));
- if (!job_data && !abstime.tv_sec)
- {
- DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
- thd->enter_cond(&COND_state, &LOCK_scheduler_state,
- "Waiting on empty queue");
- COND_STATE_WAIT(NULL);
- thd->exit_cond("");
- DBUG_PRINT("info", ("Woke up. Got COND_state"));
- LOCK_SCHEDULER_DATA();
- }
- else if (abstime.tv_sec)
- {
- DBUG_PRINT("info", ("Have to sleep some time %u till",
- abstime.tv_sec - thd->query_start(), abstime.tv_sec));
-
- thd->enter_cond(&COND_state, &LOCK_scheduler_state,
- "Waiting for next activation");
- COND_STATE_WAIT(&abstime);
- /*
- If we get signal we should recalculate the whether it's the right time
- because there could be :
- 1. Spurious wake-up
- 2. The top of the queue was changed (new one becase of create/update)
- */
- /* This will do implicit UNLOCK_SCHEDULER_DATA() */
- thd->exit_cond("");
- DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
- LOCK_SCHEDULER_DATA();
- }
- else
- {
- UNLOCK_SCHEDULER_DATA();
- res= execute_top(thd, job_data);
- LOCK_SCHEDULER_DATA();
- if (res)
- break;
- ++started_events;
- }
- DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
- }
- DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
- pthread_cond_signal(&COND_state);
-error:
- state= INITIALIZED;
- UNLOCK_SCHEDULER_DATA();
- sql_print_information("SCHEDULER: Stopped");
-
- DBUG_RETURN(res);
-}
-
-
-/*
- Creates a new THD instance and then forks a new thread, while passing
- the THD pointer and job_data to it.
-
- SYNOPSIS
- Event_scheduler_ng::execute_top()
-
- RETURN VALUE
- FALSE OK
- TRUE Error (Serious error)
-*/
-
-bool
-Event_scheduler_ng::execute_top(THD *thd, Event_job_data *job_data)
-{
- THD *new_thd;
- pthread_t th;
- int res= 0;
- DBUG_ENTER("Event_scheduler_ng::execute_top");
- if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
- goto error;
-
- new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
- job_data->thd= new_thd;
- DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
- job_data->dbname.str, job_data->name.str));
-
- /* Major failure */
- if ((res= pthread_create(&th, &connection_attrib, event_worker_ng_thread,
- job_data)))
- goto error;
-
- DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd));
- DBUG_RETURN(FALSE);
-
-error:
- DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res));
- if (new_thd)
- {
- new_thd->proc_info= "Clearing";
- DBUG_ASSERT(new_thd->net.buff != 0);
- net_end(&new_thd->net);
- pthread_mutex_lock(&LOCK_thread_count);
- thread_count--;
- thread_running--;
- delete new_thd;
- pthread_mutex_unlock(&LOCK_thread_count);
- }
- delete job_data;
- DBUG_RETURN(TRUE);
-}
-
-
-/*
- Returns the current state of the scheduler
-
- SYNOPSIS
- Event_scheduler_ng::get_state()
-
- RETURN VALUE
- The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
-*/
-
-enum Event_scheduler_ng::enum_state
-Event_scheduler_ng::get_state()
-{
- enum Event_scheduler_ng::enum_state ret;
- LOCK_SCHEDULER_DATA();
- ret= state;
- UNLOCK_SCHEDULER_DATA();
- return ret;
-}
-
-
-/*
- Returns the number of living event worker threads.
-
- SYNOPSIS
- Event_scheduler_ng::workers_count()
-*/
-
-uint
-Event_scheduler_ng::workers_count()
-{
- THD *tmp;
- uint count= 0;
-
- DBUG_ENTER("Event_scheduler_ng::workers_count");
- pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (tmp->command == COM_DAEMON)
- continue;
- if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
- ++count;
- }
- pthread_mutex_unlock(&LOCK_thread_count);
- DBUG_PRINT("exit", ("%d", count));
- DBUG_RETURN(count);
-}
-
-
-/*
- Signals the main scheduler thread that the queue has changed
- its state.
-
- SYNOPSIS
- Event_scheduler_ng::queue_changed()
-*/
-
-void
-Event_scheduler_ng::queue_changed()
-{
- DBUG_ENTER("Event_scheduler_ng::queue_changed");
- DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ",
- scheduler_states_names[state].str));
- pthread_cond_signal(&COND_state);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Auxiliary function for locking LOCK_scheduler_state. Used
- by the LOCK_SCHEDULER_DATA macro.
-
- SYNOPSIS
- Event_scheduler_ng::lock_data()
- func Which function is requesting mutex lock
- line On which line mutex lock is requested
-*/
-
-void
-Event_scheduler_ng::lock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler_ng::lock_data");
- DBUG_PRINT("enter", ("func=%s line=%u", func, line));
- pthread_mutex_lock(&LOCK_scheduler_state);
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_scheduler_data_locked= TRUE;
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Auxiliary function for unlocking LOCK_scheduler_state. Used
- by the UNLOCK_SCHEDULER_DATA macro.
-
- SYNOPSIS
- Event_scheduler_ng::unlock_data()
- func Which function is requesting mutex unlock
- line On which line mutex unlock is requested
-*/
-
-void
-Event_scheduler_ng::unlock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler_ng::unlock_data");
- DBUG_PRINT("enter", ("func=%s line=%u", func, line));
- mutex_last_unlocked_at_line= line;
- mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
- pthread_mutex_unlock(&LOCK_scheduler_state);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_cond_wait/timedwait
-
- SYNOPSIS
- Event_scheduler_ng::cond_wait()
- cond Conditional to wait for
- mutex Mutex of the conditional
-
- RETURN VALUE
- Error code of pthread_cond_wait()
-*/
-
-void
-Event_scheduler_ng::cond_wait(struct timespec *abstime,
- const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler_ng::cond_wait");
- waiting_on_cond= TRUE;
- mutex_last_unlocked_at_line= line;
- mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
-
- if (abstime)
- pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
- else
- pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
-
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_scheduler_data_locked= TRUE;
- waiting_on_cond= FALSE;
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Dumps the internal status of the scheduler
-
- SYNOPSIS
- Event_scheduler_ng::dump_internal_status()
- thd Thread
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-bool
-Event_scheduler_ng::dump_internal_status(THD *thd)
-{
- int ret= 0;
- DBUG_ENTER("Event_scheduler_ng::dump_internal_status");
-
-#ifndef DBUG_OFF
- CHARSET_INFO *scs= system_charset_info;
- Protocol *protocol= thd->protocol;
- char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
- char int_buff[STRING_BUFFER_USUAL_SIZE];
- String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
- String int_string(int_buff, sizeof(int_buff), scs);
- tmp_string.length(0);
- int_string.length(0);
-
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler state"), scs);
- protocol->store(scheduler_states_names[state].str,
- scheduler_states_names[state].length, scs);
-
- if ((ret= protocol->write()))
- goto end;
-
- /* thread_id */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("thread_id"), scs);
- if (thread_id)
- {
- int_string.set((longlong) thread_id, scs);
- protocol->store(&int_string);
- }
- else
- protocol->store_null();
- if ((ret= protocol->write()))
- goto end;
-
- /* last locked at*/
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- mutex_last_locked_in_func,
- mutex_last_locked_at_line));
- protocol->store(&tmp_string);
- if ((ret= protocol->write()))
- goto end;
-
- /* last unlocked at*/
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- mutex_last_unlocked_in_func,
- mutex_last_unlocked_at_line));
- protocol->store(&tmp_string);
- if ((ret= protocol->write()))
- goto end;
-
- /* waiting on */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
- int_string.set((longlong) waiting_on_cond, scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
-
- /* workers_count */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
- int_string.set((longlong) workers_count(), scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
-
- /* workers_count */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
- int_string.set((longlong) started_events, scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
-
- /* scheduler_data_locked */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
- int_string.set((longlong) mutex_scheduler_data_locked, scs);
- protocol->store(&int_string);
- ret= protocol->write();
-end:
-#endif
-
- DBUG_RETURN(ret);
-}
diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h
deleted file mode 100644
index e4f3f0588f9..00000000000
--- a/sql/event_scheduler_ng.h
+++ /dev/null
@@ -1,123 +0,0 @@
-#ifndef _EVENT_SCHEDULER_NG_H_
-#define _EVENT_SCHEDULER_NG_H_
-/* Copyright (C) 2004-2006 MySQL AB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-class Event_queue;
-class Event_job_data;
-
-class Event_scheduler_ng
-{
-public:
- Event_scheduler_ng(){}
- ~Event_scheduler_ng(){}
-
- enum enum_state
- {
- INITIALIZED = 0,
- RUNNING,
- STOPPING
- };
-
- /* State changing methods follow */
-
- bool
- start();
-
- bool
- stop();
-
- /*
- Need to be public because has to be called from the function
- passed to pthread_create.
- */
- bool
- run(THD *thd);
-
- bool
- init_scheduler(Event_queue *queue);
-
- void
- deinit_scheduler();
-
- void
- init_mutexes();
-
- void
- deinit_mutexes();
-
- /* Information retrieving methods follow */
-
- enum enum_state
- get_state();
-
- void
- queue_changed();
-
- bool
- dump_internal_status(THD *thd);
-
-private:
- uint
- workers_count();
-
- /* helper functions */
- bool
- execute_top(THD *thd, Event_job_data *job_data);
-
- /* helper functions for working with mutexes & conditionals */
- void
- lock_data(const char *func, uint line);
-
- void
- unlock_data(const char *func, uint line);
-
- void
- cond_wait(struct timespec *abstime, const char *func, uint line);
-
- pthread_mutex_t LOCK_scheduler_state;
-
- /* This is the current status of the life-cycle of the scheduler. */
- enum enum_state state;
-
- /*
- Holds the thread id of the executor thread or 0 if the scheduler is not
- running. It is used by ::shutdown() to know which thread to kill with
- kill_one_thread(). The latter wake ups a thread if it is waiting on a
- conditional variable and sets thd->killed to non-zero.
- */
- ulong thread_id;
-
- pthread_cond_t COND_state;
-
- Event_queue *queue;
-
- uint mutex_last_locked_at_line;
- uint mutex_last_unlocked_at_line;
- const char* mutex_last_locked_in_func;
- const char* mutex_last_unlocked_in_func;
- bool mutex_scheduler_data_locked;
- bool waiting_on_cond;
-
- ulonglong started_events;
-
-private:
- /* Prevent use of these */
- Event_scheduler_ng(const Event_scheduler_ng &);
- void operator=(Event_scheduler_ng &);
-};
-
-#endif /* _EVENT_SCHEDULER_NG_H_ */
diff --git a/sql/events.cc b/sql/events.cc
index cfb7a067d63..2a4ccbaf5ef 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -19,7 +19,7 @@
#include "event_data_objects.h"
#include "event_db_repository.h"
#include "event_queue.h"
-#include "event_scheduler_ng.h"
+#include "event_scheduler.h"
#include "sp_head.h"
/*
@@ -560,15 +560,15 @@ int
Events::init()
{
DBUG_ENTER("Events::init");
- event_queue->init_queue(db_repository, scheduler_ng);
- scheduler_ng->init_scheduler(event_queue);
+ event_queue->init_queue(db_repository, scheduler);
+ scheduler->init_scheduler(event_queue);
/* it should be an assignment! */
if (opt_event_scheduler)
{
DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2);
if (opt_event_scheduler == 1)
- DBUG_RETURN(scheduler_ng->start());
+ DBUG_RETURN(scheduler->start());
}
DBUG_RETURN(0);
@@ -590,8 +590,8 @@ Events::deinit()
{
DBUG_ENTER("Events::deinit");
- scheduler_ng->stop();
- scheduler_ng->deinit_scheduler();
+ scheduler->stop();
+ scheduler->deinit_scheduler();
event_queue->deinit_queue();
@@ -617,8 +617,8 @@ Events::init_mutexes()
event_queue= new Event_queue;
event_queue->init_mutexes();
- scheduler_ng= new Event_scheduler_ng();
- scheduler_ng->init_mutexes();
+ scheduler= new Event_scheduler();
+ scheduler->init_mutexes();
}
@@ -633,9 +633,9 @@ void
Events::destroy_mutexes()
{
event_queue->deinit_mutexes();
- scheduler_ng->deinit_mutexes();
+ scheduler->deinit_mutexes();
- delete scheduler_ng;
+ delete scheduler;
delete db_repository;
pthread_mutex_destroy(&LOCK_event_metadata);
@@ -670,7 +670,7 @@ Events::dump_internal_status(THD *thd)
Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
- if (scheduler_ng->dump_internal_status(thd) ||
+ if (scheduler->dump_internal_status(thd) ||
event_queue->dump_internal_status(thd))
DBUG_RETURN(TRUE);
@@ -694,7 +694,7 @@ bool
Events::start_execution_of_events()
{
DBUG_ENTER("Events::start_execution_of_events");
- DBUG_RETURN(scheduler_ng->start());
+ DBUG_RETURN(scheduler->start());
}
@@ -715,7 +715,7 @@ bool
Events::stop_execution_of_events()
{
DBUG_ENTER("Events::stop_execution_of_events");
- DBUG_RETURN(scheduler_ng->stop());
+ DBUG_RETURN(scheduler->stop());
}
@@ -734,5 +734,5 @@ bool
Events::is_started()
{
DBUG_ENTER("Events::is_started");
- DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING);
+ DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING);
}
diff --git a/sql/events.h b/sql/events.h
index 58a1081caa9..5f46c7bb7c5 100644
--- a/sql/events.h
+++ b/sql/events.h
@@ -21,7 +21,7 @@ class Event_parse_data;
class Event_db_repository;
class Event_queue;
class Event_queue_element;
-class Event_scheduler_ng;
+class Event_scheduler;
/* Return codes */
enum enum_events_error_code
@@ -117,7 +117,7 @@ private:
static Events singleton;
Event_queue *event_queue;
- Event_scheduler_ng *scheduler_ng;
+ Event_scheduler *scheduler;
Event_db_repository *db_repository;
pthread_mutex_t LOCK_event_metadata;