diff options
author | unknown <andrey@lmy004.> | 2006-07-11 18:28:15 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-11 18:28:15 +0200 |
commit | 42a8e2c9421854710679a0f6c3ceef6c0777ded4 (patch) | |
tree | f2fc5508efd161a1ef60e481f22fe90017197fa1 /sql/event_queue.cc | |
parent | 084f74426b5f19b47984ef298309e9a4015940c3 (diff) | |
download | mariadb-git-42a8e2c9421854710679a0f6c3ceef6c0777ded4.tar.gz |
WL#3337 (Event scheduler new architecture)
More small fixes to the API : use LEX_STRING instead of LEX_STRING* and if error
then return bool(true) instead of error code.
Merged functions. Reduced usage of sp_name.
Fixed a lot of function documentation errors.
Added function documentation wherever needed.
Removed some unused defines and error codes.
Next to come is batch rename of Event_scheduler_ng to Event_scheduler.
mysql-test/r/events.result:
update result
mysql-test/r/events_logs_tests.result:
update result
mysql-test/t/events.test:
more test coverage
mysql-test/t/events_logs_tests.test:
fix test
sql/event_data_objects.cc:
Cosmetics.
Fix function documentation whenever needed.
Move Event_job_data::compile() next to Event_job_data::execute()
sql/event_data_objects.h:
Remove unneeded error codes and defines
Move function declarations at the end of the header
sql/event_db_repository.cc:
Fix function documentation.
Event_db_repository::update_event() now uses LEX_STRING *-s instead of
sp_name . Lower coupling.
sql/event_db_repository.h:
Event_db_repository::update_event() now uses LEX_STRING *-s instead of
sp_name . Lower coupling.
find_event -> find_named_event
find_event_by_name is not used externally, merge with load_named_event()
sql/event_queue.cc:
LEX_STRING* to LEX_STRING
Fix comments.
Fix and add function documentation.
Remove Event_queue::events_count() as it is unused
Change get_top_for_execution_if_time() to return status code as return value
and the object is in out parameter.
sql/event_queue.h:
LEX_STRING* to LEX_STRING
Fix comments.
Fix and add function documentation.
Remove Event_queue::events_count() as it is unused
Change get_top_for_execution_if_time() to return status code as return value
and the object is in out parameter.
Try to detect also lock attemptions for deadlocks.
sql/event_scheduler_ng.cc:
Always execute on thd->mem_root
Fix according to changed API of Event_queue::get_top_for_execution_if_time()
sql/events.cc:
Fix function documentation.
Fix code after API changes of internal Event module classes.
sql/events.h:
sp_name -> LEX_STRINGs
sql/sql_parse.cc:
Fix according to changed API of Events::show_create_event()
sql/sql_yacc.yy:
Don't pass NULL as third parameter to sp_head::init_strings()
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 371 |
1 files changed, 209 insertions, 162 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 0fbde5d8910..63dee303fc2 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -15,13 +15,14 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "mysql_priv.h" -#include "events.h" -#include "event_scheduler_ng.h" #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" -#include "sp_head.h" +#include "event_scheduler_ng.h" + +#define EVENT_QUEUE_INITIAL_SIZE 30 +#define EVENT_QUEUE_EXTENT 30 #ifdef __GNUC__ #if __GNUC__ >= 2 @@ -36,21 +37,20 @@ /* - Compares the execute_at members of 2 Event_queue_element instances. + Compares the execute_at members of two Event_queue_element instances. Used as callback for the prioritized queue when shifting elements inside. SYNOPSIS event_queue_element_data_compare_q() - - vptr - not used (set it to NULL) - a - first Event_queue_element object - b - second Event_queue_element object + vptr Not used (set it to NULL) + a First Event_queue_element object + b Second Event_queue_element object RETURN VALUE - -1 - a->execute_at < b->execute_at - 0 - a->execute_at == b->execute_at - 1 - a->execute_at > b->execute_at + -1 a->execute_at < b->execute_at + 0 a->execute_at == b->execute_at + 1 a->execute_at > b->execute_at NOTES execute_at.second_part is not considered during comparison @@ -73,9 +73,13 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b) Event_queue::Event_queue() { - mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0; - mutex_last_unlocked_in_func= mutex_last_locked_in_func= ""; - mutex_queue_data_locked= FALSE; + mutex_last_unlocked_at_line= mutex_last_locked_at_line= + mutex_last_attempted_lock_at_line= 0; + + mutex_last_unlocked_in_func= mutex_last_locked_in_func= + mutex_last_attempted_lock_in_func= ""; + + mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE; } @@ -108,24 +112,6 @@ Event_queue::deinit_mutexes() /* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_queue::notify_observers() -*/ - -void -Event_queue::notify_observers() -{ - DBUG_ENTER("Event_queue::notify_observers"); - DBUG_PRINT("info", ("Signalling change of the queue")); - scheduler->queue_changed(); - DBUG_VOID_RETURN; -} - - -/* Inits the queue SYNOPSIS @@ -148,8 +134,9 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) db_repository= db_repo; scheduler= sched; - if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, - event_queue_element_compare_q, NULL, 30 /*auto_extent*/)) + if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, + 0 /*smallest_on_top*/, event_queue_element_compare_q, + NULL, EVENT_QUEUE_EXTENT)) { sql_print_error("SCHEDULER: Can't initialize the execution queue"); ret= TRUE; @@ -172,7 +159,8 @@ end: /* - Deinits the queue + Deinits the queue. Remove all elements from it and destroys them + too. SYNOPSIS Event_queue::deinit_queue() @@ -193,12 +181,12 @@ Event_queue::deinit_queue() /* - Creates an event in the scheduler queue + Adds an event to the queue. SYNOPSIS Event_queue::create_event() - et The event to add - check_existence Whether to check if already loaded. + dbname The schema of the new event + name The name of the new event RETURN VALUE OP_OK OK or scheduler not working @@ -209,21 +197,21 @@ int Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) { int res; - Event_queue_element *element_new; + Event_queue_element *new_element; DBUG_ENTER("Event_queue::create_event"); DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str)); - element_new= new Event_queue_element(); - res= db_repository->load_named_event(thd, dbname, name, element_new); - if (res || element_new->status == Event_queue_element::DISABLED) - delete element_new; + new_element= new Event_queue_element(); + res= db_repository->load_named_event(thd, dbname, name, new_element); + if (res || new_element->status == Event_queue_element::DISABLED) + delete new_element; else { - element_new->compute_next_execution_time(); + new_element->compute_next_execution_time(); LOCK_QUEUE_DATA(); - DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new)); - queue_insert_safe(&queue, (byte *) element_new); + DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); + queue_insert_safe(&queue, (byte *) new_element); UNLOCK_QUEUE_DATA(); notify_observers(); @@ -254,53 +242,54 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name) { int res; - Event_queue_element *element_old= NULL, - *element_new; + Event_queue_element *old_element= NULL, + *new_element; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str)); - element_new= new Event_queue_element(); + new_element= new Event_queue_element(); res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname, - new_name? *new_name:name, element_new); + new_name? *new_name:name, new_element); if (res) { - delete element_new; + delete new_element; goto end; } - else if (element_new->status == Event_queue_element::DISABLED) + else if (new_element->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("The event is disabled.")); /* Destroy the object but don't skip to end: because we may have to remove object from the cache. */ - delete element_new; - element_new= NULL; + delete new_element; + new_element= NULL; } else - element_new->compute_next_execution_time(); + new_element->compute_next_execution_time(); LOCK_QUEUE_DATA(); - if (!(element_old= find_event(dbname, name, TRUE))) + if (!(old_element= find_n_remove_event(dbname, name))) { DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED", dbname.str, name.str)); } /* If not disabled event */ - if (element_new) + if (new_element) { DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx", - element_new, element_old)); - queue_insert_safe(&queue, (byte *) element_new); + new_element, old_element)); + queue_insert_safe(&queue, (byte *) new_element); } UNLOCK_QUEUE_DATA(); - notify_observers(); + if (new_element) + notify_observers(); - if (element_old) - delete element_old; + if (old_element) + delete old_element; end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); @@ -326,7 +315,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name)); LOCK_QUEUE_DATA(); - element= find_event(dbname, name, TRUE); + element= find_n_remove_event(dbname, name); UNLOCK_QUEUE_DATA(); if (element) @@ -344,48 +333,6 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) /* - Searches for an event in the queue - - SYNOPSIS - Event_queue::find_event() - db The schema of the event to find - name The event to find - remove_from_q If found whether to remove from the Q - - RETURN VALUE - NULL Not found - otherwise Address - - NOTE - The caller should do the locking also the caller is responsible for - actual signalling in case an event is removed from the queue - (signalling COND_new_work for instance). -*/ - -Event_queue_element * -Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) -{ - uint i; - DBUG_ENTER("Event_queue::find_event"); - - for (i= 0; i < queue.elements; ++i) - { - Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); - DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str, - et->dbname.str, et->name.str)); - if (event_basic_identifier_equal(db, name, et)) - { - if (remove_from_q) - queue_remove(&queue, i); - DBUG_RETURN(et); - } - } - - DBUG_RETURN(NULL); -} - - -/* Drops all events from the in-memory queue and disk that match certain pattern evaluated by a comparator function @@ -404,7 +351,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) void Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, - bool (*comparator)(LEX_STRING *, Event_basic *)) + bool (*comparator)(LEX_STRING, Event_basic *)) { DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); @@ -414,7 +361,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, { Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); - if (comparator(&pattern, et)) + if (comparator(pattern, et)) { /* The queue is ordered. If we remove an element, then all elements after @@ -468,25 +415,59 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) /* - Returns the number of elements in the queue + Signals the observers (the main scheduler thread) that the + state of the queue has been changed. SYNOPSIS - Event_queue::events_count() + Event_queue::notify_observers() +*/ + +void +Event_queue::notify_observers() +{ + DBUG_ENTER("Event_queue::notify_observers"); + DBUG_PRINT("info", ("Signalling change of the queue")); + scheduler->queue_changed(); + DBUG_VOID_RETURN; +} + + +/* + Searches for an event in the queue + + SYNOPSIS + Event_queue::find_n_remove_event() + db The schema of the event to find + name The event to find RETURN VALUE - Number of Event_queue_element objects in the queue + NULL Not found + otherwise Address + + NOTE + The caller should do the locking also the caller is responsible for + actual signalling in case an event is removed from the queue. */ -uint -Event_queue::events_count() +Event_queue_element * +Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) { - uint n; - DBUG_ENTER("Event_scheduler::events_count"); - LOCK_QUEUE_DATA(); - n= queue.elements; - UNLOCK_QUEUE_DATA(); - DBUG_PRINT("info", ("n=%u", n)); - DBUG_RETURN(n); + uint i; + DBUG_ENTER("Event_queue::find_n_remove_event"); + + for (i= 0; i < queue.elements; ++i) + { + Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str, + et->dbname.str, et->name.str)); + if (event_basic_identifier_equal(db, name, et)) + { + queue_remove(&queue, i); + DBUG_RETURN(et); + } + } + + DBUG_RETURN(NULL); } @@ -620,6 +601,11 @@ end: SYNOPSIS Event_queue::check_system_tables() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Error */ bool @@ -738,6 +724,14 @@ Event_queue::empty_queue() } +/* + Dumps the queue to the trace log. + + SYNOPSIS + Event_queue::dbug_dump_queue() + now Current timestamp +*/ + inline void Event_queue::dbug_dump_queue(time_t now) { @@ -761,12 +755,37 @@ Event_queue::dbug_dump_queue(time_t now) #endif } -Event_job_data * + +/* + Checks whether the top of the queue is elligible for execution and + returns an Event_job_data instance in case it should be executed. + `now` is compared against `execute_at` of the top element in the queue. + + SYNOPSIS + Event_queue::dbug_dump_queue() + thd [in] Thread + now [in] Current timestamp + job_data [out] The object to execute + abstime [out] Time to sleep + + RETURN VALUE + FALSE No error. If *job_data==NULL then top not elligible for execution. + Could be that there is no top. If abstime->tv_sec is set to value + greater than zero then use abstime with pthread_cond_timedwait(). + If abstime->tv_sec is zero then sleep with pthread_cond_wait(). + abstime->tv_nsec is always zero. + TRUE Error + +*/ + +bool Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, + Event_job_data **job_data, struct timespec *abstime) { + bool ret= FALSE; struct timespec top_time; - Event_job_data *et_new= NULL; + *job_data= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); abstime->tv_nsec= 0; @@ -780,56 +799,58 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, } dbug_dump_queue(now); - Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0)); - top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); + Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); - if (top_time.tv_sec <= now) - { - DBUG_PRINT("info", ("Ready for execution")); - abstime->tv_sec= 0; - et_new= new Event_job_data(); - if ((res= db_repository->load_named_event(thd, et->dbname, et->name, - et_new))) - { - delete et_new; - et_new= NULL; - DBUG_ASSERT(0); - break; - } - - et->mark_last_executed(thd); - if (et->compute_next_execution_time()) - et->status= Event_queue_element::DISABLED; - DBUG_PRINT("info", ("event's status is %d", et->status)); - - et->update_timing_fields(thd); - if (((et->execute_at.year && !et->expression) || et->execute_at_null) || - (et->status == Event_queue_element::DISABLED)) - { - DBUG_PRINT("info", ("removing from the queue")); - if (et->dropped) - et->drop(thd); - delete et; - queue_remove(&queue, 0); - } - else - queue_replaced(&queue); - } - else + top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); + + if (top_time.tv_sec > now) { abstime->tv_sec= top_time.tv_sec; DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, abstime->tv_sec)); + break; } + + DBUG_PRINT("info", ("Ready for execution")); + abstime->tv_sec= 0; + *job_data= new Event_job_data(); + if ((res= db_repository->load_named_event(thd, top->dbname, top->name, + *job_data))) + { + delete *job_data; + *job_data= NULL; + ret= TRUE; + break; + } + + top->mark_last_executed(thd); + if (top->compute_next_execution_time()) + top->status= Event_queue_element::DISABLED; + DBUG_PRINT("info", ("event's status is %d", top->status)); + + top->update_timing_fields(thd); + if (((top->execute_at.year && !top->expression) || top->execute_at_null) || + (top->status == Event_queue_element::DISABLED)) + { + DBUG_PRINT("info", ("removing from the queue")); + if (top->dropped) + top->drop(thd); + delete top; + queue_remove(&queue, 0); + } + else + queue_replaced(&queue); } while (0); UNLOCK_QUEUE_DATA(); - DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new, - abstime->tv_sec)); - if (et_new) - DBUG_PRINT("info", ("db=%s name=%s definer=%s", - et_new->dbname.str, et_new->name.str, et_new->definer.str)); - DBUG_RETURN(et_new); + DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ", + ret, *job_data, abstime->tv_sec)); + + if (*job_data) + DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str, + (*job_data)->name.str, (*job_data)->definer.str)); + + DBUG_RETURN(ret); } @@ -848,10 +869,18 @@ Event_queue::lock_data(const char *func, uint line) { DBUG_ENTER("Event_queue::lock_data"); DBUG_PRINT("enter", ("func=%s line=%u", func, line)); + mutex_last_attempted_lock_in_func= func; + mutex_last_attempted_lock_at_line= line; + mutex_queue_data_attempting_lock= TRUE; pthread_mutex_lock(&LOCK_event_queue); + mutex_last_attempted_lock_in_func= ""; + mutex_last_attempted_lock_at_line= 0; + mutex_queue_data_attempting_lock= FALSE; + mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; mutex_queue_data_locked= TRUE; + DBUG_VOID_RETURN; } @@ -921,6 +950,13 @@ Event_queue::dump_internal_status(THD *thd) protocol->store(&int_string); ret= protocol->write(); + /* queue_data_attempting_lock */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue data attempting lock"), scs); + int_string.set((longlong) mutex_queue_data_attempting_lock, scs); + protocol->store(&int_string); + ret= protocol->write(); + /* last locked at*/ protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("queue last locked at"), scs); @@ -940,6 +976,17 @@ Event_queue::dump_internal_status(THD *thd) mutex_last_unlocked_at_line)); protocol->store(&tmp_string); ret= protocol->write(); + + /* last attempted lock at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue last attempted lock at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_attempted_lock_in_func, + mutex_last_attempted_lock_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + #endif DBUG_RETURN(FALSE); } |