diff options
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 173 |
1 files changed, 124 insertions, 49 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index ba12b732726..5316bae8f4a 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -18,7 +18,6 @@ #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" -#include "event_scheduler.h" #define EVENT_QUEUE_INITIAL_SIZE 30 @@ -87,6 +86,7 @@ Event_queue::Event_queue() { mutex_last_unlocked_in_func= mutex_last_locked_in_func= mutex_last_attempted_lock_in_func= ""; + set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); } @@ -135,8 +135,7 @@ Event_queue::deinit_mutexes() */ bool -Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, - Event_scheduler *sched) +Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) { pthread_t th; bool res; @@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, LOCK_QUEUE_DATA(); db_repository= db_repo; - scheduler= sched; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, @@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); + pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); - - notify_observers(); } DBUG_RETURN(res); @@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, { DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); + pthread_cond_broadcast(&COND_queue_state); } + dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); - if (new_element) - notify_observers(); - end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); @@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, i++; } /* - We don't call notify_observers() . If we remove the top event: + We don't call pthread_cond_broadcast(&COND_queue_state); + If we remove the top event: 1. The queue is empty. The scheduler will wake up at some time and realize that the queue is empty. If create_event() comes inbetween it will signal the scheduler @@ -422,24 +419,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) /* - Signals the observers (the main scheduler thread) that the - state of the queue has been changed. - - SYNOPSIS - Event_queue::notify_observers() -*/ - -void -Event_queue::notify_observers() -{ - DBUG_ENTER("Event_queue::notify_observers"); - DBUG_PRINT("info", ("Signalling change of the queue")); - scheduler->queue_changed(); - DBUG_VOID_RETURN; -} - - -/* Searches for an event in the queue SYNOPSIS @@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now) #endif } +static const char *queue_empty_msg= "Waiting on empty queue"; +static const char *queue_wait_msg= "Waiting for next activation"; /* Checks whether the top of the queue is elligible for execution and @@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now) */ bool -Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, - Event_job_data **job_data, - struct timespec *abstime) +Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) { bool ret= FALSE; struct timespec top_time; + struct timespec *abstime; *job_data= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); - DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); - abstime->tv_nsec= 0; + + top_time.tv_nsec= 0; LOCK_QUEUE_DATA(); - do { + for (;;) + { int res; - if (!queue.elements) - { - abstime->tv_sec= 0; - break; - } + Event_queue_element *top= NULL; - Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); + thd->end_time(); + time_t now= thd->query_start(); + abstime= NULL; - top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); + if (queue.elements) + { + top= ((Event_queue_element*) queue_element(&queue, 0)); + top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); - if (top_time.tv_sec > now) + abstime= &top_time; + } + + if (!abstime || abstime->tv_sec > now) { - abstime->tv_sec= top_time.tv_sec; - DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, - abstime->tv_sec)); - break; + const char *msg; + if (abstime) + { + next_activation_at= top->execute_at; + msg= queue_wait_msg; + } + else + { + set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); + msg= queue_wait_msg; + } + + cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__); + if (thd->killed) + { + DBUG_PRINT("info", ("thd->killed=%d", thd->killed)); + goto end; + } + /* + The queue could have been emptied. Therefore it's safe to start from + the beginning. Moreover, this way we will get also the new top, if + the element at the top has been changed. + */ + continue; } DBUG_PRINT("info", ("Ready for execution")); - abstime->tv_sec= 0; if (!(*job_data= new Event_job_data())) { ret= TRUE; @@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) { + DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; @@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, queue_replaced(&queue); dbug_dump_queue(now); - } while (0); + break; + } +end: UNLOCK_QUEUE_DATA(); DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ", - ret, *job_data, abstime->tv_sec)); + ret, *job_data, abstime? abstime->tv_sec:0)); if (*job_data) DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str, @@ -865,6 +872,52 @@ Event_queue::unlock_data(const char *func, uint line) /* + Wrapper for pthread_cond_wait/timedwait + + SYNOPSIS + Event_queue::cond_wait() + thd Thread (Could be NULL during shutdown procedure) + msg Message for thd->proc_info + abstime If not null then call pthread_cond_timedwait() + func Which function is requesting cond_wait + line On which line cond_wait is requested +*/ + +void +Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg, + const char *func, uint line) +{ + DBUG_ENTER("Event_queue::cond_wait"); + waiting_on_cond= TRUE; + mutex_last_unlocked_at_line= line; + mutex_queue_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + + thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg); + + DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":"")); + if (!abstime) + pthread_cond_wait(&COND_queue_state, &LOCK_event_queue); + else + pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime); + + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_queue_data_locked= TRUE; + waiting_on_cond= FALSE; + + /* + This will free the lock so we need to relock. Not the best thing to + do but we need to obey cond_wait() + */ + thd->exit_cond(""); + LOCK_QUEUE_DATA(); + + DBUG_VOID_RETURN; +} + + +/* Dumps the internal status of the queue SYNOPSIS @@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd) protocol->store(&tmp_string); ret= protocol->write(); + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs); + int_string.set((longlong) waiting_on_cond, scs); + protocol->store(&int_string); + ret= protocol->write(); + + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("next activation at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), + "%4d-%02d-%02d %02d:%02d:%02d", + next_activation_at.year, + next_activation_at.month, + next_activation_at.day, + next_activation_at.hour, + next_activation_at.minute, + next_activation_at.second + )); + protocol->store(&tmp_string); + ret= protocol->write(); + #endif DBUG_RETURN(FALSE); } |