diff options
-rw-r--r-- | sql/event_queue.cc | 173 | ||||
-rw-r--r-- | sql/event_queue.h | 16 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 188 | ||||
-rw-r--r-- | sql/event_scheduler.h | 33 | ||||
-rw-r--r-- | sql/events.cc | 4 |
5 files changed, 227 insertions, 187 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index ba12b732726..5316bae8f4a 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -18,7 +18,6 @@ #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" -#include "event_scheduler.h" #define EVENT_QUEUE_INITIAL_SIZE 30 @@ -87,6 +86,7 @@ Event_queue::Event_queue() { mutex_last_unlocked_in_func= mutex_last_locked_in_func= mutex_last_attempted_lock_in_func= ""; + set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); } @@ -135,8 +135,7 @@ Event_queue::deinit_mutexes() */ bool -Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, - Event_scheduler *sched) +Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) { pthread_t th; bool res; @@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, LOCK_QUEUE_DATA(); db_repository= db_repo; - scheduler= sched; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, @@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); + pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); - - notify_observers(); } DBUG_RETURN(res); @@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, { DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); + pthread_cond_broadcast(&COND_queue_state); } + dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); - if (new_element) - notify_observers(); - end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); @@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, i++; } /* - We don't call notify_observers() . If we remove the top event: + We don't call pthread_cond_broadcast(&COND_queue_state); + If we remove the top event: 1. The queue is empty. The scheduler will wake up at some time and realize that the queue is empty. If create_event() comes inbetween it will signal the scheduler @@ -422,24 +419,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) /* - Signals the observers (the main scheduler thread) that the - state of the queue has been changed. - - SYNOPSIS - Event_queue::notify_observers() -*/ - -void -Event_queue::notify_observers() -{ - DBUG_ENTER("Event_queue::notify_observers"); - DBUG_PRINT("info", ("Signalling change of the queue")); - scheduler->queue_changed(); - DBUG_VOID_RETURN; -} - - -/* Searches for an event in the queue SYNOPSIS @@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now) #endif } +static const char *queue_empty_msg= "Waiting on empty queue"; +static const char *queue_wait_msg= "Waiting for next activation"; /* Checks whether the top of the queue is elligible for execution and @@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now) */ bool -Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, - Event_job_data **job_data, - struct timespec *abstime) +Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) { bool ret= FALSE; struct timespec top_time; + struct timespec *abstime; *job_data= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); - DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); - abstime->tv_nsec= 0; + + top_time.tv_nsec= 0; LOCK_QUEUE_DATA(); - do { + for (;;) + { int res; - if (!queue.elements) - { - abstime->tv_sec= 0; - break; - } + Event_queue_element *top= NULL; - Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); + thd->end_time(); + time_t now= thd->query_start(); + abstime= NULL; - top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); + if (queue.elements) + { + top= ((Event_queue_element*) queue_element(&queue, 0)); + top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); - if (top_time.tv_sec > now) + abstime= &top_time; + } + + if (!abstime || abstime->tv_sec > now) { - abstime->tv_sec= top_time.tv_sec; - DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, - abstime->tv_sec)); - break; + const char *msg; + if (abstime) + { + next_activation_at= top->execute_at; + msg= queue_wait_msg; + } + else + { + set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); + msg= queue_wait_msg; + } + + cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__); + if (thd->killed) + { + DBUG_PRINT("info", ("thd->killed=%d", thd->killed)); + goto end; + } + /* + The queue could have been emptied. Therefore it's safe to start from + the beginning. Moreover, this way we will get also the new top, if + the element at the top has been changed. + */ + continue; } DBUG_PRINT("info", ("Ready for execution")); - abstime->tv_sec= 0; if (!(*job_data= new Event_job_data())) { ret= TRUE; @@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) { + DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; @@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, queue_replaced(&queue); dbug_dump_queue(now); - } while (0); + break; + } +end: UNLOCK_QUEUE_DATA(); DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ", - ret, *job_data, abstime->tv_sec)); + ret, *job_data, abstime? abstime->tv_sec:0)); if (*job_data) DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str, @@ -865,6 +872,52 @@ Event_queue::unlock_data(const char *func, uint line) /* + Wrapper for pthread_cond_wait/timedwait + + SYNOPSIS + Event_queue::cond_wait() + thd Thread (Could be NULL during shutdown procedure) + msg Message for thd->proc_info + abstime If not null then call pthread_cond_timedwait() + func Which function is requesting cond_wait + line On which line cond_wait is requested +*/ + +void +Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg, + const char *func, uint line) +{ + DBUG_ENTER("Event_queue::cond_wait"); + waiting_on_cond= TRUE; + mutex_last_unlocked_at_line= line; + mutex_queue_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + + thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg); + + DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":"")); + if (!abstime) + pthread_cond_wait(&COND_queue_state, &LOCK_event_queue); + else + pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime); + + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_queue_data_locked= TRUE; + waiting_on_cond= FALSE; + + /* + This will free the lock so we need to relock. Not the best thing to + do but we need to obey cond_wait() + */ + thd->exit_cond(""); + LOCK_QUEUE_DATA(); + + DBUG_VOID_RETURN; +} + + +/* Dumps the internal status of the queue SYNOPSIS @@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd) protocol->store(&tmp_string); ret= protocol->write(); + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs); + int_string.set((longlong) waiting_on_cond, scs); + protocol->store(&int_string); + ret= protocol->write(); + + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("next activation at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), + "%4d-%02d-%02d %02d:%02d:%02d", + next_activation_at.year, + next_activation_at.month, + next_activation_at.day, + next_activation_at.hour, + next_activation_at.minute, + next_activation_at.second + )); + protocol->store(&tmp_string); + ret= protocol->write(); + #endif DBUG_RETURN(FALSE); } diff --git a/sql/event_queue.h b/sql/event_queue.h index afb9babc572..73d1a3efe4d 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -36,7 +36,7 @@ public: deinit_mutexes(); bool - init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched); + init_queue(THD *thd, Event_db_repository *db_repo); void deinit_queue(); @@ -60,8 +60,7 @@ public: recalculate_activation_times(THD *thd); bool - get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data, - struct timespec *abstime); + get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); bool dump_internal_status(THD *thd); @@ -81,13 +80,11 @@ protected: empty_queue(); void - notify_observers(); - - void dbug_dump_queue(time_t now); /* LOCK_event_queue is the mutex which protects the access to the queue. */ pthread_mutex_t LOCK_event_queue; + pthread_cond_t COND_queue_state; Event_db_repository *db_repository; @@ -96,6 +93,8 @@ protected: /* The sorted queue with the Event_job_data objects */ QUEUE queue; + TIME next_activation_at; + uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; uint mutex_last_attempted_lock_at_line; @@ -104,6 +103,7 @@ protected: const char* mutex_last_attempted_lock_in_func; bool mutex_queue_data_locked; bool mutex_queue_data_attempting_lock; + bool waiting_on_cond; /* helper functions for working with mutexes & conditionals */ void @@ -111,6 +111,10 @@ protected: void unlock_data(const char *func, uint line); + + void + cond_wait(THD *thd, struct timespec *abstime, const char* msg, + const char *func, uint line); }; #endif /* _EVENT_QUEUE_H_ */ diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index d4a5df85cda..6423f3676fc 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q) LOCK_DATA(); queue= q; started_events= 0; - thread_id= 0; + scheduler_thd= NULL; state= INITIALIZED; UNLOCK_DATA(); } @@ -397,22 +397,18 @@ Event_scheduler::start() scheduler_param_value->thd= new_thd; scheduler_param_value->scheduler= this; + scheduler_thd= new_thd; + DBUG_PRINT("info", ("Setting state go RUNNING")); + state= RUNNING; DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); if (pthread_create(&th, &connection_attrib, event_scheduler_thread, (void*)scheduler_param_value)) { DBUG_PRINT("error", ("cannot create a new thread")); state= INITIALIZED; + scheduler_thd= NULL; ret= TRUE; - } - DBUG_PRINT("info", ("Setting state go RUNNING")); - state= RUNNING; -end: - UNLOCK_DATA(); - if (ret && new_thd) - { - DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); new_thd->proc_info= "Clearing"; DBUG_ASSERT(new_thd->net.buff != 0); net_end(&new_thd->net); @@ -422,6 +418,9 @@ end: delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } +end: + UNLOCK_DATA(); + DBUG_RETURN(ret); } @@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd) Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); - LOCK_DATA(); - - thread_id= thd->thread_id; sql_print_information("SCHEDULER: Manager thread started with id %lu", - thread_id); + thd->thread_id); /* Recalculate the values in the queue because there could have been stops in executions of the scheduler and some times could have passed by. */ queue->recalculate_activation_times(thd); - while (state == RUNNING) + + while (is_running()) { - thd->end_time(); /* Gets a minimized version */ - if (queue->get_top_for_execution_if_time(thd, thd->query_start(), - &job_data, &abstime)) + if (queue->get_top_for_execution_if_time(thd, &job_data)) { - sql_print_information("SCHEDULER: Serious error during getting next" - " event to execute. Stopping"); + sql_print_information("SCHEDULER: Serious error during getting next " + "event to execute. Stopping"); break; } - DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " - "abs_time.tv_sec=%d", - job_data, thd->query_start(), abstime.tv_sec)); - if (!job_data && !abstime.tv_sec) - { - DBUG_PRINT("info", ("The queue is empty. Going to sleep")); - COND_STATE_WAIT(thd, NULL, "Waiting on empty queue"); - DBUG_PRINT("info", ("Woke up. Got COND_state")); - } - else if (abstime.tv_sec) + DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data)); + if (job_data) { - DBUG_PRINT("info", ("Have to sleep some time %u s. till %u", - abstime.tv_sec - thd->query_start(), abstime.tv_sec)); - - COND_STATE_WAIT(thd, &abstime, "Waiting for next activation"); - /* - If we get signal we should recalculate the whether it's the right time - because there could be : - 1. Spurious wake-up - 2. The top of the queue was changed (new one becase of create/update) - */ - DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); + if ((res= execute_top(thd, job_data))) + break; } else { - UNLOCK_DATA(); - res= execute_top(thd, job_data); - LOCK_DATA(); - if (res) - break; - ++started_events; + DBUG_ASSERT(thd->killed); + DBUG_PRINT("info", ("job_data is NULL, the thread was killed")); } DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); } + LOCK_DATA(); DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); - pthread_cond_signal(&COND_state); -error: state= INITIALIZED; + pthread_cond_signal(&COND_state); UNLOCK_DATA(); sql_print_information("SCHEDULER: Stopped"); @@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) job_data))) goto error; + ++started_events; + DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); DBUG_RETURN(FALSE); @@ -568,6 +544,27 @@ error: /* + Checkes whether the state of the scheduler is RUNNING + + SYNOPSIS + Event_scheduler::is_running() + + RETURN VALUE + TRUE RUNNING + FALSE Not RUNNING +*/ + +inline bool +Event_scheduler::is_running() +{ + LOCK_DATA(); + bool ret= (state == RUNNING); + UNLOCK_DATA(); + return ret; +} + + +/* Stops the scheduler (again). Waits for acknowledgement from the scheduler that it has stopped - synchronous stopping. @@ -591,26 +588,48 @@ Event_scheduler::stop() if (state != RUNNING) goto end; - state= STOPPING; - - DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); - sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); - - pthread_cond_signal(&COND_state); - /* Guarantee we don't catch spurious signals */ - sql_print_information("SCHEDULER: Waiting the manager thread to reply"); do { DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " "thread. Current value of state is %s . " "workers count=%d", scheduler_states_names[state].str, workers_count())); + /* + NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON + threads. In addition, kill_one_thread() requires THD but during shutdown + current_thd is NULL. Hence, if kill_one_thread should be used it has to + be modified to kill also daemons, by adding a flag, and also we have to + create artificial THD here. To save all this work, we just do what + kill_one_thread() does to kill a thread. See also sql_repl.cc for similar + usage. + */ + + state= STOPPING; + DBUG_PRINT("info", ("Manager thread has id %d", scheduler_thd->thread_id)); + /* Lock from delete */ + pthread_mutex_lock(&scheduler_thd->LOCK_delete); + /* This will wake up the thread if it waits on Queue's conditional */ + sql_print_information("SCHEDULER: Killing manager thread %lu", + scheduler_thd->thread_id); + scheduler_thd->awake(THD::KILL_CONNECTION); + pthread_mutex_unlock(&scheduler_thd->LOCK_delete); + /* thd could be 0x0, when shutting down */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop"); } while (state == STOPPING); DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); - - thread_id= 0; + /* + The rationale behind setting it to NULL here but not destructing it + beforehand is because the THD will be deinited in event_scheduler_thread(). + It's more clear when the post_init and the deinit is done in one function. + Here we just mark that the scheduler doesn't have a THD anymore. Though for + milliseconds the old thread could exist we can't use it anymore. When we + unlock the mutex in this function a little later the state will be + INITIALIZED. Therefore, a connection thread could enter the critical section + and will create a new THD object. + */ + scheduler_thd= NULL; end: UNLOCK_DATA(); DBUG_RETURN(FALSE); @@ -634,12 +653,8 @@ Event_scheduler::workers_count() pthread_mutex_lock(&LOCK_thread_count); // For unlink from list I_List_iterator<THD> it(threads); while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) ++count; - } pthread_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("exit", ("%d", count)); DBUG_RETURN(count); @@ -647,25 +662,6 @@ Event_scheduler::workers_count() /* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_scheduler::queue_changed() -*/ - -void -Event_scheduler::queue_changed() -{ - DBUG_ENTER("Event_scheduler::queue_changed"); - DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", - scheduler_states_names[state].str)); - pthread_cond_signal(&COND_state); - DBUG_VOID_RETURN; -} - - -/* Auxiliary function for locking LOCK_scheduler_state. Used by the LOCK_DATA macro. @@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line) Event_scheduler::cond_wait() thd Thread (Could be NULL during shutdown procedure) abstime If not null then call pthread_cond_timedwait() + msg Message for thd->proc_info func Which function is requesting cond_wait line On which line cond_wait is requested */ @@ -757,33 +754,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg, /* - Returns the current state of the scheduler - - SYNOPSIS - Event_scheduler::get_state() - - RETURN VALUE - The state of the scheduler (INITIALIZED | RUNNING | STOPPING) -*/ - -enum Event_scheduler::enum_state -Event_scheduler::get_state() -{ - enum Event_scheduler::enum_state ret; - DBUG_ENTER("Event_scheduler::get_state"); - LOCK_DATA(); - ret= state; - UNLOCK_DATA(); - DBUG_RETURN(ret); -} - - -/* - REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF - Returns whether the scheduler was initialized. -*/ - -/* Dumps the internal status of the scheduler SYNOPSIS @@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("thread_id"), scs); if (thread_id) { - int_string.set((longlong) thread_id, scs); + int_string.set((longlong) scheduler_thd->thread_id, scs); protocol->store(&int_string); } else diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index 18a805eb6f6..fc41e345dfd 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -34,14 +34,6 @@ public: Event_scheduler():state(UNINITIALIZED){} ~Event_scheduler(){} - enum enum_state - { - UNINITIALIZED = 0, - INITIALIZED, - RUNNING, - STOPPING - }; - /* State changing methods follow */ bool @@ -70,12 +62,8 @@ public: deinit_mutexes(); /* Information retrieving methods follow */ - - enum enum_state - get_state(); - - void - queue_changed(); + bool + is_running(); bool dump_internal_status(THD *thd); @@ -84,6 +72,7 @@ private: uint workers_count(); + /* helper functions */ bool execute_top(THD *thd, Event_job_data *job_data); @@ -101,16 +90,18 @@ private: pthread_mutex_t LOCK_scheduler_state; + enum enum_state + { + UNINITIALIZED = 0, + INITIALIZED, + RUNNING, + STOPPING + }; + /* This is the current status of the life-cycle of the scheduler. */ enum enum_state state; - /* - Holds the thread id of the executor thread or 0 if the scheduler is not - running. It is used by ::shutdown() to know which thread to kill with - kill_one_thread(). The latter wake ups a thread if it is waiting on a - conditional variable and sets thd->killed to non-zero. - */ - ulong thread_id; + THD *scheduler_thd; pthread_cond_t COND_state; diff --git a/sql/events.cc b/sql/events.cc index deb96ba2e89..8bfafcaf926 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -637,7 +637,7 @@ Events::init() } check_system_tables_error= FALSE; - if (event_queue->init_queue(thd, db_repository, scheduler)) + if (event_queue->init_queue(thd, db_repository)) { sql_print_error("SCHEDULER: Error while loading from disk."); goto end; @@ -827,7 +827,7 @@ Events::is_execution_of_events_started() my_error(ER_EVENTS_DB_ERROR, MYF(0)); DBUG_RETURN(FALSE); } - DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING); + DBUG_RETURN(scheduler->is_running()); } |