diff options
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 249 |
1 files changed, 84 insertions, 165 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 44920b29c16..b42372ba6bd 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -90,34 +90,29 @@ Event_queue::Event_queue() RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk + OP_ALREADY_EXISTS Event already in the queue */ int -Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) +Event_queue::create_event(THD *thd, Event_parse_data *et) { int res; Event_timed *et_new; DBUG_ENTER("Event_queue::create_event"); DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue)); + res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new); LOCK_QUEUE_DATA(); - if (check_existence && find_event(et->dbname, et->name, FALSE)) - { - res= OP_ALREADY_EXISTS; - goto end; - } - - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) + if (!res) { DBUG_PRINT("info", ("new event in the queue %p", et_new)); queue_insert_safe(&queue, (byte *) et_new); - on_queue_change(); + notify_observers(); } else if (res == OP_DISABLED_EVENT) res= OP_OK; -end: UNLOCK_QUEUE_DATA(); + DBUG_RETURN(res); } @@ -129,104 +124,54 @@ end: Event_queue::update_event() thd Thread et The event to replace(add) into the queue - new_schema New schema - new_name New name + new_schema New schema, in case of RENAME TO + new_name New name, in case of RENAME TO RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk - OP_ALREADY_EXISTS Event already in the queue */ int Event_queue::update_event(THD *thd, Event_parse_data *et, - LEX_STRING *new_schema, - LEX_STRING *new_name) + LEX_STRING *new_schema, LEX_STRING *new_name) { - int res= OP_OK; - Event_timed *et_old, *et_new= NULL; - LEX_STRING old_schema, old_name; - - LINT_INIT(old_schema.str); - LINT_INIT(old_schema.length); - LINT_INIT(old_name.str); - LINT_INIT(old_name.length); + int res; + Event_timed *et_old= NULL, *et_new= NULL; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", thd, et, et->dbname.str, et->name.str, &LOCK_event_queue)); + res= db_repository-> + load_named_event_timed(thd, new_schema?*new_schema:et->dbname, + new_name? *new_name:et->name, &et_new); + + if (res && res != OP_DISABLED_EVENT) + goto end; + LOCK_QUEUE_DATA(); if (!(et_old= find_event(et->dbname, et->name, TRUE))) + { DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", et->dbname.str, et->name.str)); - - if (new_schema && new_name) - { - old_schema= et->dbname; - old_name= et->name; - et->dbname= *new_schema; - et->name= *new_name; } - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) + if (!res) { DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old)); queue_insert_safe(&queue, (byte *) et_new); - on_queue_change(); } else if (res == OP_DISABLED_EVENT) res= OP_OK; - - if (new_schema && new_name) - { - et->dbname= old_schema; - et->name= old_name; - } - DBUG_PRINT("info", ("res=%d", res)); UNLOCK_QUEUE_DATA(); - /* - Andrey: Is this comment still truthful ??? - - We don't move this code above because a potential kill_thread will call - THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, - which is LOCK_event_queue on which the COND_new_work in ::run() locks. - Hence, we try to acquire a lock which we have already acquired and we run - into an assert. Holding LOCK_event_queue however is not needed because - we don't touch any invariant of the scheduler anymore. ::drop_event() does - the same. - */ - if (et_old) - { - switch (et_old->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - DBUG_ASSERT(0); - } - } + notify_observers(); + + if (et_old) + delete et_old; +end: + DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); } @@ -256,40 +201,13 @@ Event_queue::drop_event(THD *thd, sp_name *name) LOCK_QUEUE_DATA(); if (!(et_old= find_event(name->m_db, name->m_name, TRUE))) DBUG_PRINT("info", ("No such event found, probably DISABLED")); - UNLOCK_QUEUE_DATA(); - - /* See comments in ::replace_event() why this is split in two parts. */ if (et_old) - { - switch ((res= et_old->kill_thread(thd))) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - sql_print_error("SCHEDULER: Got unexpected error %d", res); - DBUG_ASSERT(0); - } - } + delete et_old; + /* + We don't signal here because the scheduler will catch the change + next time it wakes up. + */ DBUG_RETURN(FALSE); } @@ -361,7 +279,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); - uint i= 0, dropped= 0; + uint i= 0; while (i < queue.elements) { Event_timed *et= (Event_timed *) queue_element(&queue, i); @@ -375,32 +293,22 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, counter and the (i < queue.elements) condition is ok. */ queue_remove(&queue, i); - - /* See replace_event() */ - switch (et->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et->flags |= EVENT_FREE_WHEN_FINISHED; - ++dropped; - break; - case 0: - delete et; - ++dropped; - break; - default: - DBUG_ASSERT(0); - } + delete et; } else i++; } - DBUG_PRINT("info", ("Dropped %lu", dropped)); /* - Don't send COND_new_work because no need to wake up the scheduler thread. - When it wakes next time up it will recalculate how much more it should - sleep if the top of the queue has been changed by this method. + We don't call notify_observers() . If we remove the top event: + 1. The queue is empty. The scheduler will wake up at some time and realize + that the queue is empty. If create_event() comes inbetween it will + signal the scheduler + 2. The queue is not empty, but the next event after the previous top, won't + be executed any time sooner than the element we removed. Hence, we may + not notify the scheduler and it will realize the change when it + wakes up from timedwait. */ - + DBUG_VOID_RETURN; } @@ -418,16 +326,14 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, >=0 Number of dropped events */ -int +void Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) { - int ret; DBUG_ENTER("Event_queue::drop_schema_events"); LOCK_QUEUE_DATA(); drop_matching_events(thd, schema, event_timed_db_equal); UNLOCK_QUEUE_DATA(); - - DBUG_RETURN(ret); + DBUG_VOID_RETURN; } @@ -744,13 +650,13 @@ Event_queue::deinit_mutexes() its state. SYNOPSIS - Event_queue::on_queue_change() + Event_queue::notify_observers() */ void -Event_queue::on_queue_change() +Event_queue::notify_observers() { - DBUG_ENTER("Event_queue::on_queue_change"); + DBUG_ENTER("Event_queue::notify_observers"); DBUG_PRINT("info", ("Signalling change of the queue")); scheduler->queue_changed(); DBUG_VOID_RETURN; @@ -761,7 +667,7 @@ Event_queue::on_queue_change() The implementation of full-fledged initialization. SYNOPSIS - Event_scheduler::init() + Event_queue::init() RETURN VALUE FALSE OK @@ -769,15 +675,16 @@ Event_queue::on_queue_change() */ bool -Event_queue::init(Event_db_repository *db_repo) +Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) { int i= 0; bool ret= FALSE; - DBUG_ENTER("Event_queue::init"); + DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this=%p", this)); LOCK_QUEUE_DATA(); db_repository= db_repo; + scheduler= sched; if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 30 /*auto_extent*/)) @@ -803,9 +710,9 @@ end: void -Event_queue::deinit() +Event_queue::deinit_queue() { - DBUG_ENTER("Event_queue::deinit"); + DBUG_ENTER("Event_queue::deinit_queue"); LOCK_QUEUE_DATA(); empty_queue(); @@ -833,6 +740,8 @@ void Event_queue::empty_queue() { uint i; + DBUG_ENTER("Event_queue::empty_queue"); + DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); /* empty the queue */ for (i= 0; i < events_count_no_lock(); ++i) { @@ -840,6 +749,7 @@ Event_queue::empty_queue() delete et; } resize_queue(&queue, 0); + DBUG_VOID_RETURN; } @@ -864,6 +774,29 @@ Event_queue::top_changed() } +inline void +Event_queue::dbug_dump_queue(time_t now) +{ +#ifndef DBUG_OFF + Event_timed *et; + uint i; + DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); + for (i = 0; i < queue.elements; i++) + { + et= ((Event_timed*)queue_element(&queue, i)); + DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); + DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " + " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", + TIME_to_ulonglong_datetime(&et->execute_at), + TIME_to_ulonglong_datetime(&et->starts), + TIME_to_ulonglong_datetime(&et->ends), + et->expression, sec_since_epoch_TIME(&et->execute_at), now, + (int)(sec_since_epoch_TIME(&et->execute_at) - now), + sec_since_epoch_TIME(&et->execute_at) <= now)); + } +#endif +} + Event_timed * Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime) @@ -876,36 +809,22 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, LOCK_QUEUE_DATA(); do { int res; - Event_timed *et= NULL; if (!queue.elements) { abstime->tv_sec= 0; break; } - int i; - DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); - for (i = 0; i < queue.elements; i++) - { - et= ((Event_timed*)queue_element(&queue, i)); - DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); - DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " - " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", - TIME_to_ulonglong_datetime(&et->execute_at), - TIME_to_ulonglong_datetime(&et->starts), - TIME_to_ulonglong_datetime(&et->ends), - et->expression, sec_since_epoch_TIME(&et->execute_at), now, - (int)(sec_since_epoch_TIME(&et->execute_at) - now), - sec_since_epoch_TIME(&et->execute_at) <= now)); - } - et= ((Event_timed*)queue_element(&queue, 0)); + dbug_dump_queue(now); + + Event_timed *et= ((Event_timed*)queue_element(&queue, 0)); top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); if (top_time.tv_sec <= now) { DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; - if ((res= db_repository->load_named_event(thd, et->dbname, et->name, - &et_new))) + if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name, + &et_new))) { DBUG_ASSERT(0); break; |