diff options
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 169 |
1 files changed, 129 insertions, 40 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 32c5a076a62..44920b29c16 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -16,7 +16,7 @@ #include "mysql_priv.h" #include "events.h" -#include "event_scheduler.h" +#include "event_scheduler_ng.h" #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" @@ -35,10 +35,6 @@ #define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__) -Event_scheduler* -Event_queue::singleton= NULL; - - /* Compares the execute_at members of 2 Event_timed instances. Used as callback for the prioritized queue when shifting @@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) goto end; } - /* We need to load the event on scheduler_root */ if (!(res= db_repository-> load_named_event(thd, et->dbname, et->name, &et_new))) { + DBUG_PRINT("info", ("new event in the queue %p", et_new)); queue_insert_safe(&queue, (byte *) et_new); on_queue_change(); } @@ -130,7 +126,7 @@ end: Updates an event from the scheduler queue SYNOPSIS - Event_scheduler::update_event() + Event_queue::update_event() thd Thread et The event to replace(add) into the queue new_schema New schema @@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, et->dbname= *new_schema; et->name= *new_name; } - /* - We need to load the event (it's strings but on the object itself) - on scheduler_root. et_new could be NULL : - 1. Error occured - 2. If the replace is DISABLED, we don't load it into the queue. - */ + if (!(res= db_repository-> load_named_event(thd, et->dbname, et->name, &et_new))) { + DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old)); queue_insert_safe(&queue, (byte *) et_new); on_queue_change(); } @@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, /* - Drops an event from the scheduler queue + Drops an event from the queue SYNOPSIS Event_queue::drop_event() @@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name) } - - /* - Searches for an event in the scheduler queue + Searches for an event in the queue SYNOPSIS Event_queue::find_event() @@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) comparator The function to use for comparing RETURN VALUE - -1 Scheduler not working >=0 Number of dropped events NOTE @@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, db The schema name RETURN VALUE - -1 Scheduler not working >=0 Number of dropped events */ @@ -459,8 +447,7 @@ void Event_queue::lock_data(const char *func, uint line) { DBUG_ENTER("Event_queue::lock_mutex"); - DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", - &LOCK_event_queue, func, line)); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); pthread_mutex_lock(&LOCK_event_queue); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; @@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line) void Event_queue::unlock_data(const char *func, uint line) { - DBUG_ENTER("Event_queue::UNLOCK_mutex"); - DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", - &LOCK_event_queue, func, line)); + DBUG_ENTER("Event_queue::unlock_mutex"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); mutex_last_unlocked_at_line= line; mutex_queue_data_locked= FALSE; mutex_last_unlocked_in_func= func; @@ -510,7 +496,7 @@ Event_queue::events_count() LOCK_QUEUE_DATA(); n= queue.elements; UNLOCK_QUEUE_DATA(); - + DBUG_PRINT("info", ("n=%u", n)); DBUG_RETURN(n); } @@ -529,7 +515,7 @@ uint Event_queue::events_count_no_lock() { uint n; - DBUG_ENTER("Event_scheduler::events_count_no_lock"); + DBUG_ENTER("Event_queue::events_count_no_lock"); n= queue.elements; @@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd) } DBUG_PRINT("info", ("Loading event from row.")); - if ((ret= et->load_from_row(&scheduler_root, table))) + if ((ret= et->load_from_row(table))) { clean_the_queue= TRUE; sql_print_error("SCHEDULER: Error while loading from mysql.event. " @@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd) void Event_queue::init_mutexes() { - pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST); } @@ -743,13 +729,13 @@ Event_queue::init_mutexes() Destroys mutexes. SYNOPSIS - Event_queue::destroy_mutexes() + Event_queue::deinit_mutexes() */ void -Event_queue::destroy_mutexes() +Event_queue::deinit_mutexes() { - pthread_mutex_destroy(&singleton->LOCK_event_queue); + pthread_mutex_destroy(&LOCK_event_queue); } @@ -765,8 +751,8 @@ void Event_queue::on_queue_change() { DBUG_ENTER("Event_queue::on_queue_change"); - DBUG_PRINT("info", ("Sending COND_new_work")); - singleton->queue_changed(); + DBUG_PRINT("info", ("Signalling change of the queue")); + scheduler->queue_changed(); DBUG_VOID_RETURN; } @@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo) { int i= 0; bool ret= FALSE; - DBUG_ENTER("Event_scheduler::init"); + DBUG_ENTER("Event_queue::init"); DBUG_PRINT("enter", ("this=%p", this)); LOCK_QUEUE_DATA(); db_repository= db_repo; - /* init memory root */ - init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 30 /*auto_extent*/)) @@ -824,8 +808,8 @@ Event_queue::deinit() DBUG_ENTER("Event_queue::deinit"); LOCK_QUEUE_DATA(); + empty_queue(); delete_queue(&queue); - free_root(&scheduler_root, MYF(0)); UNLOCK_QUEUE_DATA(); DBUG_VOID_RETURN; @@ -835,7 +819,7 @@ Event_queue::deinit() void Event_queue::recalculate_queue(THD *thd) { - int i; + uint i; for (i= 0; i < queue.elements; i++) { ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); @@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd) void Event_queue::empty_queue() { - int i; + uint i; /* empty the queue */ for (i= 0; i < events_count_no_lock(); ++i) { Event_timed *et= (Event_timed *) queue_element(&queue, i); - et->free_sp(); delete et; } resize_queue(&queue, 0); } + + +Event_timed* +Event_queue::get_top() +{ + return (Event_timed *)queue_top(&queue); +} + + +void +Event_queue::remove_top() +{ + queue_remove(&queue, 0);// 0 is top, internally 1 +} + + +void +Event_queue::top_changed() +{ + queue_replaced(&queue); +} + + +Event_timed * +Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, + struct timespec *abstime) +{ + struct timespec top_time; + Event_timed *et_new= NULL; + DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); + DBUG_PRINT("enter", ("thd=%p now=%d", thd, now)); + abstime->tv_nsec= 0; + LOCK_QUEUE_DATA(); + do { + int res; + Event_timed *et= NULL; + if (!queue.elements) + { + abstime->tv_sec= 0; + break; + } + int i; + DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); + for (i = 0; i < queue.elements; i++) + { + et= ((Event_timed*)queue_element(&queue, i)); + DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); + DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " + " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", + TIME_to_ulonglong_datetime(&et->execute_at), + TIME_to_ulonglong_datetime(&et->starts), + TIME_to_ulonglong_datetime(&et->ends), + et->expression, sec_since_epoch_TIME(&et->execute_at), now, + (int)(sec_since_epoch_TIME(&et->execute_at) - now), + sec_since_epoch_TIME(&et->execute_at) <= now)); + } + et= ((Event_timed*)queue_element(&queue, 0)); + top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); + + if (top_time.tv_sec <= now) + { + DBUG_PRINT("info", ("Ready for execution")); + abstime->tv_sec= 0; + if ((res= db_repository->load_named_event(thd, et->dbname, et->name, + &et_new))) + { + DBUG_ASSERT(0); + break; + } + + et->mark_last_executed(thd); + if (et->compute_next_execution_time()) + et->status= Event_timed::DISABLED; + DBUG_PRINT("info", ("event's status is %d", et->status)); + + et->update_fields(thd); + if (((et->execute_at.year && !et->expression) || et->execute_at_null) || + (et->status == Event_timed::DISABLED)) + { + DBUG_PRINT("info", ("removing from the queue")); + if (et->dropped) + et->drop(thd); + delete et; + queue_remove(&queue, 0); + } + else + queue_replaced(&queue); + } + else + { + abstime->tv_sec= top_time.tv_sec; + DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, + abstime->tv_sec)); + } + } while (0); + UNLOCK_QUEUE_DATA(); + + DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new, + abstime->tv_sec)); + if (et_new) + DBUG_PRINT("info", ("db=%s name=%s definer=%s " + "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str, + et_new->definer.str, + TIME_to_ulonglong_datetime(&et_new->execute_at))); + DBUG_RETURN(et_new); +} |