diff options
author | unknown <andrey@lmy004.> | 2006-02-16 01:27:36 +0100 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-02-16 01:27:36 +0100 |
commit | 74b2989dc4fbf8c2aebc6121539f493ecdd0c879 (patch) | |
tree | 3991021bdc249111af04e3b0de1a03305a8fb538 /sql/event_executor.cc | |
parent | ea8a26c6f36a3896ea11c8e618e69fa6a8515a6f (diff) | |
parent | fea4742db5b5f5531b0e2d30ccee7883f54b0e80 (diff) | |
download | mariadb-git-74b2989dc4fbf8c2aebc6121539f493ecdd0c879.tar.gz |
merge
mysql-test/r/events.result:
Auto merged
mysql-test/t/events.test:
Auto merged
sql/event_priv.h:
Auto merged
sql/sql_db.cc:
Auto merged
sql/event.cc:
manual merge
sql/event.h:
manual merge
sql/event_executor.cc:
manual merge
sql/event_timed.cc:
manual merge
Diffstat (limited to 'sql/event_executor.cc')
-rw-r--r-- | sql/event_executor.cc | 263 |
1 files changed, 162 insertions, 101 deletions
diff --git a/sql/event_executor.cc b/sql/event_executor.cc index 43be372e96c..9483c2ab165 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -18,6 +18,11 @@ #include "event.h" #include "sp.h" +#define WAIT_STATUS_READY 0 +#define WAIT_STATUS_EMPTY_QUEUE 1 +#define WAIT_STATUS_NEW_TOP_EVENT 2 +#define WAIT_STATUS_STOP_EXECUTOR 3 + /* Make this define DBUG_FAULTY_THR to be able to put breakpoints inside @@ -295,6 +300,85 @@ init_event_thread(THD* thd) /* + This function waits till the time next event in the queue should be + executed. + + Returns + WAIT_STATUS_READY There is an event to be executed right now + WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped. + WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled + on top. Restart ticking. + WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0; +*/ + +static int +executor_wait_till_next_event_exec(THD *thd) +{ + event_timed *et; + TIME time_now; + int t2sleep; + + DBUG_ENTER("executor_wait_till_next_event_exec"); + /* + now let's see how much time to sleep, we know there is at least 1 + element in the queue. + */ + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_RETURN(1); + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + DBUG_ASSERT(et); + if (et->status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); + if (et->dropped) + et->drop(thd); + delete et; + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + sql_print_information("Event found disabled, dropping."); + DBUG_RETURN(1); + } + + DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); + // set the internal clock of thd + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); + t2sleep= evex_time_diff(&et->execute_at, &time_now); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + + DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); + if (t2sleep > 0) + { + /* + We sleep t2sleep seconds but we check every second whether this thread + has been killed, or there is a new candidate + */ + while (t2sleep-- && !thd->killed && event_executor_running_global_var && + evex_queue_num_elements(EVEX_EQ_NAME) && + (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) + { + DBUG_PRINT("evex main thread",("will sleep a bit more")); + my_sleep(1000000); + } + } + + int ret= 0; + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + ret= 1; + else if (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) != et) + ret= 2; + if (thd->killed && event_executor_running_global_var) + ret= 3; + + DBUG_RETURN(ret); +} + + +/* The main scheduler thread. Inits the priority queue on start and destroys it on thread shutdown. Forks child threads for every event execution. Sleeps between thread forking and does not do a busy wait. @@ -313,9 +397,9 @@ pthread_handler_t event_executor_main(void *arg) { THD *thd; /* needs to be first for thread_stack */ - ulonglong iter_num= 0; uint i=0, j=0; my_ulonglong cnt= 0; + TIME time_now; DBUG_ENTER("event_executor_main"); DBUG_PRINT("event_executor_main", ("EVEX thread started")); @@ -330,15 +414,16 @@ event_executor_main(void *arg) if (sizeof(my_time_t) != sizeof(time_t)) { - sql_print_error("sizeof(my_time_t) != sizeof(time_t) ." + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." "The scheduler will not work correctly. Stopping."); + DBUG_ASSERT(0); goto err_no_thd; } //TODO Andrey: Check for NULL if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create THD for event_executor_main"); + sql_print_error("SCHEDULER: Cannot create THD for the main thread."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is @@ -346,7 +431,7 @@ event_executor_main(void *arg) pthread_detach_this_thread(); if (init_event_thread(thd)) - goto err; + goto finish; /* make this thread visible it has no vio -> show processlist won't see it @@ -360,7 +445,7 @@ event_executor_main(void *arg) thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - DBUG_PRINT("EVEX main thread", ("Initing events_queuey")); + DBUG_PRINT("EVEX main thread", ("Initing events_queue")); /* eventually manifest that we are running, not to crashe because of @@ -376,15 +461,14 @@ event_executor_main(void *arg) thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); if (evex_load_events_from_db(thd)) - goto err; + goto finish; evex_main_thread_id= thd->thread_id; - sql_print_information("Scheduler thread started"); + sql_print_information("SCHEDULER: Main thread started"); while (!thd->killed) { TIME time_now; - my_time_t now; event_timed *et; cnt++; @@ -393,7 +477,7 @@ event_executor_main(void *arg) thd->proc_info = "Sleeping"; if (!event_executor_running_global_var) { - sql_print_information("Scheduler asked to stop."); + sql_print_information("SCHEDULER: Asked to stop."); break; } @@ -402,62 +486,31 @@ event_executor_main(void *arg) my_sleep(1000000);// sleep 1s continue; } - - { - int t2sleep; - /* - now let's see how much time to sleep, we know there is at least 1 - element in the queue. - */ - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - continue; - } - et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); - if (et->status == MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); - if (et->dropped) - et->drop(thd); - delete et; - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - sql_print_information("Event found disabled, dropping."); - continue; - } - - DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); - time((time_t *)&now); - my_tz_UTC->gmt_sec_to_TIME(&time_now, now); - t2sleep= evex_time_diff(&et->execute_at, &time_now); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - - DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); - if (t2sleep > 0) - { - /* - We sleep t2sleep seconds but we check every second whether this thread - has been killed, or there is a new candidate - */ - while (t2sleep-- && !thd->killed && event_executor_running_global_var && - evex_queue_num_elements(EVEX_EQ_NAME) && - (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) - { - DBUG_PRINT("evex main thread",("will sleep a bit more")); - my_sleep(1000000); - } - } - if (!event_executor_running_global_var) - { - sql_print_information("Scheduler asked to stop."); - break; - } + +restart_ticking: + switch (executor_wait_till_next_event_exec(thd)) { + case WAIT_STATUS_READY: // time to execute the event on top + DBUG_PRINT("evex main thread",("time to execute an event")); + break; + case WAIT_STATUS_EMPTY_QUEUE: // no more events + DBUG_PRINT("evex main thread",("no more events")); + continue; + break; + case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue + DBUG_PRINT("evex main thread",("restart ticking")); + goto restart_ticking; + case WAIT_STATUS_STOP_EXECUTOR: + sql_print_information("SCHEDULER: Asked to stop."); + goto finish; + break; + default: + DBUG_ASSERT(0); } VOID(pthread_mutex_lock(&LOCK_event_arrays)); + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); if (!evex_queue_num_elements(EVEX_EQ_NAME)) { @@ -479,14 +532,14 @@ event_executor_main(void *arg) DBUG_PRINT("evex main thread",("it's right time")); if (et->status == MYSQL_EVENT_ENABLED) { - pthread_t th; + int fork_ret_code; DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str, TIME_to_ulonglong_datetime(&et->execute_at))); et->mark_last_executed(thd); if (et->compute_next_execution_time()) { - sql_print_error("Error while computing time of %s.%s . " + sql_print_error("SCHEDULER: Error while computing time of %s.%s . " "Disabling after execution.", et->dbname.str, et->name.str); et->status= MYSQL_EVENT_DISABLED; @@ -495,13 +548,23 @@ event_executor_main(void *arg) TIME_to_ulonglong_datetime(&et->execute_at))); et->update_fields(thd); - ++iter_num; - DBUG_PRINT("info", (" Spawning a thread %d", iter_num)); #ifndef DBUG_FAULTY_THR - if (pthread_create(&th,&connection_attrib,event_executor_worker,(void*)et)) - { - sql_print_error("Problem while trying to create a thread"); - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); + thread_safe_increment(workers_count, &LOCK_workers_count); + switch ((fork_ret_code= et->spawn_now(event_executor_worker))) { + case EVENT_EXEC_CANT_FORK: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_error("SCHEDULER: Problem while trying to create a thread"); + UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish); + case EVENT_EXEC_ALREADY_EXEC: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", + et->dbname.str, et->name.str); + break; + default: + DBUG_ASSERT(!fork_ret_code); + if (fork_ret_code) + thread_safe_decrement(workers_count, &LOCK_workers_count); + break; } #else event_executor_worker((void *) et); @@ -511,22 +574,21 @@ event_executor_main(void *arg) et->flags |= EVENT_EXEC_NO_MORE; if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1 else evex_queue_first_updated(&EVEX_EQ_NAME); } DBUG_PRINT("evex main thread",("unlocking")); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); }// while +finish: -err: // First manifest that this thread does not work and then destroy VOID(pthread_mutex_lock(&LOCK_evex_running)); evex_is_running= false; evex_main_thread_id= 0; VOID(pthread_mutex_unlock(&LOCK_evex_running)); - sql_print_information("Event scheduler stopping. Waiting for worker threads to finish."); /* TODO: A better will be with a conditional variable @@ -535,21 +597,33 @@ err: Read workers_count without lock, no need for locking. In the worst case we have to wait 1sec more. */ - while (workers_count) - my_sleep(1000000);// 1s + sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish."); + while (1) + { + VOID(pthread_mutex_lock(&LOCK_workers_count)); + if (!workers_count) + { + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + break; + } + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + my_sleep(1000000);// 1s + } /* - LEX_STRINGs reside in the memory root and will be destroyed with it. - Hence no need of delete but only freeing of SP + First we free all objects ... + Lock because a DROP DATABASE could be running in parallel and it locks on these */ - // First we free all objects ... + sql_print_information("SCHEDULER: Emptying the queue."); + VOID(pthread_mutex_lock(&LOCK_event_arrays)); for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) { event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); et->free_sp(); delete et; } - // ... then we can thras the whole queue at once + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + // ... then we can thrash the whole queue at once evex_queue_destroy(&EVEX_EQ_NAME); thd->proc_info = "Clearing"; @@ -573,7 +647,7 @@ err_no_thd: VOID(pthread_mutex_unlock(&LOCK_evex_running)); free_root(&evex_mem_root, MYF(0)); - sql_print_information("Event scheduler stopped."); + sql_print_information("SCHEDULER: Stopped."); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -600,9 +674,6 @@ event_executor_worker(void *event_void) MEM_ROOT worker_mem_root; DBUG_ENTER("event_executor_worker"); - VOID(pthread_mutex_lock(&LOCK_workers_count)); - ++workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); @@ -611,7 +682,7 @@ event_executor_worker(void *event_void) if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create a THD structure in a scheduler worker thread"); + sql_print_error("SCHEDULER: Cannot create a THD structure in an worker."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is @@ -653,14 +724,7 @@ event_executor_worker(void *event_void) event->dbname.str, event->name.str, event->definer.str); } - if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("event_executor_worker", - ("%s exec no more. to drop=%d",event->name.str, event->dropped)); - if (event->dropped) - event->drop(thd); - delete event; - } + event->spawn_thread_finish(thd); err: @@ -689,10 +753,7 @@ err: err_no_thd: free_root(&worker_mem_root, MYF(0)); - - VOID(pthread_mutex_lock(&LOCK_workers_count)); - --workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); + thread_safe_decrement(workers_count, &LOCK_workers_count); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd) if ((ret= evex_open_event_table(thd, TL_READ, &table))) { - sql_print_error("Table mysql.event is damaged."); + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); DBUG_RETURN(SP_OPEN_TABLE_FAILED); } @@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd) if ((ret= et->load_from_row(&evex_mem_root, table))) { - sql_print_error("Error while loading from mysql.event. " + sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); goto end; } @@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd) if ((ret= et->compile(thd, &evex_mem_root))) { - sql_print_error("Error while compiling %s.%s. Aborting load.", + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", et->dbname.str, et->name.str); goto end; } @@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd) // let's find when to be executed if (et->compute_next_execution_time()) { - sql_print_error("Error while computing execution time of %s.%s. Skipping", - et->dbname.str, et->name.str); + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); continue; } @@ -799,7 +860,7 @@ end: thd->version--; // Force close to free memory close_thread_tables(thd); - sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s"); + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); |