diff options
author | unknown <andrey@lmy004.> | 2006-07-10 13:44:43 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-10 13:44:43 +0200 |
commit | 974eecc246db18e29e5ef06db6f48a0f4b3f7951 (patch) | |
tree | 692560cb267120b449f56b1df9db9a918e5c7477 /sql/event_queue.cc | |
parent | b9a7fe2757d9040296311e96a9e2740416d181f2 (diff) | |
download | mariadb-git-974eecc246db18e29e5ef06db6f48a0f4b3f7951.tar.gz |
WL#3337 (Event scheduler new architecture)
This patch introduces specialized Event data objects
Event_basic as parent.
Event_queue_element used for queue storage
Event_timed used for SHOW EVENTS/ I_S.EVENTS / SHOW CREATE EVENT
Event_job_data using during execution.
Methods were moved out of Event_timed to other classes.
This patch also introduces Events::LOCK_event_metadata.
This patch gives new implementation of Events::dump_internal_status().
Now both the Event_scheduler and Event_queue return information during
their ::dump_internal_status().
Shortened a bit the runtime for executing events test cases.
mysql-test/r/events.result:
update results
mysql-test/r/events_bugs.result:
update results
mysql-test/r/events_logs_tests.result:
update results
mysql-test/r/events_scheduling.result:
update results
mysql-test/t/events.test:
update test
make --sleep more appropriate . saving some time could mean failure on loaded boxes though :(
add tests for previously uncovered branches.
mysql-test/t/events_bugs.test:
update test
make --sleep more appropriate . saving some time could mean failure on loaded boxes though :(
add tests for previously uncovered branches.
mysql-test/t/events_logs_tests.test:
make the test shorter by time
mysql-test/t/events_scheduling.test:
when selecting always use ORDER BY
mysql-test/t/events_stress.test:
sleep 2.5secs for shorter stress test
sql/event_data_objects.cc:
Event_timed is no more used during execution.
Event_timed is no more used during in the memory queue.
Event_timed is only used for SHOW CREATE EVENT/ I_S.EVENTS/ SHOW EVENTS
Event_basic is the parent of almost all Event data objects.
Event_basic -> Event_queue_element (used for the memory queue) -> Event_timed
Event_basic -> Event_job_data (the object used for execution)
Sql_alloc -> Event_parse_data (used during parsing)
sql/event_data_objects.h:
Event_timed is no more used during execution.
Event_timed is no more used during in the memory queue.
Event_timed is only used for SHOW CREATE EVENT/ I_S.EVENTS/ SHOW EVENTS
Event_basic is the parent of almost all Event data objects.
Event_basic -> Event_queue_element (used for the memory queue) -> Event_timed
Event_basic -> Event_job_data (the object used for execution)
Sql_alloc -> Event_parse_data (used during parsing)
sql/event_db_repository.cc:
Cosmetics.
load_named_event now uses Event_basic, for polymorphism
find_event uses Event_basic, to be polymorphic.
use Field **fields= table->field and then index fields[...]
Add documentation.
Fix documentation.
sql/event_db_repository.h:
Event_db_repository depends only on Event_basic's interface
sql/event_queue.cc:
Cosmetics.
Don't use Event_timed for the queue and giving back object for execution.
Event_queue_element is for the queue, Event_job_data is for execution.
Add Event_queue::dump_internal_status() for SHOW SCHEDULER STATUS command
sql/event_queue.h:
Cosmetics.
Don't use Event_timed for the queue and giving back object for execution.
Event_queue_element is for the queue, Event_job_data is for execution.
Add Event_queue::dump_internal_status() for SHOW SCHEDULER STATUS command
sql/event_scheduler_ng.cc:
Add back Event_scheduler::cond_wait()
Add back Event_scheduler::dump_internal_status()
Using Event_job_data for execution. Make the scheduler thread unkillable (thd->command= COM_DAEMON).
Add a lot of documentation.
sql/event_scheduler_ng.h:
Add back Event_scheduler::cond_wait()
Add back Event_scheduler::dump_internal_status()
Using Event_job_data for execution.
sql/events.cc:
Documentation
Add LOCK_event_metadata
sql/events.h:
Change the signature of Events::drop_event() not to use sp_name but LEX_STRING
sql/share/errmsg.txt:
Fix error message
sql/sql_parse.cc:
Events::drop_event() has new signature
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 653 |
1 files changed, 365 insertions, 288 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index b42372ba6bd..bd8809ba708 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -36,16 +36,16 @@ /* - Compares the execute_at members of 2 Event_timed instances. + Compares the execute_at members of 2 Event_queue_element instances. Used as callback for the prioritized queue when shifting elements inside. SYNOPSIS - event_timed_compare_q() + event_queue_element_data_compare_q() vptr - not used (set it to NULL) - a - first Event_timed object - b - second Event_timed object + a - first Event_queue_element object + b - second Event_queue_element object RETURN VALUE -1 - a->execute_at < b->execute_at @@ -57,14 +57,13 @@ */ static int -event_timed_compare_q(void *vptr, byte* a, byte *b) +event_queue_element_compare_q(void *vptr, byte* a, byte *b) { - return my_time_compare(&((Event_timed *)a)->execute_at, - &((Event_timed *)b)->execute_at); + return my_time_compare(&((Event_queue_element *)a)->execute_at, + &((Event_queue_element *)b)->execute_at); } - /* Constructor of class Event_queue. @@ -79,6 +78,120 @@ Event_queue::Event_queue() mutex_queue_data_locked= FALSE; } + +/* + Inits mutexes. + + SYNOPSIS + Event_queue::init_mutexes() +*/ + +void +Event_queue::init_mutexes() +{ + pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST); +} + + +/* + Destroys mutexes. + + SYNOPSIS + Event_queue::deinit_mutexes() +*/ + +void +Event_queue::deinit_mutexes() +{ + pthread_mutex_destroy(&LOCK_event_queue); +} + + +/* + Signals the main scheduler thread that the queue has changed + its state. + + SYNOPSIS + Event_queue::notify_observers() +*/ + +void +Event_queue::notify_observers() +{ + DBUG_ENTER("Event_queue::notify_observers"); + DBUG_PRINT("info", ("Signalling change of the queue")); + scheduler->queue_changed(); + DBUG_VOID_RETURN; +} + + +/* + Inits the queue + + SYNOPSIS + Event_queue::init() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) +{ + int i= 0; + bool ret= FALSE; + DBUG_ENTER("Event_queue::init_queue"); + DBUG_PRINT("enter", ("this=0x%lx", this)); + + LOCK_QUEUE_DATA(); + db_repository= db_repo; + scheduler= sched; + + if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, + event_queue_element_compare_q, NULL, 30 /*auto_extent*/)) + { + sql_print_error("SCHEDULER: Can't initialize the execution queue"); + ret= TRUE; + goto end; + } + + if (sizeof(my_time_t) != sizeof(time_t)) + { + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." + "The scheduler may not work correctly. Stopping."); + DBUG_ASSERT(0); + ret= TRUE; + goto end; + } + +end: + UNLOCK_QUEUE_DATA(); + DBUG_RETURN(ret); +} + + +/* + Deinits the queue + + SYNOPSIS + Event_queue::deinit_queue() +*/ + +void +Event_queue::deinit_queue() +{ + DBUG_ENTER("Event_queue::deinit_queue"); + + LOCK_QUEUE_DATA(); + empty_queue(); + delete_queue(&queue); + UNLOCK_QUEUE_DATA(); + + DBUG_VOID_RETURN; +} + + /* Creates an event in the scheduler queue @@ -90,28 +203,31 @@ Event_queue::Event_queue() RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk - OP_ALREADY_EXISTS Event already in the queue */ int -Event_queue::create_event(THD *thd, Event_parse_data *et) +Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) { int res; - Event_timed *et_new; + Event_queue_element *element_new; DBUG_ENTER("Event_queue::create_event"); - DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue)); + DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str)); - res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new); - LOCK_QUEUE_DATA(); - if (!res) + element_new= new Event_queue_element(); + res= db_repository->load_named_event(thd, dbname, name, element_new); + if (res || element_new->status == Event_queue_element::DISABLED) + delete element_new; + else { - DBUG_PRINT("info", ("new event in the queue %p", et_new)); - queue_insert_safe(&queue, (byte *) et_new); + element_new->compute_next_execution_time(); + + LOCK_QUEUE_DATA(); + DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new)); + queue_insert_safe(&queue, (byte *) element_new); + UNLOCK_QUEUE_DATA(); + notify_observers(); } - else if (res == OP_DISABLED_EVENT) - res= OP_OK; - UNLOCK_QUEUE_DATA(); DBUG_RETURN(res); } @@ -123,9 +239,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et) SYNOPSIS Event_queue::update_event() thd Thread - et The event to replace(add) into the queue - new_schema New schema, in case of RENAME TO - new_name New name, in case of RENAME TO + dbname Schema of the event + name Name of the event + new_schema New schema, in case of RENAME TO, otherwise NULL + new_name New name, in case of RENAME TO, otherwise NULL RETURN VALUE OP_OK OK or scheduler not working @@ -133,43 +250,57 @@ Event_queue::create_event(THD *thd, Event_parse_data *et) */ int -Event_queue::update_event(THD *thd, Event_parse_data *et, +Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name) { int res; - Event_timed *et_old= NULL, *et_new= NULL; + Event_queue_element *element_old= NULL, + *element_new; DBUG_ENTER("Event_queue::update_event"); - DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", - thd, et, et->dbname.str, et->name.str, &LOCK_event_queue)); + DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str)); - res= db_repository-> - load_named_event_timed(thd, new_schema?*new_schema:et->dbname, - new_name? *new_name:et->name, &et_new); + element_new= new Event_queue_element(); - if (res && res != OP_DISABLED_EVENT) + res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname, + new_name? *new_name:name, element_new); + if (res) + { + delete element_new; goto end; + } + else if (element_new->status == Event_queue_element::DISABLED) + { + DBUG_PRINT("info", ("The event is disabled.")); + /* + Destroy the object but don't skip to end: because we may have to remove + object from the cache. + */ + delete element_new; + element_new= NULL; + } + else + element_new->compute_next_execution_time(); LOCK_QUEUE_DATA(); - if (!(et_old= find_event(et->dbname, et->name, TRUE))) + if (!(element_old= find_event(dbname, name, TRUE))) { - DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", - et->dbname.str, et->name.str)); + DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED", + dbname.str, name.str)); } - - if (!res) + /* If not disabled event */ + if (element_new) { - DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old)); - queue_insert_safe(&queue, (byte *) et_new); + DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx", + element_new, element_old)); + queue_insert_safe(&queue, (byte *) element_new); } - else if (res == OP_DISABLED_EVENT) - res= OP_OK; UNLOCK_QUEUE_DATA(); notify_observers(); - if (et_old) - delete et_old; + if (element_old) + delete element_old; end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); @@ -181,35 +312,34 @@ end: SYNOPSIS Event_queue::drop_event() - thd Thread - name The event to drop - - RETURN VALUE - FALSE OK (replaced or scheduler not working) - TRUE Failure + thd Thread + dbname Schema of the event to drop + name Name of the event to drop */ -bool -Event_queue::drop_event(THD *thd, sp_name *name) +void +Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) { int res; - Event_timed *et_old; + Event_queue_element *element; DBUG_ENTER("Event_queue::drop_event"); - DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name, - &LOCK_event_queue)); + DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name)); LOCK_QUEUE_DATA(); - if (!(et_old= find_event(name->m_db, name->m_name, TRUE))) - DBUG_PRINT("info", ("No such event found, probably DISABLED")); + element= find_event(dbname, name, TRUE); UNLOCK_QUEUE_DATA(); - if (et_old) - delete et_old; + + if (element) + delete element; + else + DBUG_PRINT("info", ("No such event found, probably DISABLED")); + /* We don't signal here because the scheduler will catch the change next time it wakes up. */ - DBUG_RETURN(FALSE); + DBUG_VOID_RETURN; } @@ -232,7 +362,7 @@ Event_queue::drop_event(THD *thd, sp_name *name) (signalling COND_new_work for instance). */ -Event_timed * +Event_queue_element * Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) { uint i; @@ -240,10 +370,10 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) for (i= 0; i < queue.elements; ++i) { - Event_timed *et= (Event_timed *) queue_element(&queue, i); + Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str, et->dbname.str, et->name.str)); - if (event_timed_identifier_equal(db, name, et)) + if (event_basic_identifier_equal(db, name, et)) { if (remove_from_q) queue_remove(&queue, i); @@ -274,7 +404,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) void Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, - bool (*comparator)(Event_timed *,LEX_STRING *)) + bool (*comparator)(LEX_STRING *, Event_basic *)) { DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); @@ -282,9 +412,9 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, uint i= 0; while (i < queue.elements) { - Event_timed *et= (Event_timed *) queue_element(&queue, i); + Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); - if (comparator(et, &pattern)) + if (comparator(&pattern, et)) { /* The queue is ordered. If we remove an element, then all elements after @@ -331,67 +461,20 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) { DBUG_ENTER("Event_queue::drop_schema_events"); LOCK_QUEUE_DATA(); - drop_matching_events(thd, schema, event_timed_db_equal); + drop_matching_events(thd, schema, event_basic_db_equal); UNLOCK_QUEUE_DATA(); DBUG_VOID_RETURN; } /* - Wrapper for pthread_mutex_lock - - SYNOPSIS - Event_queue::lock_data() - mutex Mutex to lock - line The line number on which the lock is done - - RETURN VALUE - Error code of pthread_mutex_lock() -*/ - -void -Event_queue::lock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_queue::lock_mutex"); - DBUG_PRINT("enter", ("func=%s line=%u", func, line)); - pthread_mutex_lock(&LOCK_event_queue); - mutex_last_locked_in_func= func; - mutex_last_locked_at_line= line; - mutex_queue_data_locked= TRUE; - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_mutex_unlock - - SYNOPSIS - Event_queue::unlock_data() - mutex Mutex to unlock - line The line number on which the unlock is done -*/ - -void -Event_queue::unlock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_queue::unlock_mutex"); - DBUG_PRINT("enter", ("func=%s line=%u", func, line)); - mutex_last_unlocked_at_line= line; - mutex_queue_data_locked= FALSE; - mutex_last_unlocked_in_func= func; - pthread_mutex_unlock(&LOCK_event_queue); - DBUG_VOID_RETURN; -} - - -/* Returns the number of elements in the queue SYNOPSIS Event_queue::events_count() RETURN VALUE - 0 Number of Event_timed objects in the queue + Number of Event_queue_element objects in the queue */ uint @@ -408,31 +491,9 @@ Event_queue::events_count() /* - Returns the number of elements in the queue - - SYNOPSIS - Event_queue::events_count_no_lock() - - RETURN VALUE - 0 Number of Event_timed objects in the queue -*/ - -uint -Event_queue::events_count_no_lock() -{ - uint n; - DBUG_ENTER("Event_queue::events_count_no_lock"); - - n= queue.elements; - - DBUG_RETURN(n); -} - - -/* Loads all ENABLED events from mysql.event into the prioritized queue. Called during scheduler main thread initialization. Compiles - the events. Creates Event_timed instances for every ENABLED event + the events. Creates Event_queue_element instances for every ENABLED event from mysql.event. SYNOPSIS @@ -461,7 +522,7 @@ Event_queue::load_events_from_db(THD *thd) MEM_ROOT boot_root; DBUG_ENTER("Event_queue::load_events_from_db"); - DBUG_PRINT("enter", ("thd=%p", thd)); + DBUG_PRINT("enter", ("thd=0x%lx", thd)); if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) { @@ -469,12 +530,11 @@ Event_queue::load_events_from_db(THD *thd) DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); } - init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); init_read_record(&read_record_info, thd, table ,NULL,1,0); while (!(read_record_info.read_record(&read_record_info))) { - Event_timed *et; - if (!(et= new Event_timed)) + Event_queue_element *et; + if (!(et= new Event_queue_element)) { DBUG_PRINT("info", ("Out of memory")); clean_the_queue= TRUE; @@ -489,13 +549,14 @@ Event_queue::load_events_from_db(THD *thd) "Table probably corrupted"); break; } - if (et->status != Event_timed::ENABLED) + if (et->status != Event_queue_element::ENABLED) { DBUG_PRINT("info",("%s is disabled",et->name.str)); delete et; continue; } - +#if 0 + init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); /* We load only on scheduler root just to check whether the body compiles */ @@ -514,6 +575,7 @@ Event_queue::load_events_from_db(THD *thd) et->free_sp(); break; } + free_root(&boot_root, MYF(0)); /* let's find when to be executed */ if (et->compute_next_execution_time()) @@ -522,19 +584,17 @@ Event_queue::load_events_from_db(THD *thd) " Skipping", et->dbname.str, et->name.str); continue; } - - DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); +#endif + DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list.")); queue_insert_safe(&queue, (byte *) et); count++; } end: end_read_record(&read_record_info); - free_root(&boot_root, MYF(0)); if (clean_the_queue) { - for (count= 0; count < queue.elements; ++count) - queue_remove(&queue, 0); + empty_queue(); ret= -1; } else @@ -571,7 +631,7 @@ Event_queue::check_system_tables(THD *thd) bool ret; DBUG_ENTER("Event_queue::check_system_tables"); - DBUG_PRINT("enter", ("thd=%p", thd)); + DBUG_PRINT("enter", ("thd=0x%lx", thd)); thd->reset_n_backup_open_tables_state(&backup); @@ -618,123 +678,48 @@ Event_queue::check_system_tables(THD *thd) /* - Inits mutexes. - - SYNOPSIS - Event_queue::init_mutexes() -*/ - -void -Event_queue::init_mutexes() -{ - pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST); -} - - -/* - Destroys mutexes. + Recalculates activation times in the queue. There is one reason for + that. Because the values (execute_at) by which the queue is ordered are + changed by calls to compute_next_execution_time() on a request from the + scheduler thread, if it is not running then the values won't be updated. + Once the scheduler is started again the values has to be recalculated + so they are right for the current time. SYNOPSIS - Event_queue::deinit_mutexes() + Event_queue::recalculate_activation_times() + thd Thread */ void -Event_queue::deinit_mutexes() +Event_queue::recalculate_activation_times(THD *thd) { - pthread_mutex_destroy(&LOCK_event_queue); -} - - -/* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_queue::notify_observers() -*/ - -void -Event_queue::notify_observers() -{ - DBUG_ENTER("Event_queue::notify_observers"); - DBUG_PRINT("info", ("Signalling change of the queue")); - scheduler->queue_changed(); - DBUG_VOID_RETURN; -} - - -/* - The implementation of full-fledged initialization. - - SYNOPSIS - Event_queue::init() - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -bool -Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) -{ - int i= 0; - bool ret= FALSE; - DBUG_ENTER("Event_queue::init_queue"); - DBUG_PRINT("enter", ("this=%p", this)); + uint i; + DBUG_ENTER("Event_queue::recalculate_activation_times"); LOCK_QUEUE_DATA(); - db_repository= db_repo; - scheduler= sched; - - if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, - event_timed_compare_q, NULL, 30 /*auto_extent*/)) - { - sql_print_error("SCHEDULER: Can't initialize the execution queue"); - ret= TRUE; - goto end; - } - - if (sizeof(my_time_t) != sizeof(time_t)) + DBUG_PRINT("info", ("%u loaded events to be recalculated", queue.elements)); + for (i= 0; i < queue.elements; i++) { - sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." - "The scheduler may not work correctly. Stopping."); - DBUG_ASSERT(0); - ret= TRUE; - goto end; + ((Event_queue_element*)queue_element(&queue, i))->compute_next_execution_time(); + ((Event_queue_element*)queue_element(&queue, i))->update_timing_fields(thd); } - -end: - UNLOCK_QUEUE_DATA(); - DBUG_RETURN(ret); -} - - -void -Event_queue::deinit_queue() -{ - DBUG_ENTER("Event_queue::deinit_queue"); - - LOCK_QUEUE_DATA(); - empty_queue(); - delete_queue(&queue); + queue_fix(&queue); UNLOCK_QUEUE_DATA(); DBUG_VOID_RETURN; } -void -Event_queue::recalculate_queue(THD *thd) -{ - uint i; - for (i= 0; i < queue.elements; i++) - { - ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); - ((Event_timed*)queue_element(&queue, i))->update_fields(thd); - } - queue_fix(&queue); -} +/* + Empties the queue and destroys the Event_queue_element objects in the + queue. + SYNOPSIS + Event_queue::empty_queue() + + NOTE + Should be called with LOCK_event_queue locked +*/ void Event_queue::empty_queue() @@ -743,9 +728,9 @@ Event_queue::empty_queue() DBUG_ENTER("Event_queue::empty_queue"); DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); /* empty the queue */ - for (i= 0; i < events_count_no_lock(); ++i) + for (i= 0; i < queue.elements; ++i) { - Event_timed *et= (Event_timed *) queue_element(&queue, i); + Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); delete et; } resize_queue(&queue, 0); @@ -753,38 +738,17 @@ Event_queue::empty_queue() } -Event_timed* -Event_queue::get_top() -{ - return (Event_timed *)queue_top(&queue); -} - - -void -Event_queue::remove_top() -{ - queue_remove(&queue, 0);// 0 is top, internally 1 -} - - -void -Event_queue::top_changed() -{ - queue_replaced(&queue); -} - - inline void Event_queue::dbug_dump_queue(time_t now) { #ifndef DBUG_OFF - Event_timed *et; + Event_queue_element *et; uint i; DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); for (i = 0; i < queue.elements; i++) { - et= ((Event_timed*)queue_element(&queue, i)); - DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); + et= ((Event_queue_element*)queue_element(&queue, i)); + DBUG_PRINT("info",("et=0x%lx db=%s name=%s",et, et->dbname.str, et->name.str)); DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", TIME_to_ulonglong_datetime(&et->execute_at), @@ -797,14 +761,14 @@ Event_queue::dbug_dump_queue(time_t now) #endif } -Event_timed * +Event_job_data * Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime) { struct timespec top_time; - Event_timed *et_new= NULL; + Event_job_data *et_new= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); - DBUG_PRINT("enter", ("thd=%p now=%d", thd, now)); + DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); abstime->tv_nsec= 0; LOCK_QUEUE_DATA(); do { @@ -816,28 +780,31 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, } dbug_dump_queue(now); - Event_timed *et= ((Event_timed*)queue_element(&queue, 0)); + Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0)); top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); if (top_time.tv_sec <= now) { DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; - if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name, - &et_new))) + et_new= new Event_job_data(); + if ((res= db_repository->load_named_event(thd, et->dbname, et->name, + et_new))) { + delete et_new; + et_new= NULL; DBUG_ASSERT(0); break; } et->mark_last_executed(thd); if (et->compute_next_execution_time()) - et->status= Event_timed::DISABLED; + et->status= Event_queue_element::DISABLED; DBUG_PRINT("info", ("event's status is %d", et->status)); - et->update_fields(thd); + et->update_timing_fields(thd); if (((et->execute_at.year && !et->expression) || et->execute_at_null) || - (et->status == Event_timed::DISABLED)) + (et->status == Event_queue_element::DISABLED)) { DBUG_PRINT("info", ("removing from the queue")); if (et->dropped) @@ -857,12 +824,122 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, } while (0); UNLOCK_QUEUE_DATA(); - DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new, + DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new, abstime->tv_sec)); if (et_new) - DBUG_PRINT("info", ("db=%s name=%s definer=%s " - "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str, - et_new->definer.str, - TIME_to_ulonglong_datetime(&et_new->execute_at))); + DBUG_PRINT("info", ("db=%s name=%s definer=%s", + et_new->dbname.str, et_new->name.str, et_new->definer.str)); DBUG_RETURN(et_new); } + + +/* + Auxiliary function for locking LOCK_event_queue. Used by the + LOCK_QUEUE_DATA macro + + SYNOPSIS + Event_queue::lock_data() + func Which function is requesting mutex lock + line On which line mutex lock is requested +*/ + +void +Event_queue::lock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_queue::lock_data"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); + pthread_mutex_lock(&LOCK_event_queue); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_queue_data_locked= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Auxiliary function for unlocking LOCK_event_queue. Used by the + UNLOCK_QUEUE_DATA macro + + SYNOPSIS + Event_queue::unlock_data() + func Which function is requesting mutex unlock + line On which line mutex unlock is requested +*/ + +void +Event_queue::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_queue::unlock_data"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); + mutex_last_unlocked_at_line= line; + mutex_queue_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + pthread_mutex_unlock(&LOCK_event_queue); + DBUG_VOID_RETURN; +} + + +/* + Dumps the internal status of the queue + + SYNOPSIS + Event_queue::dump_internal_status() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_queue::dump_internal_status(THD *thd) +{ + DBUG_ENTER("Event_queue::dump_internal_status"); +#ifndef DBUG_OFF + CHARSET_INFO *scs= system_charset_info; + Protocol *protocol= thd->protocol; + List<Item> field_list; + int ret; + char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + int_string.length(0); + + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue element count"), scs); + int_string.set((longlong) queue.elements, scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* queue_data_locked */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue data locked"), scs); + int_string.set((longlong) mutex_queue_data_locked, scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* last locked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue last locked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_locked_in_func, + mutex_last_locked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + + /* last unlocked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue last unlocked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_unlocked_in_func, + mutex_last_unlocked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); +#endif + DBUG_RETURN(FALSE); +} |