summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-02-16 02:13:37 +0100
committerunknown <andrey@lmy004.>2006-02-16 02:13:37 +0100
commit51e1a5f8109e03ee6ec7be832fda88244d02d1da (patch)
tree56753ced0ae56219d74d6d60ea3c97f4b8a66955 /sql
parenta7da76f24e5e670b44d01bd7f4b47f592741bf24 (diff)
parent74b2989dc4fbf8c2aebc6121539f493ecdd0c879 (diff)
downloadmariadb-git-51e1a5f8109e03ee6ec7be832fda88244d02d1da.tar.gz
Merge ahristov@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into lmy004.:/work/mysql-5.1-bug16406
Diffstat (limited to 'sql')
-rw-r--r--sql/event.cc229
-rw-r--r--sql/event.h60
-rw-r--r--sql/event_executor.cc263
-rw-r--r--sql/event_priv.h12
-rw-r--r--sql/event_timed.cc139
-rw-r--r--sql/sql_db.cc2
6 files changed, 578 insertions, 127 deletions
diff --git a/sql/event.cc b/sql/event.cc
index c0a043aaf09..76d802b9ac0 100644
--- a/sql/event.cc
+++ b/sql/event.cc
@@ -515,6 +515,28 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
SYNOPSIS
evex_db_find_event_aux()
thd Thread context
+ et evet_timed object containing dbname, name & definer
+ table TABLE object for open mysql.event table.
+
+ RETURN VALUE
+ 0 - Routine found
+ EVEX_KEY_NOT_FOUND - No routine with given name
+*/
+
+inline int
+evex_db_find_event_aux(THD *thd, event_timed *et, TABLE *table)
+{
+ return evex_db_find_event_by_name(thd, et->dbname, et->name,
+ et->definer, table);
+}
+
+
+/*
+ Find row in open mysql.event table representing event
+
+ SYNOPSIS
+ evex_db_find_event_by_name()
+ thd Thread context
dbname Name of event's database
rname Name of the event inside the db
table TABLE object for open mysql.event table.
@@ -525,13 +547,13 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
*/
int
-evex_db_find_event_aux(THD *thd, const LEX_STRING dbname,
- const LEX_STRING ev_name,
- const LEX_STRING user_name,
- TABLE *table)
+evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname,
+ const LEX_STRING ev_name,
+ const LEX_STRING user_name,
+ TABLE *table)
{
byte key[MAX_KEY_LENGTH];
- DBUG_ENTER("evex_db_find_event_aux");
+ DBUG_ENTER("evex_db_find_event_by_name");
DBUG_PRINT("enter", ("name: %.*s", ev_name.length, ev_name.str));
/*
@@ -710,7 +732,7 @@ db_create_event(THD *thd, event_timed *et, my_bool create_if_not,
}
DBUG_PRINT("info", ("check existance of an event with the same name"));
- if (!evex_db_find_event_aux(thd, et->dbname, et->name, et->definer, table))
+ if (!evex_db_find_event_aux(thd, et, table))
{
if (create_if_not)
{
@@ -848,7 +870,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
goto err;
}
- if (!evex_db_find_event_aux(thd, new_name->m_db, new_name->m_name,
+ if (!evex_db_find_event_by_name(thd, new_name->m_db, new_name->m_name,
et->definer, table))
{
my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->m_name.str);
@@ -861,8 +883,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
overwrite the key and SE will tell us that it cannot find the already found
row (copied into record[1] later
*/
- if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et->dbname, et->name,
- et->definer, table))
+ if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et, table))
{
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), et->name.str);
goto err;
@@ -943,8 +964,8 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, event_timed **ett,
goto done;
}
- if ((ret= evex_db_find_event_aux(thd, name->m_db, name->m_name, *definer,
- table)))
+ if ((ret= evex_db_find_event_by_name(thd, name->m_db, name->m_name, *definer,
+ table)))
{
my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name->m_name.str);
goto done;
@@ -1089,7 +1110,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock,
if (!sortcmp_lex_string(*name, et->name, system_charset_info) &&
!sortcmp_lex_string(*db, et->dbname, system_charset_info))
{
- if (!et->is_running())
+ if (et->can_spawn_now())
{
DBUG_PRINT("evex_remove_from_cache", ("not running - free and delete"));
et->free_sp();
@@ -1393,3 +1414,187 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer)
DBUG_RETURN(ret);
}
+
+
+/*
+ evex_drop_db_events - Drops all events in the selected database
+
+ thd - Thread
+ db - ASCIIZ the name of the database
+
+ Returns:
+ 0 - OK
+ 1 - Failed to delete a specific row
+ 2 - Got NULL while reading db name from a row
+
+ Note:
+ The algo is the following
+ 1. Go through the in-memory cache, if the scheduler is working
+ and for every event whose dbname matches the database we drop
+ check whether is currently in execution:
+ - event_timed::can_spawn() returns true -> the event is not
+ being executed in a child thread. The reason not to use
+ event_timed::is_running() is that the latter shows only if
+ it is being executed, which is 99% of the time in the thread
+ but there are some initiliazations before and after the
+ anonymous SP is being called. So if we delete in this moment
+ -=> *boom*, so we have to check whether the thread has been
+ spawned and can_spawn() is the right method.
+ - event_timed::can_spawn() returns false -> being runned ATM
+ just set the flags so it should drop itself.
+
+*/
+
+int
+evex_drop_db_events(THD *thd, char *db)
+{
+ TABLE *table;
+ READ_RECORD read_record_info;
+ MYSQL_LOCK *lock;
+ int ret= 0;
+ int i;
+ LEX_STRING db_lex= {db, strlen(db)};
+
+ DBUG_ENTER("evex_drop_db_events");
+ DBUG_PRINT("info",("dropping events from %s", db));
+
+
+ VOID(pthread_mutex_lock(&LOCK_event_arrays));
+
+ if ((ret= evex_open_event_table(thd, TL_WRITE, &table)))
+ {
+ sql_print_error("Table mysql.event is damaged.");
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ DBUG_RETURN(SP_OPEN_TABLE_FAILED);
+ }
+
+ DBUG_PRINT("info",("%d elements in the queue",
+ evex_queue_num_elements(EVEX_EQ_NAME)));
+ VOID(pthread_mutex_lock(&LOCK_evex_running));
+ if (!evex_is_running)
+ goto skip_memory;
+
+ for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
+ {
+ event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
+ if (sortcmp_lex_string(et->dbname, db_lex, system_charset_info))
+ continue;
+
+ if (et->can_spawn_now_n_lock(thd))
+ {
+ DBUG_PRINT("info",("event %s not running - direct delete", et->name.str));
+ if (!(ret= evex_db_find_event_aux(thd, et, table)))
+ {
+ DBUG_PRINT("info",("event %s found on disk", et->name.str));
+ if ((ret= table->file->ha_delete_row(table->record[0])))
+ {
+ sql_print_error("Error while deleting a row - dropping "
+ "a database. Skipping the rest.");
+ my_error(ER_EVENT_DROP_FAILED, MYF(0), et->name.str);
+ goto end;
+ }
+ DBUG_PRINT("info",("deleted event [%s] num [%d]. Time to free mem",
+ et->name.str, i));
+ }
+ else if (ret == EVEX_KEY_NOT_FOUND)
+ {
+ sql_print_error("Expected to find event %s.%s of %s on disk-not there.",
+ et->dbname.str, et->name.str, et->definer.str);
+ }
+ et->free_sp();
+ delete et;
+ et= 0;
+ /* no need to call et->spawn_unlock because we already cleaned et */
+ }
+ else
+ {
+ DBUG_PRINT("info",("event %s is running. setting exec_no_more and dropped",
+ et->name.str));
+ et->flags|= EVENT_EXEC_NO_MORE;
+ et->dropped= TRUE;
+ }
+ DBUG_PRINT("info",("%d elements in the queue",
+ evex_queue_num_elements(EVEX_EQ_NAME)));
+ evex_queue_delete_element(&EVEX_EQ_NAME, i);// 1 is top
+ DBUG_PRINT("info",("%d elements in the queue",
+ evex_queue_num_elements(EVEX_EQ_NAME)));
+ /*
+ decrease so we start at the same position, there will be
+ less elements in the queue, it will still be ordered so on
+ next iteration it will be again i the current element or if
+ no more we finish.
+ */
+ --i;
+ }
+
+skip_memory:
+ /*
+ The reasoning behind having two loops is the following:
+ If there was only one loop, the table-scan, then for every element which
+ matches, the queue in memory has to be searched to remove the element.
+ While if we go first over the queue and remove what's in there we have only
+ one pass over it and after finishing it, moving to table-scan for the disabled
+ events. This needs quite less time and means quite less locking on
+ LOCK_event_arrays.
+ */
+ DBUG_PRINT("info",("Mem-cache checked, now going to db for disabled events"));
+ /* only enabled events are in memory, so we go now and delete the rest */
+ init_read_record(&read_record_info, thd, table ,NULL,1,0);
+ while (!(read_record_info.read_record(&read_record_info)) && !ret)
+ {
+ char *et_db;
+
+ if ((et_db= get_field(thd->mem_root, table->field[EVEX_FIELD_DB])) == NULL)
+ {
+ ret= 2;
+ break;
+ }
+
+ LEX_STRING et_db_lex= {et_db, strlen(et_db)};
+ if (!sortcmp_lex_string(et_db_lex, db_lex, system_charset_info))
+ {
+ event_timed ett;
+ char *ptr;
+
+ if ((ptr= get_field(thd->mem_root, table->field[EVEX_FIELD_STATUS]))
+ == NullS)
+ {
+ sql_print_error("Error while loading from mysql.event. "
+ "Table probably corrupted");
+ goto end;
+ }
+ /*
+ When not running nothing is in memory so we have to clean
+ everything.
+ We don't delete EVENT_ENABLED events when the scheduler is running
+ because maybe this is an event which we asked to drop itself when
+ it is finished and it hasn't finished yet, so we don't touch it.
+ It will drop itself. The not running ENABLED events has been already
+ deleted from ha_delete_row() above in the loop over the QUEUE
+ (in case the executor is running).
+ 'D' stands for DISABLED, 'E' for ENABLED - it's an enum
+ */
+ if ((evex_is_running && ptr[0] == 'D') || !evex_is_running)
+ {
+ DBUG_PRINT("info", ("Dropping %s.%s", et_db, ett.name.str));
+ if ((ret= table->file->ha_delete_row(table->record[0])))
+ {
+ my_error(ER_EVENT_DROP_FAILED, MYF(0), ett.name.str);
+ goto end;
+ }
+ }
+ }
+ }
+ DBUG_PRINT("info",("Disk checked for disabled events. Finishing."));
+
+end:
+ VOID(pthread_mutex_unlock(&LOCK_evex_running));
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ end_read_record(&read_record_info);
+
+ thd->version--; // Force close to free memory
+
+ close_thread_tables(thd);
+
+ DBUG_RETURN(ret);
+}
diff --git a/sql/event.h b/sql/event.h
index 9218d34f6bc..f6e1ecd3188 100644
--- a/sql/event.h
+++ b/sql/event.h
@@ -79,6 +79,8 @@ class event_timed
{
event_timed(const event_timed &); /* Prevent use of these */
void operator=(event_timed &);
+ my_bool in_spawned_thread;
+ ulong locked_by_thread_id;
my_bool running;
pthread_mutex_t LOCK_running;
@@ -117,13 +119,14 @@ public:
bool free_sphead_on_delete;
uint flags;//all kind of purposes
- event_timed():running(0), status_changed(false), last_executed_changed(false),
- expression(0), created(0), modified(0),
- on_completion(MYSQL_EVENT_ON_COMPLETION_DROP),
- status(MYSQL_EVENT_ENABLED), sphead(0), sql_mode(0),
- body_begin(0), dropped(false), free_sphead_on_delete(true),
- flags(0)
-
+ event_timed():in_spawned_thread(0),locked_by_thread_id(0),
+ running(0), status_changed(false),
+ last_executed_changed(false), expression(0), created(0),
+ modified(0), on_completion(MYSQL_EVENT_ON_COMPLETION_DROP),
+ status(MYSQL_EVENT_ENABLED), sphead(0), sql_mode(0),
+ body_begin(0), dropped(false),
+ free_sphead_on_delete(true), flags(0)
+
{
pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST);
init();
@@ -200,7 +203,44 @@ public:
return ret;
}
- void free_sp()
+ /*
+ Checks whether the object is being used in a spawned thread.
+ This method is for very basic checking. Use ::can_spawn_now_n_lock()
+ for most of the cases.
+ */
+
+ my_bool
+ can_spawn_now()
+ {
+ my_bool ret;
+ VOID(pthread_mutex_lock(&this->LOCK_running));
+ ret= !in_spawned_thread;
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+ return ret;
+ }
+
+ /*
+ Checks whether this thread can lock the object for modification ->
+ preventing being spawned for execution, and locks if possible.
+ use ::can_spawn_now() only for basic checking because a race
+ condition may occur between the check and eventual modification (deletion)
+ of the object.
+ */
+
+ my_bool
+ can_spawn_now_n_lock(THD *thd);
+
+ int
+ spawn_unlock(THD *thd);
+
+ int
+ spawn_now(void * (*thread_func)(void*));
+
+ void
+ spawn_thread_finish(THD *thd);
+
+ void
+ free_sp()
{
delete sphead;
sphead= 0;
@@ -240,6 +280,10 @@ event_reconstruct_interval_expression(String *buf,
longlong expression);
int
+evex_drop_db_events(THD *thd, char *db);
+
+
+int
init_events();
void
diff --git a/sql/event_executor.cc b/sql/event_executor.cc
index 43be372e96c..9483c2ab165 100644
--- a/sql/event_executor.cc
+++ b/sql/event_executor.cc
@@ -18,6 +18,11 @@
#include "event.h"
#include "sp.h"
+#define WAIT_STATUS_READY 0
+#define WAIT_STATUS_EMPTY_QUEUE 1
+#define WAIT_STATUS_NEW_TOP_EVENT 2
+#define WAIT_STATUS_STOP_EXECUTOR 3
+
/*
Make this define DBUG_FAULTY_THR to be able to put breakpoints inside
@@ -295,6 +300,85 @@ init_event_thread(THD* thd)
/*
+ This function waits till the time next event in the queue should be
+ executed.
+
+ Returns
+ WAIT_STATUS_READY There is an event to be executed right now
+ WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped.
+ WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled
+ on top. Restart ticking.
+ WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0;
+*/
+
+static int
+executor_wait_till_next_event_exec(THD *thd)
+{
+ event_timed *et;
+ TIME time_now;
+ int t2sleep;
+
+ DBUG_ENTER("executor_wait_till_next_event_exec");
+ /*
+ now let's see how much time to sleep, we know there is at least 1
+ element in the queue.
+ */
+ VOID(pthread_mutex_lock(&LOCK_event_arrays));
+ if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ {
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ DBUG_RETURN(1);
+ }
+ et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
+ DBUG_ASSERT(et);
+ if (et->status == MYSQL_EVENT_DISABLED)
+ {
+ DBUG_PRINT("evex main thread",("Now it is disabled-exec no more"));
+ if (et->dropped)
+ et->drop(thd);
+ delete et;
+ evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ sql_print_information("Event found disabled, dropping.");
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
+ // set the internal clock of thd
+ thd->end_time();
+ my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start());
+ t2sleep= evex_time_diff(&et->execute_at, &time_now);
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+
+ DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays"));
+ if (t2sleep > 0)
+ {
+ /*
+ We sleep t2sleep seconds but we check every second whether this thread
+ has been killed, or there is a new candidate
+ */
+ while (t2sleep-- && !thd->killed && event_executor_running_global_var &&
+ evex_queue_num_elements(EVEX_EQ_NAME) &&
+ (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
+ {
+ DBUG_PRINT("evex main thread",("will sleep a bit more"));
+ my_sleep(1000000);
+ }
+ }
+
+ int ret= 0;
+ if (!evex_queue_num_elements(EVEX_EQ_NAME))
+ ret= 1;
+ else if (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) != et)
+ ret= 2;
+ if (thd->killed && event_executor_running_global_var)
+ ret= 3;
+
+ DBUG_RETURN(ret);
+}
+
+
+/*
The main scheduler thread. Inits the priority queue on start and
destroys it on thread shutdown. Forks child threads for every event
execution. Sleeps between thread forking and does not do a busy wait.
@@ -313,9 +397,9 @@ pthread_handler_t
event_executor_main(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
- ulonglong iter_num= 0;
uint i=0, j=0;
my_ulonglong cnt= 0;
+ TIME time_now;
DBUG_ENTER("event_executor_main");
DBUG_PRINT("event_executor_main", ("EVEX thread started"));
@@ -330,15 +414,16 @@ event_executor_main(void *arg)
if (sizeof(my_time_t) != sizeof(time_t))
{
- sql_print_error("sizeof(my_time_t) != sizeof(time_t) ."
+ sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping.");
+ DBUG_ASSERT(0);
goto err_no_thd;
}
//TODO Andrey: Check for NULL
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
{
- sql_print_error("Cannot create THD for event_executor_main");
+ sql_print_error("SCHEDULER: Cannot create THD for the main thread.");
goto err_no_thd;
}
thd->thread_stack = (char*)&thd; // remember where our stack is
@@ -346,7 +431,7 @@ event_executor_main(void *arg)
pthread_detach_this_thread();
if (init_event_thread(thd))
- goto err;
+ goto finish;
/*
make this thread visible it has no vio -> show processlist won't see it
@@ -360,7 +445,7 @@ event_executor_main(void *arg)
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
- DBUG_PRINT("EVEX main thread", ("Initing events_queuey"));
+ DBUG_PRINT("EVEX main thread", ("Initing events_queue"));
/*
eventually manifest that we are running, not to crashe because of
@@ -376,15 +461,14 @@ event_executor_main(void *arg)
thd->security_ctx->user= my_strdup("event_scheduler", MYF(0));
if (evex_load_events_from_db(thd))
- goto err;
+ goto finish;
evex_main_thread_id= thd->thread_id;
- sql_print_information("Scheduler thread started");
+ sql_print_information("SCHEDULER: Main thread started");
while (!thd->killed)
{
TIME time_now;
- my_time_t now;
event_timed *et;
cnt++;
@@ -393,7 +477,7 @@ event_executor_main(void *arg)
thd->proc_info = "Sleeping";
if (!event_executor_running_global_var)
{
- sql_print_information("Scheduler asked to stop.");
+ sql_print_information("SCHEDULER: Asked to stop.");
break;
}
@@ -402,62 +486,31 @@ event_executor_main(void *arg)
my_sleep(1000000);// sleep 1s
continue;
}
-
- {
- int t2sleep;
- /*
- now let's see how much time to sleep, we know there is at least 1
- element in the queue.
- */
- VOID(pthread_mutex_lock(&LOCK_event_arrays));
- if (!evex_queue_num_elements(EVEX_EQ_NAME))
- {
- VOID(pthread_mutex_unlock(&LOCK_event_arrays));
- continue;
- }
- et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
- if (et->status == MYSQL_EVENT_DISABLED)
- {
- DBUG_PRINT("evex main thread",("Now it is disabled-exec no more"));
- if (et->dropped)
- et->drop(thd);
- delete et;
- evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
- VOID(pthread_mutex_unlock(&LOCK_event_arrays));
- sql_print_information("Event found disabled, dropping.");
- continue;
- }
-
- DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
- time((time_t *)&now);
- my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
- t2sleep= evex_time_diff(&et->execute_at, &time_now);
- VOID(pthread_mutex_unlock(&LOCK_event_arrays));
-
- DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays"));
- if (t2sleep > 0)
- {
- /*
- We sleep t2sleep seconds but we check every second whether this thread
- has been killed, or there is a new candidate
- */
- while (t2sleep-- && !thd->killed && event_executor_running_global_var &&
- evex_queue_num_elements(EVEX_EQ_NAME) &&
- (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
- {
- DBUG_PRINT("evex main thread",("will sleep a bit more"));
- my_sleep(1000000);
- }
- }
- if (!event_executor_running_global_var)
- {
- sql_print_information("Scheduler asked to stop.");
- break;
- }
+
+restart_ticking:
+ switch (executor_wait_till_next_event_exec(thd)) {
+ case WAIT_STATUS_READY: // time to execute the event on top
+ DBUG_PRINT("evex main thread",("time to execute an event"));
+ break;
+ case WAIT_STATUS_EMPTY_QUEUE: // no more events
+ DBUG_PRINT("evex main thread",("no more events"));
+ continue;
+ break;
+ case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue
+ DBUG_PRINT("evex main thread",("restart ticking"));
+ goto restart_ticking;
+ case WAIT_STATUS_STOP_EXECUTOR:
+ sql_print_information("SCHEDULER: Asked to stop.");
+ goto finish;
+ break;
+ default:
+ DBUG_ASSERT(0);
}
VOID(pthread_mutex_lock(&LOCK_event_arrays));
+ thd->end_time();
+ my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start());
if (!evex_queue_num_elements(EVEX_EQ_NAME))
{
@@ -479,14 +532,14 @@ event_executor_main(void *arg)
DBUG_PRINT("evex main thread",("it's right time"));
if (et->status == MYSQL_EVENT_ENABLED)
{
- pthread_t th;
+ int fork_ret_code;
DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str,
TIME_to_ulonglong_datetime(&et->execute_at)));
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
{
- sql_print_error("Error while computing time of %s.%s . "
+ sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
"Disabling after execution.",
et->dbname.str, et->name.str);
et->status= MYSQL_EVENT_DISABLED;
@@ -495,13 +548,23 @@ event_executor_main(void *arg)
TIME_to_ulonglong_datetime(&et->execute_at)));
et->update_fields(thd);
- ++iter_num;
- DBUG_PRINT("info", (" Spawning a thread %d", iter_num));
#ifndef DBUG_FAULTY_THR
- if (pthread_create(&th,&connection_attrib,event_executor_worker,(void*)et))
- {
- sql_print_error("Problem while trying to create a thread");
- UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
+ thread_safe_increment(workers_count, &LOCK_workers_count);
+ switch ((fork_ret_code= et->spawn_now(event_executor_worker))) {
+ case EVENT_EXEC_CANT_FORK:
+ thread_safe_decrement(workers_count, &LOCK_workers_count);
+ sql_print_error("SCHEDULER: Problem while trying to create a thread");
+ UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish);
+ case EVENT_EXEC_ALREADY_EXEC:
+ thread_safe_decrement(workers_count, &LOCK_workers_count);
+ sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
+ et->dbname.str, et->name.str);
+ break;
+ default:
+ DBUG_ASSERT(!fork_ret_code);
+ if (fork_ret_code)
+ thread_safe_decrement(workers_count, &LOCK_workers_count);
+ break;
}
#else
event_executor_worker((void *) et);
@@ -511,22 +574,21 @@ event_executor_main(void *arg)
et->flags |= EVENT_EXEC_NO_MORE;
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
- evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
+ evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1
else
evex_queue_first_updated(&EVEX_EQ_NAME);
}
DBUG_PRINT("evex main thread",("unlocking"));
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
}// while
+finish:
-err:
// First manifest that this thread does not work and then destroy
VOID(pthread_mutex_lock(&LOCK_evex_running));
evex_is_running= false;
evex_main_thread_id= 0;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
- sql_print_information("Event scheduler stopping. Waiting for worker threads to finish.");
/*
TODO: A better will be with a conditional variable
@@ -535,21 +597,33 @@ err:
Read workers_count without lock, no need for locking.
In the worst case we have to wait 1sec more.
*/
- while (workers_count)
- my_sleep(1000000);// 1s
+ sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish.");
+ while (1)
+ {
+ VOID(pthread_mutex_lock(&LOCK_workers_count));
+ if (!workers_count)
+ {
+ VOID(pthread_mutex_unlock(&LOCK_workers_count));
+ break;
+ }
+ VOID(pthread_mutex_unlock(&LOCK_workers_count));
+ my_sleep(1000000);// 1s
+ }
/*
- LEX_STRINGs reside in the memory root and will be destroyed with it.
- Hence no need of delete but only freeing of SP
+ First we free all objects ...
+ Lock because a DROP DATABASE could be running in parallel and it locks on these
*/
- // First we free all objects ...
+ sql_print_information("SCHEDULER: Emptying the queue.");
+ VOID(pthread_mutex_lock(&LOCK_event_arrays));
for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
{
event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
et->free_sp();
delete et;
}
- // ... then we can thras the whole queue at once
+ VOID(pthread_mutex_unlock(&LOCK_event_arrays));
+ // ... then we can thrash the whole queue at once
evex_queue_destroy(&EVEX_EQ_NAME);
thd->proc_info = "Clearing";
@@ -573,7 +647,7 @@ err_no_thd:
VOID(pthread_mutex_unlock(&LOCK_evex_running));
free_root(&evex_mem_root, MYF(0));
- sql_print_information("Event scheduler stopped.");
+ sql_print_information("SCHEDULER: Stopped.");
#ifndef DBUG_FAULTY_THR
my_thread_end();
@@ -600,9 +674,6 @@ event_executor_worker(void *event_void)
MEM_ROOT worker_mem_root;
DBUG_ENTER("event_executor_worker");
- VOID(pthread_mutex_lock(&LOCK_workers_count));
- ++workers_count;
- VOID(pthread_mutex_unlock(&LOCK_workers_count));
init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
@@ -611,7 +682,7 @@ event_executor_worker(void *event_void)
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
{
- sql_print_error("Cannot create a THD structure in a scheduler worker thread");
+ sql_print_error("SCHEDULER: Cannot create a THD structure in an worker.");
goto err_no_thd;
}
thd->thread_stack = (char*)&thd; // remember where our stack is
@@ -653,14 +724,7 @@ event_executor_worker(void *event_void)
event->dbname.str, event->name.str,
event->definer.str);
}
- if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED)
- {
- DBUG_PRINT("event_executor_worker",
- ("%s exec no more. to drop=%d",event->name.str, event->dropped));
- if (event->dropped)
- event->drop(thd);
- delete event;
- }
+ event->spawn_thread_finish(thd);
err:
@@ -689,10 +753,7 @@ err:
err_no_thd:
free_root(&worker_mem_root, MYF(0));
-
- VOID(pthread_mutex_lock(&LOCK_workers_count));
- --workers_count;
- VOID(pthread_mutex_unlock(&LOCK_workers_count));
+ thread_safe_decrement(workers_count, &LOCK_workers_count);
#ifndef DBUG_FAULTY_THR
my_thread_end();
@@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= evex_open_event_table(thd, TL_READ, &table)))
{
- sql_print_error("Table mysql.event is damaged.");
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
DBUG_RETURN(SP_OPEN_TABLE_FAILED);
}
@@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= et->load_from_row(&evex_mem_root, table)))
{
- sql_print_error("Error while loading from mysql.event. "
+ sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
goto end;
}
@@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= et->compile(thd, &evex_mem_root)))
{
- sql_print_error("Error while compiling %s.%s. Aborting load.",
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
et->dbname.str, et->name.str);
goto end;
}
@@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd)
// let's find when to be executed
if (et->compute_next_execution_time())
{
- sql_print_error("Error while computing execution time of %s.%s. Skipping",
- et->dbname.str, et->name.str);
+ sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
+ " Skipping", et->dbname.str, et->name.str);
continue;
}
@@ -799,7 +860,7 @@ end:
thd->version--; // Force close to free memory
close_thread_tables(thd);
- sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s");
+ sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
diff --git a/sql/event_priv.h b/sql/event_priv.h
index 0d4cf3a2993..4f575a96212 100644
--- a/sql/event_priv.h
+++ b/sql/event_priv.h
@@ -19,6 +19,10 @@
#include "mysql_priv.h"
+#define EVENT_EXEC_STARTED 0
+#define EVENT_EXEC_ALREADY_EXEC 1
+#define EVENT_EXEC_CANT_FORK 2
+
#define EVEX_USE_QUEUE
#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
@@ -32,10 +36,10 @@ int
my_time_compare(TIME *a, TIME *b);
int
-evex_db_find_event_aux(THD *thd, const LEX_STRING dbname,
- const LEX_STRING rname,
- const LEX_STRING definer,
- TABLE *table);
+evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname,
+ const LEX_STRING ev_name,
+ const LEX_STRING user_name,
+ TABLE *table);
int
event_timed_compare_q(void *vptr, byte* a, byte *b);
diff --git a/sql/event_timed.cc b/sql/event_timed.cc
index 34e9b50f7a7..d76a9777d4a 100644
--- a/sql/event_timed.cc
+++ b/sql/event_timed.cc
@@ -908,7 +908,7 @@ event_timed::drop(THD *thd)
Saves status and last_executed_at to the disk if changed.
SYNOPSIS
- event_timed::drop()
+ event_timed::update_fields()
thd - thread context
RETURN VALUE
@@ -945,7 +945,7 @@ event_timed::update_fields(THD *thd)
}
- if ((ret= evex_db_find_event_aux(thd, dbname, name, definer, table)))
+ if ((ret= evex_db_find_event_by_name(thd, dbname, name, definer, table)))
goto done;
store_record(table,record[1]);
@@ -1204,6 +1204,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
MEM_ROOT *tmp_mem_root= 0;
LEX *old_lex= thd->lex, lex;
char *old_db;
+ int old_db_length;
event_timed *ett;
sp_name *spn;
char *old_query;
@@ -1237,7 +1238,9 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
old_query_len= thd->query_length;
old_query= thd->query;
old_db= thd->db;
+ old_db_length= thd->db_length;
thd->db= dbname.str;
+ thd->db_length= dbname.length;
get_create_event(thd, &show_create);
@@ -1303,3 +1306,135 @@ done:
DBUG_RETURN(ret);
}
+
+/*
+ Checks whether this thread can lock the object for modification ->
+ preventing being spawned for execution, and locks if possible.
+ use ::can_spawn_now() only for basic checking because a race
+ condition may occur between the check and eventual modification (deletion)
+ of the object.
+
+ Returns
+ true - locked
+ false - cannot lock
+*/
+
+my_bool
+event_timed::can_spawn_now_n_lock(THD *thd)
+{
+ my_bool ret= FALSE;
+ VOID(pthread_mutex_lock(&this->LOCK_running));
+ if (!in_spawned_thread)
+ {
+ in_spawned_thread= TRUE;
+ ret= TRUE;
+ locked_by_thread_id= thd->thread_id;
+ }
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+ return ret;
+}
+
+
+extern pthread_attr_t connection_attrib;
+
+/*
+ Checks whether is possible and forks a thread. Passes self as argument.
+
+ Returns
+ EVENT_EXEC_STARTED - OK
+ EVENT_EXEC_ALREADY_EXEC - Thread not forked, already working
+ EVENT_EXEC_CANT_FORK - Unable to spawn thread (error)
+*/
+
+int
+event_timed::spawn_now(void * (*thread_func)(void*))
+{
+ int ret= EVENT_EXEC_STARTED;
+ static uint exec_num= 0;
+ DBUG_ENTER("event_timed::spawn_now");
+ DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str));
+
+ VOID(pthread_mutex_lock(&this->LOCK_running));
+ if (!in_spawned_thread)
+ {
+ pthread_t th;
+ in_spawned_thread= true;
+ if (pthread_create(&th, &connection_attrib, thread_func, (void*)this))
+ {
+ DBUG_PRINT("info", ("problem while spawning thread"));
+ ret= EVENT_EXEC_CANT_FORK;
+ in_spawned_thread= false;
+ }
+#ifndef DBUG_OFF
+ else
+ {
+ sql_print_information("SCHEDULER: Started thread %d", ++exec_num);
+ DBUG_PRINT("info", ("thread spawned"));
+ }
+#endif
+ }
+ else
+ {
+ DBUG_PRINT("info", ("already in spawned thread. skipping"));
+ ret= EVENT_EXEC_ALREADY_EXEC;
+ }
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+
+ DBUG_RETURN(ret);
+}
+
+
+void
+event_timed::spawn_thread_finish(THD *thd)
+{
+ DBUG_ENTER("event_timed::spawn_thread_finish");
+ VOID(pthread_mutex_lock(&this->LOCK_running));
+ in_spawned_thread= false;
+ if ((flags & EVENT_EXEC_NO_MORE) || status == MYSQL_EVENT_DISABLED)
+ {
+ DBUG_PRINT("info", ("%s exec no more. to drop=%d", name.str, dropped));
+ if (dropped)
+ drop(thd);
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+ delete this;
+ DBUG_VOID_RETURN;
+ }
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Unlocks the object after it has been locked with ::can_spawn_now_n_lock()
+
+ Returns
+ 0 - ok
+ 1 - not locked by this thread
+
+*/
+
+
+int
+event_timed::spawn_unlock(THD *thd)
+{
+ int ret= 0;
+ VOID(pthread_mutex_lock(&this->LOCK_running));
+ if (!in_spawned_thread)
+ {
+ if (locked_by_thread_id == thd->thread_id)
+ {
+ in_spawned_thread= FALSE;
+ locked_by_thread_id= 0;
+ }
+ else
+ {
+ sql_print_error("A thread tries to unlock when he hasn't locked. "
+ "thread_id=%ld locked by %ld",
+ thd->thread_id, locked_by_thread_id);
+ DBUG_ASSERT(0);
+ ret= 1;
+ }
+ }
+ VOID(pthread_mutex_unlock(&this->LOCK_running));
+ return ret;
+}
diff --git a/sql/sql_db.cc b/sql/sql_db.cc
index a7a7327bb87..885643cf899 100644
--- a/sql/sql_db.cc
+++ b/sql/sql_db.cc
@@ -20,6 +20,7 @@
#include "mysql_priv.h"
#include <mysys_err.h>
#include "sp.h"
+#include "event.h"
#include <my_dir.h>
#include <m_ctype.h>
#ifdef __WIN__
@@ -870,6 +871,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
exit:
(void)sp_drop_db_routines(thd, db); /* QQ Ignore errors for now */
+ (void)evex_drop_db_events(thd, db); /* QQ Ignore errors for now */
start_waiting_global_read_lock(thd);
/*
If this database was the client's selected database, we silently change the