diff options
-rw-r--r-- | sql/event_data_objects.cc | 20 | ||||
-rw-r--r-- | sql/event_db_repository.cc | 1 | ||||
-rw-r--r-- | sql/event_db_repository.h | 3 | ||||
-rw-r--r-- | sql/event_queue.cc | 841 | ||||
-rw-r--r-- | sql/event_queue.h | 104 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 1006 | ||||
-rw-r--r-- | sql/event_scheduler.h | 154 | ||||
-rw-r--r-- | sql/events.cc | 1 |
8 files changed, 1182 insertions, 948 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index a47a3e9e936..f4147d72c3d 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -25,6 +25,19 @@ #define EVEX_MAX_INTERVAL_VALUE 2147483647L +/* + Returns a new instance + + SYNOPSIS + Event_parse_data::new_instance() + + RETURN VALUE + Address or NULL in case of error + + NOTE + Created on THD's mem_root +*/ + Event_parse_data * Event_parse_data::new_instance(THD *thd) { @@ -32,6 +45,13 @@ Event_parse_data::new_instance(THD *thd) } +/* + Constructor + + SYNOPSIS + Event_parse_data::Event_parse_data() +*/ + Event_parse_data::Event_parse_data() { item_execute_at= item_expression= item_starts= item_ends= NULL; diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index 68d8234cc90..8886992c839 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -1297,3 +1297,4 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na DBUG_RETURN(OP_OK); } + diff --git a/sql/event_db_repository.h b/sql/event_db_repository.h index 625d5bfb993..e1c64c8aded 100644 --- a/sql/event_db_repository.h +++ b/sql/event_db_repository.h @@ -113,6 +113,9 @@ private: int table_scan_all_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table); + static bool + check_system_tables(THD *thd); + MEM_ROOT repo_root; /* Prevent use of these */ diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 46f965678c6..32c5a076a62 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -15,5 +15,846 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "mysql_priv.h" +#include "events.h" +#include "event_scheduler.h" #include "event_queue.h" #include "event_data_objects.h" +#include "event_db_repository.h" +#include "sp_head.h" + + +#ifdef __GNUC__ +#if __GNUC__ >= 2 +#define SCHED_FUNC __FUNCTION__ +#endif +#else +#define SCHED_FUNC "<unknown>" +#endif + +#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__) +#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__) + + +Event_scheduler* +Event_queue::singleton= NULL; + + +/* + Compares the execute_at members of 2 Event_timed instances. + Used as callback for the prioritized queue when shifting + elements inside. + + SYNOPSIS + event_timed_compare_q() + + vptr - not used (set it to NULL) + a - first Event_timed object + b - second Event_timed object + + RETURN VALUE + -1 - a->execute_at < b->execute_at + 0 - a->execute_at == b->execute_at + 1 - a->execute_at > b->execute_at + + NOTES + execute_at.second_part is not considered during comparison +*/ + +static int +event_timed_compare_q(void *vptr, byte* a, byte *b) +{ + return my_time_compare(&((Event_timed *)a)->execute_at, + &((Event_timed *)b)->execute_at); +} + + + +/* + Constructor of class Event_queue. + + SYNOPSIS + Event_queue::Event_queue() +*/ + +Event_queue::Event_queue() +{ + mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0; + mutex_last_unlocked_in_func= mutex_last_locked_in_func= ""; + mutex_queue_data_locked= FALSE; +} + +/* + Creates an event in the scheduler queue + + SYNOPSIS + Event_queue::create_event() + et The event to add + check_existence Whether to check if already loaded. + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk +*/ + +int +Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) +{ + int res; + Event_timed *et_new; + DBUG_ENTER("Event_queue::create_event"); + DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue)); + + LOCK_QUEUE_DATA(); + if (check_existence && find_event(et->dbname, et->name, FALSE)) + { + res= OP_ALREADY_EXISTS; + goto end; + } + + /* We need to load the event on scheduler_root */ + if (!(res= db_repository-> + load_named_event(thd, et->dbname, et->name, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + on_queue_change(); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; +end: + UNLOCK_QUEUE_DATA(); + DBUG_RETURN(res); +} + + +/* + Updates an event from the scheduler queue + + SYNOPSIS + Event_scheduler::update_event() + thd Thread + et The event to replace(add) into the queue + new_schema New schema + new_name New name + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk + OP_ALREADY_EXISTS Event already in the queue +*/ + +int +Event_queue::update_event(THD *thd, Event_parse_data *et, + LEX_STRING *new_schema, + LEX_STRING *new_name) +{ + int res= OP_OK; + Event_timed *et_old, *et_new= NULL; + LEX_STRING old_schema, old_name; + + LINT_INIT(old_schema.str); + LINT_INIT(old_schema.length); + LINT_INIT(old_name.str); + LINT_INIT(old_name.length); + + DBUG_ENTER("Event_queue::update_event"); + DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", + thd, et, et->dbname.str, et->name.str, &LOCK_event_queue)); + + LOCK_QUEUE_DATA(); + if (!(et_old= find_event(et->dbname, et->name, TRUE))) + DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", + et->dbname.str, et->name.str)); + + if (new_schema && new_name) + { + old_schema= et->dbname; + old_name= et->name; + et->dbname= *new_schema; + et->name= *new_name; + } + /* + We need to load the event (it's strings but on the object itself) + on scheduler_root. et_new could be NULL : + 1. Error occured + 2. If the replace is DISABLED, we don't load it into the queue. + */ + if (!(res= db_repository-> + load_named_event(thd, et->dbname, et->name, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + on_queue_change(); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; + + if (new_schema && new_name) + { + et->dbname= old_schema; + et->name= old_name; + } + DBUG_PRINT("info", ("res=%d", res)); + UNLOCK_QUEUE_DATA(); + /* + Andrey: Is this comment still truthful ??? + + We don't move this code above because a potential kill_thread will call + THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, + which is LOCK_event_queue on which the COND_new_work in ::run() locks. + Hence, we try to acquire a lock which we have already acquired and we run + into an assert. Holding LOCK_event_queue however is not needed because + we don't touch any invariant of the scheduler anymore. ::drop_event() does + the same. + */ + if (et_old) + { + switch (et_old->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(res); +} + + +/* + Drops an event from the scheduler queue + + SYNOPSIS + Event_queue::drop_event() + thd Thread + name The event to drop + + RETURN VALUE + FALSE OK (replaced or scheduler not working) + TRUE Failure +*/ + +bool +Event_queue::drop_event(THD *thd, sp_name *name) +{ + int res; + Event_timed *et_old; + DBUG_ENTER("Event_queue::drop_event"); + DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name, + &LOCK_event_queue)); + + LOCK_QUEUE_DATA(); + if (!(et_old= find_event(name->m_db, name->m_name, TRUE))) + DBUG_PRINT("info", ("No such event found, probably DISABLED")); + + UNLOCK_QUEUE_DATA(); + + /* See comments in ::replace_event() why this is split in two parts. */ + if (et_old) + { + switch ((res= et_old->kill_thread(thd))) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + sql_print_error("SCHEDULER: Got unexpected error %d", res); + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(FALSE); +} + + + + +/* + Searches for an event in the scheduler queue + + SYNOPSIS + Event_queue::find_event() + db The schema of the event to find + name The event to find + remove_from_q If found whether to remove from the Q + + RETURN VALUE + NULL Not found + otherwise Address + + NOTE + The caller should do the locking also the caller is responsible for + actual signalling in case an event is removed from the queue + (signalling COND_new_work for instance). +*/ + +Event_timed * +Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) +{ + uint i; + DBUG_ENTER("Event_queue::find_event"); + + for (i= 0; i < queue.elements; ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str, + et->dbname.str, et->name.str)); + if (event_timed_identifier_equal(db, name, et)) + { + if (remove_from_q) + queue_remove(&queue, i); + DBUG_RETURN(et); + } + } + + DBUG_RETURN(NULL); +} + + +/* + Drops all events from the in-memory queue and disk that match + certain pattern evaluated by a comparator function + + SYNOPSIS + Event_queue::drop_matching_events() + thd THD + pattern A pattern string + comparator The function to use for comparing + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events + + NOTE + Expected is the caller to acquire lock on LOCK_event_queue +*/ + +void +Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, + bool (*comparator)(Event_timed *,LEX_STRING *)) +{ + DBUG_ENTER("Event_queue::drop_matching_events"); + DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); + + uint i= 0, dropped= 0; + while (i < queue.elements) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); + if (comparator(et, &pattern)) + { + /* + The queue is ordered. If we remove an element, then all elements after + it will shift one position to the left, if we imagine it as an array + from left to the right. In this case we should not increment the + counter and the (i < queue.elements) condition is ok. + */ + queue_remove(&queue, i); + + /* See replace_event() */ + switch (et->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et->flags |= EVENT_FREE_WHEN_FINISHED; + ++dropped; + break; + case 0: + delete et; + ++dropped; + break; + default: + DBUG_ASSERT(0); + } + } + else + i++; + } + DBUG_PRINT("info", ("Dropped %lu", dropped)); + /* + Don't send COND_new_work because no need to wake up the scheduler thread. + When it wakes next time up it will recalculate how much more it should + sleep if the top of the queue has been changed by this method. + */ + + DBUG_VOID_RETURN; +} + + +/* + Drops all events from the in-memory queue and disk that are from + certain schema. + + SYNOPSIS + Event_queue::drop_schema_events() + thd THD + db The schema name + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events +*/ + +int +Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) +{ + int ret; + DBUG_ENTER("Event_queue::drop_schema_events"); + LOCK_QUEUE_DATA(); + drop_matching_events(thd, schema, event_timed_db_equal); + UNLOCK_QUEUE_DATA(); + + DBUG_RETURN(ret); +} + + +/* + Wrapper for pthread_mutex_lock + + SYNOPSIS + Event_queue::lock_data() + mutex Mutex to lock + line The line number on which the lock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +void +Event_queue::lock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_queue::lock_mutex"); + DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", + &LOCK_event_queue, func, line)); + pthread_mutex_lock(&LOCK_event_queue); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_queue_data_locked= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Wrapper for pthread_mutex_unlock + + SYNOPSIS + Event_queue::unlock_data() + mutex Mutex to unlock + line The line number on which the unlock is done +*/ + +void +Event_queue::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_queue::UNLOCK_mutex"); + DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", + &LOCK_event_queue, func, line)); + mutex_last_unlocked_at_line= line; + mutex_queue_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + pthread_mutex_unlock(&LOCK_event_queue); + DBUG_VOID_RETURN; +} + + +/* + Returns the number of elements in the queue + + SYNOPSIS + Event_queue::events_count() + + RETURN VALUE + 0 Number of Event_timed objects in the queue +*/ + +uint +Event_queue::events_count() +{ + uint n; + DBUG_ENTER("Event_scheduler::events_count"); + LOCK_QUEUE_DATA(); + n= queue.elements; + UNLOCK_QUEUE_DATA(); + + DBUG_RETURN(n); +} + + +/* + Returns the number of elements in the queue + + SYNOPSIS + Event_queue::events_count_no_lock() + + RETURN VALUE + 0 Number of Event_timed objects in the queue +*/ + +uint +Event_queue::events_count_no_lock() +{ + uint n; + DBUG_ENTER("Event_scheduler::events_count_no_lock"); + + n= queue.elements; + + DBUG_RETURN(n); +} + + +/* + Loads all ENABLED events from mysql.event into the prioritized + queue. Called during scheduler main thread initialization. Compiles + the events. Creates Event_timed instances for every ENABLED event + from mysql.event. + + SYNOPSIS + Event_queue::load_events_from_db() + thd - Thread context. Used for memory allocation in some cases. + + RETURN VALUE + 0 OK + !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, + EVEX_COMPILE_ERROR) - in all these cases mysql.event was + tampered. + + NOTES + Reports the error to the console +*/ + +int +Event_queue::load_events_from_db(THD *thd) +{ + TABLE *table; + READ_RECORD read_record_info; + int ret= -1; + uint count= 0; + bool clean_the_queue= FALSE; + /* Compile the events on this root but only for syntax check, then discard */ + MEM_ROOT boot_root; + + DBUG_ENTER("Event_queue::load_events_from_db"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) + { + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); + DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); + } + + init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info))) + { + Event_timed *et; + if (!(et= new Event_timed)) + { + DBUG_PRINT("info", ("Out of memory")); + clean_the_queue= TRUE; + break; + } + DBUG_PRINT("info", ("Loading event from row.")); + + if ((ret= et->load_from_row(&scheduler_root, table))) + { + clean_the_queue= TRUE; + sql_print_error("SCHEDULER: Error while loading from mysql.event. " + "Table probably corrupted"); + break; + } + if (et->status != Event_timed::ENABLED) + { + DBUG_PRINT("info",("%s is disabled",et->name.str)); + delete et; + continue; + } + + DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); + + /* We load only on scheduler root just to check whether the body compiles */ + switch (ret= et->compile(thd, &boot_root)) { + case EVEX_MICROSECOND_UNSUP: + et->free_sp(); + sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " + "supported but found in mysql.event"); + goto end; + case EVEX_COMPILE_ERROR: + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", + et->dbname.str, et->name.str); + goto end; + default: + /* Free it, it will be compiled again on the worker thread */ + et->free_sp(); + break; + } + + /* let's find when to be executed */ + if (et->compute_next_execution_time()) + { + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); + continue; + } + + DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); + queue_insert_safe(&queue, (byte *) et); + count++; + } +end: + end_read_record(&read_record_info); + free_root(&boot_root, MYF(0)); + + if (clean_the_queue) + { + for (count= 0; count < queue.elements; ++count) + queue_remove(&queue, 0); + ret= -1; + } + else + { + ret= 0; + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); + } + + /* Force close to free memory */ + thd->version--; + + close_thread_tables(thd); + + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); + DBUG_RETURN(ret); +} + + +/* + Opens mysql.db and mysql.user and checks whether: + 1. mysql.db has column Event_priv at column 20 (0 based); + 2. mysql.user has column Event_priv at column 29 (0 based); + + SYNOPSIS + Event_queue::check_system_tables() +*/ + +bool +Event_queue::check_system_tables(THD *thd) +{ + TABLE_LIST tables; + bool not_used; + Open_tables_state backup; + bool ret; + + DBUG_ENTER("Event_queue::check_system_tables"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + thd->reset_n_backup_open_tables_state(&backup); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "db"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, + mysql_db_table_fields, &mysql_db_table_last_check, + ER_CANNOT_LOAD_FROM_TABLE); + close_thread_tables(thd); + } + if (ret) + DBUG_RETURN(TRUE); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "user"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + if (tables.table->s->fields < 29 || + strncmp(tables.table->field[29]->field_name, + STRING_WITH_LEN("Event_priv"))) + { + sql_print_error("mysql.user has no `Event_priv` column at position 29"); + ret= TRUE; + } + close_thread_tables(thd); + } + + thd->restore_backup_open_tables_state(&backup); + + DBUG_RETURN(ret); +} + + +/* + Inits mutexes. + + SYNOPSIS + Event_queue::init_mutexes() +*/ + +void +Event_queue::init_mutexes() +{ + pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST); +} + + +/* + Destroys mutexes. + + SYNOPSIS + Event_queue::destroy_mutexes() +*/ + +void +Event_queue::destroy_mutexes() +{ + pthread_mutex_destroy(&singleton->LOCK_event_queue); +} + + +/* + Signals the main scheduler thread that the queue has changed + its state. + + SYNOPSIS + Event_queue::on_queue_change() +*/ + +void +Event_queue::on_queue_change() +{ + DBUG_ENTER("Event_queue::on_queue_change"); + DBUG_PRINT("info", ("Sending COND_new_work")); + singleton->queue_changed(); + DBUG_VOID_RETURN; +} + + +/* + The implementation of full-fledged initialization. + + SYNOPSIS + Event_scheduler::init() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_queue::init(Event_db_repository *db_repo) +{ + int i= 0; + bool ret= FALSE; + DBUG_ENTER("Event_scheduler::init"); + DBUG_PRINT("enter", ("this=%p", this)); + + LOCK_QUEUE_DATA(); + db_repository= db_repo; + /* init memory root */ + init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + + if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, + event_timed_compare_q, NULL, 30 /*auto_extent*/)) + { + sql_print_error("SCHEDULER: Can't initialize the execution queue"); + ret= TRUE; + goto end; + } + + if (sizeof(my_time_t) != sizeof(time_t)) + { + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." + "The scheduler may not work correctly. Stopping."); + DBUG_ASSERT(0); + ret= TRUE; + goto end; + } + +end: + UNLOCK_QUEUE_DATA(); + DBUG_RETURN(ret); +} + + +void +Event_queue::deinit() +{ + DBUG_ENTER("Event_queue::deinit"); + + LOCK_QUEUE_DATA(); + delete_queue(&queue); + free_root(&scheduler_root, MYF(0)); + UNLOCK_QUEUE_DATA(); + + DBUG_VOID_RETURN; +} + + +void +Event_queue::recalculate_queue(THD *thd) +{ + int i; + for (i= 0; i < queue.elements; i++) + { + ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); + ((Event_timed*)queue_element(&queue, i))->update_fields(thd); + } + queue_fix(&queue); +} + + +void +Event_queue::empty_queue() +{ + int i; + /* empty the queue */ + for (i= 0; i < events_count_no_lock(); ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + et->free_sp(); + delete et; + } + resize_queue(&queue, 0); +} diff --git a/sql/event_queue.h b/sql/event_queue.h index b3aa6133840..8c11d7a2042 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -16,5 +16,107 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - +class sp_name; +class Event_timed; +class Event_db_repository; + +class THD; +typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*); + +class Event_scheduler; + +class Event_queue +{ +public: + Event_queue(); + + static void + init_mutexes(); + + static void + destroy_mutexes(); + + bool + init(Event_db_repository *db_repo); + + void + deinit(); + + /* Methods for queue management follow */ + + int + create_event(THD *thd, Event_parse_data *et, bool check_existence); + + int + update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema, + LEX_STRING *new_name); + + bool + drop_event(THD *thd, sp_name *name); + + int + drop_schema_events(THD *thd, LEX_STRING schema); + + int + drop_user_events(THD *thd, LEX_STRING *definer) + { DBUG_ASSERT(0); return 0;} + + uint + events_count(); + + uint + events_count_no_lock(); + + static bool + check_system_tables(THD *thd); + + void + recalculate_queue(THD *thd); + + void + empty_queue(); + +///////////////protected + Event_timed * + find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); + + int + load_events_from_db(THD *thd); + + void + drop_matching_events(THD *thd, LEX_STRING pattern, + bool (*)(Event_timed *,LEX_STRING *)); + + /* LOCK_event_queue is the mutex which protects the access to the queue. */ + pthread_mutex_t LOCK_event_queue; + + Event_db_repository *db_repository; + + /* The MEM_ROOT of the object */ + MEM_ROOT scheduler_root; + + /* The sorted queue with the Event_timed objects */ + QUEUE queue; + + uint mutex_last_locked_at_line; + uint mutex_last_unlocked_at_line; + const char* mutex_last_locked_in_func; + const char* mutex_last_unlocked_in_func; + bool mutex_queue_data_locked; + + /* helper functions for working with mutexes & conditionals */ + void + lock_data(const char *func, uint line); + + void + unlock_data(const char *func, uint line); + + static void + on_queue_change(); +protected: + /* Singleton instance */ + static Event_scheduler *singleton; + +}; + #endif /* _EVENT_QUEUE_H_ */ diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index 35c30e5fc5a..fb60ce8ae6d 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -264,11 +264,6 @@ LEX_STRING states_names[] = }; #endif - -Event_scheduler -Event_scheduler::singleton; - - const char * const Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = { @@ -278,6 +273,13 @@ Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = }; +/* +Event_scheduler* +Event_scheduler::singleton= NULL; +*/ + + + class Worker_thread_param { public: @@ -301,35 +303,6 @@ public: /* - Compares the execute_at members of 2 Event_timed instances. - Used as callback for the prioritized queue when shifting - elements inside. - - SYNOPSIS - event_timed_compare_q() - - vptr - not used (set it to NULL) - a - first Event_timed object - b - second Event_timed object - - RETURN VALUE - -1 - a->execute_at < b->execute_at - 0 - a->execute_at == b->execute_at - 1 - a->execute_at > b->execute_at - - NOTES - execute_at.second_part is not considered during comparison -*/ - -static int -event_timed_compare_q(void *vptr, byte* a, byte *b) -{ - return my_time_compare(&((Event_timed *)a)->execute_at, - &((Event_timed *)b)->execute_at); -} - - -/* Prints the stack of infos, warnings, errors from thd to the console so it can be fetched by the logs-into-tables and checked later. @@ -640,6 +613,8 @@ event_worker_thread(void *arg) } + + /* Constructor of class Event_scheduler. @@ -648,15 +623,35 @@ event_worker_thread(void *arg) */ Event_scheduler::Event_scheduler() - :state(UNINITIALIZED), start_scheduler_suspended(FALSE), - thread_id(0), mutex_last_locked_at_line(0), - mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""), - mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE), - mutex_scheduler_data_locked(FALSE) { + thread_id= 0; + mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0; + mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= ""; + cond_waiting_on= COND_NONE; + mutex_scheduler_data_locked= FALSE; + state= UNINITIALIZED; + start_scheduler_suspended= FALSE; + LOCK_scheduler_data= &LOCK_event_queue; } + +/* + Returns the singleton instance of the class. + + SYNOPSIS + Event_scheduler::create_instance() + + RETURN VALUE + address +*/ + +void +Event_scheduler::create_instance() +{ + singleton= new Event_scheduler(); +} + /* Returns the singleton instance of the class. @@ -671,7 +666,7 @@ Event_scheduler* Event_scheduler::get_instance() { DBUG_ENTER("Event_scheduler::get_instance"); - DBUG_RETURN(&singleton); + DBUG_RETURN(singleton); } @@ -693,9 +688,9 @@ Event_scheduler::init(Event_db_repository *db_repo) bool ret= FALSE; DBUG_ENTER("Event_scheduler::init"); DBUG_PRINT("enter", ("this=%p", this)); - + + Event_queue::init(db_repo); LOCK_SCHEDULER_DATA(); - db_repository= db_repo; for (;i < COND_LAST; i++) if (pthread_cond_init(&cond_vars[i], NULL)) { @@ -703,27 +698,6 @@ Event_scheduler::init(Event_db_repository *db_repo) ret= TRUE; goto end; } - - /* init memory root */ - init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - - if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, - event_timed_compare_q, NULL, 30 /*auto_extent*/)) - { - sql_print_error("SCHEDULER: Can't initialize the execution queue"); - ret= TRUE; - goto end; - } - - if (sizeof(my_time_t) != sizeof(time_t)) - { - sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." - "The scheduler may not work correctly. Stopping."); - DBUG_ASSERT(0); - ret= TRUE; - goto end; - } - state= INITIALIZED; end: UNLOCK_SCHEDULER_DATA(); @@ -746,14 +720,12 @@ void Event_scheduler::destroy() { DBUG_ENTER("Event_scheduler"); - + Event_queue::deinit(); LOCK_SCHEDULER_DATA(); switch (state) { case UNINITIALIZED: break; case INITIALIZED: - delete_queue(&queue); - free_root(&scheduler_root, MYF(0)); int i; for (i= 0; i < COND_LAST; i++) pthread_cond_destroy(&cond_vars[i]); @@ -771,389 +743,6 @@ Event_scheduler::destroy() } -/* - Creates an event in the scheduler queue - - SYNOPSIS - Event_scheduler::create_event() - et The event to add - check_existence Whether to check if already loaded. - - RETURN VALUE - OP_OK OK or scheduler not working - OP_LOAD_ERROR Error during loading from disk -*/ - -int -Event_scheduler::create_event(THD *thd, Event_parse_data *et, bool check_existence) -{ - int res; - Event_timed *et_new; - DBUG_ENTER("Event_scheduler::create_event"); - DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); - - LOCK_SCHEDULER_DATA(); - if (!is_running_or_suspended()) - { - DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(OP_OK); - } - if (check_existence && find_event(et->dbname, et->name, FALSE)) - { - res= OP_ALREADY_EXISTS; - goto end; - } - - /* We need to load the event on scheduler_root */ - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) - { - queue_insert_safe(&queue, (byte *) et_new); - DBUG_PRINT("info", ("Sending COND_new_work")); - pthread_cond_signal(&cond_vars[COND_new_work]); - } - else if (res == OP_DISABLED_EVENT) - res= OP_OK; -end: - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(res); -} - - -/* - Drops an event from the scheduler queue - - SYNOPSIS - Event_scheduler::drop_event() - etn The event to drop - state Wait the event or kill&drop - - RETURN VALUE - FALSE OK (replaced or scheduler not working) - TRUE Failure -*/ - -bool -Event_scheduler::drop_event(THD *thd, sp_name *name) -{ - int res; - Event_timed *et_old; - DBUG_ENTER("Event_scheduler::drop_event"); - DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name, - &LOCK_scheduler_data)); - - LOCK_SCHEDULER_DATA(); - if (!is_running_or_suspended()) - { - DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(OP_OK); - } - - if (!(et_old= find_event(name->m_db, name->m_name, TRUE))) - DBUG_PRINT("info", ("No such event found, probably DISABLED")); - - UNLOCK_SCHEDULER_DATA(); - - /* See comments in ::replace_event() why this is split in two parts. */ - if (et_old) - { - switch ((res= et_old->kill_thread(thd))) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - sql_print_error("SCHEDULER: Got unexpected error %d", res); - DBUG_ASSERT(0); - } - } - - DBUG_RETURN(FALSE); -} - - -/* - Updates an event from the scheduler queue - - SYNOPSIS - Event_scheduler::replace_event() - et The event to replace(add) into the queue - state Async or sync stopping - - RETURN VALUE - OP_OK OK or scheduler not working - OP_LOAD_ERROR Error during loading from disk - OP_ALREADY_EXISTS Event already in the queue -*/ - -int -Event_scheduler::update_event(THD *thd, Event_parse_data *et, - LEX_STRING *new_schema, - LEX_STRING *new_name) -{ - int res= OP_OK; - Event_timed *et_old, *et_new= NULL; - LEX_STRING old_schema, old_name; - - LINT_INIT(old_schema.str); - LINT_INIT(old_schema.length); - LINT_INIT(old_name.str); - LINT_INIT(old_name.length); - - DBUG_ENTER("Event_scheduler::update_event"); - DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", - thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data)); - - LOCK_SCHEDULER_DATA(); - if (!is_running_or_suspended()) - { - DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(OP_OK); - } - - if (!(et_old= find_event(et->dbname, et->name, TRUE))) - DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", - et->dbname.str, et->name.str)); - - if (new_schema && new_name) - { - old_schema= et->dbname; - old_name= et->name; - et->dbname= *new_schema; - et->name= *new_name; - } - /* - We need to load the event (it's strings but on the object itself) - on scheduler_root. et_new could be NULL : - 1. Error occured - 2. If the replace is DISABLED, we don't load it into the queue. - */ - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) - { - queue_insert_safe(&queue, (byte *) et_new); - DBUG_PRINT("info", ("Sending COND_new_work")); - pthread_cond_signal(&cond_vars[COND_new_work]); - } - else if (res == OP_DISABLED_EVENT) - res= OP_OK; - - if (new_schema && new_name) - { - et->dbname= old_schema; - et->name= old_name; - } - DBUG_PRINT("info", ("res=%d", res)); - UNLOCK_SCHEDULER_DATA(); - /* - Andrey: Is this comment still truthful ??? - - We don't move this code above because a potential kill_thread will call - THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, - which is LOCK_scheduler_data on which the COND_new_work in ::run() locks. - Hence, we try to acquire a lock which we have already acquired and we run - into an assert. Holding LOCK_scheduler_data however is not needed because - we don't touch any invariant of the scheduler anymore. ::drop_event() does - the same. - */ - if (et_old) - { - switch (et_old->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - DBUG_ASSERT(0); - } - } - - DBUG_RETURN(res); -} - - -/* - Searches for an event in the scheduler queue - - SYNOPSIS - Event_scheduler::find_event() - db The schema of the event to find - name The event to find - remove_from_q If found whether to remove from the Q - - RETURN VALUE - NULL Not found - otherwise Address - - NOTE - The caller should do the locking also the caller is responsible for - actual signalling in case an event is removed from the queue - (signalling COND_new_work for instance). -*/ - -Event_timed * -Event_scheduler::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) -{ - uint i; - DBUG_ENTER("Event_scheduler::find_event"); - - for (i= 0; i < queue.elements; ++i) - { - Event_timed *et= (Event_timed *) queue_element(&queue, i); - DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str, - et->dbname.str, et->name.str)); - if (event_timed_identifier_equal(db, name, et)) - { - if (remove_from_q) - queue_remove(&queue, i); - DBUG_RETURN(et); - } - } - - DBUG_RETURN(NULL); -} - - -/* - Drops all events from the in-memory queue and disk that match - certain pattern evaluated by a comparator function - - SYNOPSIS - Event_scheduler::drop_matching_events() - thd THD - pattern A pattern string - comparator The function to use for comparing - - RETURN VALUE - -1 Scheduler not working - >=0 Number of dropped events - - NOTE - Expected is the caller to acquire lock on LOCK_scheduler_data -*/ - -void -Event_scheduler::drop_matching_events(THD *thd, LEX_STRING pattern, - bool (*comparator)(Event_timed *,LEX_STRING *)) -{ - DBUG_ENTER("Event_scheduler::drop_matching_events"); - DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str, - state)); - if (is_running_or_suspended()) - { - uint i= 0, dropped= 0; - while (i < queue.elements) - { - Event_timed *et= (Event_timed *) queue_element(&queue, i); - DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); - if (comparator(et, &pattern)) - { - /* - The queue is ordered. If we remove an element, then all elements after - it will shift one position to the left, if we imagine it as an array - from left to the right. In this case we should not increment the - counter and the (i < queue.elements) condition is ok. - */ - queue_remove(&queue, i); - - /* See replace_event() */ - switch (et->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et->flags |= EVENT_FREE_WHEN_FINISHED; - ++dropped; - break; - case 0: - delete et; - ++dropped; - break; - default: - DBUG_ASSERT(0); - } - } - else - i++; - } - DBUG_PRINT("info", ("Dropped %lu", dropped)); - } - /* - Don't send COND_new_work because no need to wake up the scheduler thread. - When it wakes next time up it will recalculate how much more it should - sleep if the top of the queue has been changed by this method. - */ - - DBUG_VOID_RETURN; -} - - -/* - Drops all events from the in-memory queue and disk that are from - certain schema. - - SYNOPSIS - Event_scheduler::drop_schema_events() - thd THD - db The schema name - - RETURN VALUE - -1 Scheduler not working - >=0 Number of dropped events -*/ - -int -Event_scheduler::drop_schema_events(THD *thd, LEX_STRING schema) -{ - int ret; - DBUG_ENTER("Event_scheduler::drop_schema_events"); - LOCK_SCHEDULER_DATA(); - if (is_running_or_suspended()) - drop_matching_events(thd, schema, event_timed_db_equal); - - UNLOCK_SCHEDULER_DATA(); - - DBUG_RETURN(ret); -} - - extern pthread_attr_t connection_attrib; @@ -1205,8 +794,8 @@ Event_scheduler::start() } /* Wait till the child thread has booted (w/ or wo success) */ - while (!is_running_or_suspended() && state != CANTSTART) - cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + while (!(state == SUSPENDED || state == RUNNING) && state != CANTSTART) + cond_wait(COND_started_or_stopped, LOCK_scheduler_data); /* If we cannot start for some reason then don't prohibit further attempts. @@ -1314,7 +903,7 @@ Event_scheduler::run(THD *thd) sql_print_information("SCHEDULER: Manager thread started with id %lu", thd->thread_id); abstime.tv_nsec= 0; - while (is_running_or_suspended()) + while ((state == SUSPENDED || state == RUNNING)) { Event_timed *et; @@ -1374,9 +963,9 @@ Event_scheduler::run(THD *thd) pthread_cond_timedwait() will wait till `abstime`. "Sleeping until next time" */ - thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping"); + thd->enter_cond(&cond_vars[COND_new_work],LOCK_scheduler_data,"Sleeping"); - pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data, + pthread_cond_timedwait(&cond_vars[COND_new_work], LOCK_scheduler_data, &abstime); DBUG_PRINT("info", ("Manager woke up. state is %d", state)); @@ -1397,7 +986,7 @@ Event_scheduler::run(THD *thd) In this case stop the manager. We should enter ::execute_top() with locked LOCK_scheduler_data. */ - int ret= execute_top(thd); + int ret= execute_top(thd, et); UNLOCK_SCHEDULER_DATA(); if (ret) break; @@ -1421,7 +1010,7 @@ Event_scheduler::run(THD *thd) sql_print_information("SCHEDULER: Shutting down"); thd->proc_info= (char *)"Cleaning queue"; - clean_queue(thd); + clean_memory(thd); THD_CHECK_SENTRY(thd); /* free mamager_root memory but don't destroy the root */ @@ -1474,15 +1063,13 @@ Event_scheduler::run(THD *thd) */ bool -Event_scheduler::execute_top(THD *thd) +Event_scheduler::execute_top(THD *thd, Event_timed *et) { int spawn_ret_code; bool ret= FALSE; DBUG_ENTER("Event_scheduler::execute_top"); DBUG_PRINT("enter", ("thd=%p", thd)); - Event_timed *et= (Event_timed *)queue_top(&queue); - /* Is it good idea to pass a stack address ?*/ Worker_thread_param param(et); @@ -1552,7 +1139,7 @@ Event_scheduler::execute_top(THD *thd) */ void -Event_scheduler::clean_queue(THD *thd) +Event_scheduler::clean_memory(THD *thd) { CHARSET_INFO *scs= system_charset_info; uint i; @@ -1565,14 +1152,7 @@ Event_scheduler::clean_queue(THD *thd) sql_print_information("SCHEDULER: Emptying the queue"); - /* empty the queue */ - for (i= 0; i < queue.elements; ++i) - { - Event_timed *et= (Event_timed *) queue_element(&queue, i); - et->free_sp(); - delete et; - } - resize_queue(&queue, 0); + empty_queue(); DBUG_VOID_RETURN; } @@ -1681,7 +1261,7 @@ Event_scheduler::stop() DBUG_PRINT("enter", ("thd=%p", current_thd)); LOCK_SCHEDULER_DATA(); - if (!is_running_or_suspended()) + if (!(state == SUSPENDED || state == RUNNING)) { /* One situation to be here is if there was a start that forked a new @@ -1717,7 +1297,7 @@ Event_scheduler::stop() DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " "thread. Current value of state is %d . " "workers count=%d", state, workers_count())); - cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + cond_wait(COND_started_or_stopped, LOCK_scheduler_data); } DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); UNLOCK_SCHEDULER_DATA(); @@ -1768,7 +1348,7 @@ Event_scheduler::suspend_or_resume( pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); } DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume")); - cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + cond_wait(COND_suspend_or_resume, LOCK_scheduler_data); DBUG_PRINT("info", ("Got response")); } UNLOCK_SCHEDULER_DATA(); @@ -1833,7 +1413,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd) } if (state == SUSPENDED) { - thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data, + thd->enter_cond(&cond_vars[COND_suspend_or_resume], LOCK_scheduler_data, "Suspended"); /* Send back signal to the thread that asked us to suspend operations */ pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); @@ -1842,7 +1422,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd) } while (state == SUSPENDED) { - cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + cond_wait(COND_suspend_or_resume, LOCK_scheduler_data); DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume")); if (state != SUSPENDED) { @@ -1852,18 +1432,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd) } if (was_suspended) { - if (queue.elements) - { - uint i; - DBUG_PRINT("info", ("We have to recompute the execution times")); - - for (i= 0; i < queue.elements; i++) - { - ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); - ((Event_timed*)queue_element(&queue, i))->update_fields(thd); - } - queue_fix(&queue); - } + recalculate_queue(thd); /* This will implicitly unlock LOCK_scheduler_data */ thd->exit_cond(""); } @@ -1892,18 +1461,18 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) bool slept= FALSE; DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); DBUG_PRINT("enter", ("q.elements=%lu state=%s", - queue.elements, states_names[state])); + events_count_no_lock(), states_names[state])); - if (!queue.elements) - thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data, + if (!events_count_no_lock()) + thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data, "Empty queue, sleeping"); /* Wait in a loop protecting against catching spurious signals */ - while (!queue.elements && state == RUNNING) + while (!events_count_no_lock() && state == RUNNING) { slept= TRUE; DBUG_PRINT("info", ("Entering condition because of empty queue")); - cond_wait(COND_new_work, &LOCK_scheduler_data); + cond_wait(COND_new_work, LOCK_scheduler_data); DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d", state)); /* @@ -1916,105 +1485,13 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) thd->exit_cond(""); DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", - queue.elements, states_names[state], thd->killed)); + events_count_no_lock(), states_names[state], thd->killed)); DBUG_RETURN(slept); } /* - Wrapper for pthread_mutex_lock - - SYNOPSIS - Event_scheduler::lock_data() - mutex Mutex to lock - line The line number on which the lock is done - - RETURN VALUE - Error code of pthread_mutex_lock() -*/ - -inline void -Event_scheduler::lock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler::lock_mutex"); - DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", - &LOCK_scheduler_data, func, line)); - pthread_mutex_lock(&LOCK_scheduler_data); - mutex_last_locked_in_func= func; - mutex_last_locked_at_line= line; - mutex_scheduler_data_locked= TRUE; - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_mutex_unlock - - SYNOPSIS - Event_scheduler::unlock_data() - mutex Mutex to unlock - line The line number on which the unlock is done -*/ - -inline void -Event_scheduler::unlock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); - DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", - &LOCK_scheduler_data, func, line)); - mutex_last_unlocked_at_line= line; - mutex_scheduler_data_locked= FALSE; - mutex_last_unlocked_in_func= func; - pthread_mutex_unlock(&LOCK_scheduler_data); - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_cond_wait - - SYNOPSIS - Event_scheduler::cond_wait() - cond Conditional to wait for - mutex Mutex of the conditional - - RETURN VALUE - Error code of pthread_cond_wait() -*/ - -inline int -Event_scheduler::cond_wait(enum Event_scheduler::enum_cond_vars cond, - pthread_mutex_t *mutex) -{ - int ret; - DBUG_ENTER("Event_scheduler::cond_wait"); - DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); - ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); - cond_waiting_on= COND_NONE; - DBUG_RETURN(ret); -} - - -/* - Checks whether the scheduler is in a running or suspended state. - - SYNOPSIS - Event_scheduler::is_running_or_suspended() - - RETURN VALUE - TRUE Either running or suspended - FALSE IN_SHUTDOWN, not started, etc. -*/ - -inline bool -Event_scheduler::is_running_or_suspended() -{ - return (state == SUSPENDED || state == RUNNING); -} - - -/* Returns the current state of the scheduler SYNOPSIS @@ -2027,9 +1504,9 @@ Event_scheduler::get_state() enum Event_scheduler::enum_state ret; DBUG_ENTER("Event_scheduler::get_state"); /* lock_data & unlock_data are not static */ - pthread_mutex_lock(&singleton.LOCK_scheduler_data); - ret= singleton.state; - pthread_mutex_unlock(&singleton.LOCK_scheduler_data); + pthread_mutex_lock(singleton->LOCK_scheduler_data); + ret= singleton->state; + pthread_mutex_unlock(singleton->LOCK_scheduler_data); DBUG_RETURN(ret); } @@ -2053,252 +1530,6 @@ Event_scheduler::initialized() } -/* - Returns the number of elements in the queue - - SYNOPSIS - Event_scheduler::events_count() - - RETURN VALUE - 0 Number of Event_timed objects in the queue -*/ - -uint -Event_scheduler::events_count() -{ - uint n; - DBUG_ENTER("Event_scheduler::events_count"); - LOCK_SCHEDULER_DATA(); - n= queue.elements; - UNLOCK_SCHEDULER_DATA(); - - DBUG_RETURN(n); -} - - - - -/* - Loads all ENABLED events from mysql.event into the prioritized - queue. Called during scheduler main thread initialization. Compiles - the events. Creates Event_timed instances for every ENABLED event - from mysql.event. - - SYNOPSIS - Event_scheduler::load_events_from_db() - thd - Thread context. Used for memory allocation in some cases. - - RETURN VALUE - 0 OK - !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, - EVEX_COMPILE_ERROR) - in all these cases mysql.event was - tampered. - - NOTES - Reports the error to the console -*/ - -int -Event_scheduler::load_events_from_db(THD *thd) -{ - TABLE *table; - READ_RECORD read_record_info; - int ret= -1; - uint count= 0; - bool clean_the_queue= FALSE; - /* Compile the events on this root but only for syntax check, then discard */ - MEM_ROOT boot_root; - - DBUG_ENTER("Event_scheduler::load_events_from_db"); - DBUG_PRINT("enter", ("thd=%p", thd)); - - if (state > COMMENCING) - { - DBUG_ASSERT(0); - sql_print_error("SCHEDULER: Trying to load events while already running."); - DBUG_RETURN(EVEX_GENERAL_ERROR); - } - - if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) - { - sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); - DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); - } - - init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - init_read_record(&read_record_info, thd, table ,NULL,1,0); - while (!(read_record_info.read_record(&read_record_info))) - { - Event_timed *et; - if (!(et= new Event_timed)) - { - DBUG_PRINT("info", ("Out of memory")); - clean_the_queue= TRUE; - break; - } - DBUG_PRINT("info", ("Loading event from row.")); - - if ((ret= et->load_from_row(&scheduler_root, table))) - { - clean_the_queue= TRUE; - sql_print_error("SCHEDULER: Error while loading from mysql.event. " - "Table probably corrupted"); - break; - } - if (et->status != Event_timed::ENABLED) - { - DBUG_PRINT("info",("%s is disabled",et->name.str)); - delete et; - continue; - } - - DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); - - /* We load only on scheduler root just to check whether the body compiles */ - switch (ret= et->compile(thd, &boot_root)) { - case EVEX_MICROSECOND_UNSUP: - et->free_sp(); - sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " - "supported but found in mysql.event"); - goto end; - case EVEX_COMPILE_ERROR: - sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", - et->dbname.str, et->name.str); - goto end; - default: - /* Free it, it will be compiled again on the worker thread */ - et->free_sp(); - break; - } - - /* let's find when to be executed */ - if (et->compute_next_execution_time()) - { - sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." - " Skipping", et->dbname.str, et->name.str); - continue; - } - - DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); - queue_insert_safe(&queue, (byte *) et); - count++; - } -end: - end_read_record(&read_record_info); - free_root(&boot_root, MYF(0)); - - if (clean_the_queue) - { - for (count= 0; count < queue.elements; ++count) - queue_remove(&queue, 0); - ret= -1; - } - else - { - ret= 0; - sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); - } - - /* Force close to free memory */ - thd->version--; - - close_thread_tables(thd); - - DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); - DBUG_RETURN(ret); -} - - -/* - Opens mysql.db and mysql.user and checks whether: - 1. mysql.db has column Event_priv at column 20 (0 based); - 2. mysql.user has column Event_priv at column 29 (0 based); - - SYNOPSIS - Event_scheduler::check_system_tables() -*/ - -bool -Event_scheduler::check_system_tables(THD *thd) -{ - TABLE_LIST tables; - bool not_used; - Open_tables_state backup; - bool ret; - - DBUG_ENTER("Event_scheduler::check_system_tables"); - DBUG_PRINT("enter", ("thd=%p", thd)); - - thd->reset_n_backup_open_tables_state(&backup); - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "db"; - tables.lock_type= TL_READ; - - if ((ret= simple_open_n_lock_tables(thd, &tables))) - sql_print_error("Cannot open mysql.db"); - else - { - ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, - mysql_db_table_fields, &mysql_db_table_last_check, - ER_CANNOT_LOAD_FROM_TABLE); - close_thread_tables(thd); - } - if (ret) - DBUG_RETURN(TRUE); - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "user"; - tables.lock_type= TL_READ; - - if ((ret= simple_open_n_lock_tables(thd, &tables))) - sql_print_error("Cannot open mysql.db"); - else - { - if (tables.table->s->fields < 29 || - strncmp(tables.table->field[29]->field_name, - STRING_WITH_LEN("Event_priv"))) - { - sql_print_error("mysql.user has no `Event_priv` column at position 29"); - ret= TRUE; - } - close_thread_tables(thd); - } - - thd->restore_backup_open_tables_state(&backup); - - DBUG_RETURN(ret); -} - - -/* - Inits mutexes. - - SYNOPSIS - Event_scheduler::init_mutexes() -*/ - -void -Event_scheduler::init_mutexes() -{ - pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST); -} - - -/* - Destroys mutexes. - - SYNOPSIS - Event_scheduler::destroy_mutexes() -*/ - -void -Event_scheduler::destroy_mutexes() -{ - pthread_mutex_destroy(&singleton.LOCK_scheduler_data); -} /* @@ -2337,8 +1568,8 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("state"), scs); - protocol->store(states_names[singleton.state].str, - states_names[singleton.state].length, + protocol->store(states_names[singleton->state].str, + states_names[singleton->state].length, scs); ret= protocol->write(); @@ -2346,7 +1577,7 @@ Event_scheduler::dump_internal_status(THD *thd) If not initialized - don't show anything else. get_instance() will otherwise implicitly initialize it. We don't want that. */ - if (singleton.state >= INITIALIZED) + if (singleton->state >= INITIALIZED) { /* last locked at*/ /* @@ -2357,8 +1588,8 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("last locked at"), scs); tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), tmp_string.alloced_length(), "%s::%d", - singleton.mutex_last_locked_in_func, - singleton.mutex_last_locked_at_line)); + singleton->mutex_last_locked_in_func, + singleton->mutex_last_locked_at_line)); protocol->store(&tmp_string); ret= protocol->write(); @@ -2367,8 +1598,8 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("last unlocked at"), scs); tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), tmp_string.alloced_length(), "%s::%d", - singleton.mutex_last_unlocked_in_func, - singleton.mutex_last_unlocked_at_line)); + singleton->mutex_last_unlocked_in_func, + singleton->mutex_last_unlocked_at_line)); protocol->store(&tmp_string); ret= protocol->write(); @@ -2378,8 +1609,8 @@ Event_scheduler::dump_internal_status(THD *thd) tmp_string.length(scs->cset-> snprintf(scs, (char*) tmp_string.ptr(), tmp_string.alloced_length(), "%s", - (singleton.cond_waiting_on != COND_NONE) ? - cond_vars_names[singleton.cond_waiting_on]: + (singleton->cond_waiting_on != COND_NONE) ? + cond_vars_names[singleton->cond_waiting_on]: "NONE")); protocol->store(&tmp_string); ret= protocol->write(); @@ -2396,7 +1627,7 @@ Event_scheduler::dump_internal_status(THD *thd) /* queue.elements */ protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("queue.elements"), scs); - int_string.set((longlong) scheduler->queue.elements, scs); + int_string.set((longlong) scheduler->events_count_no_lock(), scs); protocol->store(&int_string); ret= protocol->write(); @@ -2411,3 +1642,94 @@ Event_scheduler::dump_internal_status(THD *thd) #endif DBUG_RETURN(0); } + + +/* + Wrapper for pthread_mutex_lock + + SYNOPSIS + Event_scheduler::lock_data() + mutex Mutex to lock + line The line number on which the lock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +void +Event_scheduler::lock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::lock_mutex"); + DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", + &LOCK_scheduler_data, func, line)); + pthread_mutex_lock(LOCK_scheduler_data); + mutex_last_locked_in_func_name= func; + mutex_last_locked_at_line_nr= line; + mutex_scheduler_data_locked= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Wrapper for pthread_mutex_unlock + + SYNOPSIS + Event_scheduler::unlock_data() + mutex Mutex to unlock + line The line number on which the unlock is done +*/ + +void +Event_scheduler::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); + DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", + LOCK_scheduler_data, func, line)); + mutex_last_unlocked_at_line_nr= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func_name= func; + pthread_mutex_unlock(LOCK_scheduler_data); + DBUG_VOID_RETURN; +} + + +/* + Wrapper for pthread_cond_wait + + SYNOPSIS + Event_scheduler::cond_wait() + cond Conditional to wait for + mutex Mutex of the conditional + + RETURN VALUE + Error code of pthread_cond_wait() +*/ + +int +Event_scheduler::cond_wait(int cond, pthread_mutex_t *mutex) +{ + int ret; + DBUG_ENTER("Event_scheduler::cond_wait"); + DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); + ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); + cond_waiting_on= COND_NONE; + DBUG_RETURN(ret); +} + + +/* + Signals the main scheduler thread that the queue has changed + its state. + + SYNOPSIS + Event_scheduler::queue_changed() +*/ + +void +Event_scheduler::queue_changed() +{ + DBUG_ENTER("Event_scheduler::queue_changed"); + DBUG_PRINT("info", ("Sending COND_new_work")); + pthread_cond_signal(&cond_vars[COND_new_work]); + DBUG_VOID_RETURN; +} diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index a274636b38b..b4007d88976 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -21,7 +21,6 @@ class Event_timed; class Event_db_repository; class THD; -typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*); int events_init(); @@ -29,10 +28,12 @@ events_init(); void events_shutdown(); -class Event_scheduler +#include "event_queue.h" +#include "event_scheduler.h" + +class Event_scheduler : public Event_queue { public: - enum enum_state { UNINITIALIZED= 0, @@ -50,32 +51,22 @@ public: RESUME= 2 }; - /* Singleton access */ - static Event_scheduler* - get_instance(); - - /* Methods for queue management follow */ + /* This is the current status of the life-cycle of the scheduler. */ + enum enum_state state; - int - create_event(THD *thd, Event_parse_data *et, bool check_existence); - - int - update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema, - LEX_STRING *new_name); - - bool - drop_event(THD *thd, sp_name *name); + static void + create_instance(); - int - drop_schema_events(THD *thd, LEX_STRING schema); + /* Singleton access */ + static Event_scheduler* + get_instance(); - int - drop_user_events(THD *thd, LEX_STRING *definer) - { DBUG_ASSERT(0); return 0;} + bool + init(Event_db_repository *db_repo); - uint - events_count(); + void + destroy(); /* State changing methods follow */ @@ -97,19 +88,13 @@ public: int suspend_or_resume(enum enum_suspend_or_resume action); - - bool - init(Event_db_repository *db_repo); - - void - destroy(); - +/* static void init_mutexes(); static void destroy_mutexes(); - +*/ void report_error_during_start(); @@ -124,35 +109,34 @@ public: static int dump_internal_status(THD *thd); - static bool - check_system_tables(THD *thd); + /* helper functions for working with mutexes & conditionals */ + void + lock_data(const char *func, uint line); -private: - Event_timed * - find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); + void + unlock_data(const char *func, uint line); + + int + cond_wait(int cond, pthread_mutex_t *mutex); + + void + queue_changed(); + +protected: uint workers_count(); - bool - is_running_or_suspended(); - /* helper functions */ bool - execute_top(THD *thd); + execute_top(THD *thd, Event_timed *et); void - clean_queue(THD *thd); + clean_memory(THD *thd); void stop_all_running_events(THD *thd); - int - load_events_from_db(THD *thd); - - void - drop_matching_events(THD *thd, LEX_STRING pattern, - bool (*)(Event_timed *,LEX_STRING *)); bool check_n_suspend_if_needed(THD *thd); @@ -163,48 +147,14 @@ private: /* Singleton DP is used */ Event_scheduler(); - enum enum_cond_vars - { - COND_NONE= -1, - /* - COND_new_work is a conditional used to signal that there is a change - of the queue that should inform the executor thread that new event should - be executed sooner than previously expected, because of add/replace event. - */ - COND_new_work= 0, - /* - COND_started is a conditional used to synchronize the thread in which - ::start() was called and the spawned thread. ::start() spawns a new thread - and then waits on COND_started but also checks when awaken that `state` is - either RUNNING or CANTSTART. Then it returns back. - */ - COND_started_or_stopped, - /* - Conditional used for signalling from the scheduler thread back to the - thread that calls ::suspend() or ::resume. Synchronizing the calls. - */ - COND_suspend_or_resume, - /* Must be always last */ - COND_LAST - }; - /* Singleton instance */ - static Event_scheduler singleton; + pthread_mutex_t *LOCK_scheduler_data; - /* This is the current status of the life-cycle of the manager. */ - enum enum_state state; /* Set to start the scheduler in suspended state */ bool start_scheduler_suspended; /* - LOCK_scheduler_data is the mutex which protects the access to the - manager's queue as well as used when signalling COND_new_work, - COND_started and COND_shutdown. - */ - pthread_mutex_t LOCK_scheduler_data; - - /* Holds the thread id of the executor thread or 0 if the executor is not running. It is used by ::shutdown() to know which thread to kill with kill_one_thread(). The latter wake ups a thread if it is waiting on a @@ -212,33 +162,27 @@ private: */ ulong thread_id; - pthread_cond_t cond_vars[COND_LAST]; - static const char * const cond_vars_names[COND_LAST]; - - /* The MEM_ROOT of the object */ - MEM_ROOT scheduler_root; - - Event_db_repository *db_repository; + enum enum_cond_vars + { + COND_NONE= -1, + COND_new_work= 0, + COND_started_or_stopped, + COND_suspend_or_resume, + /* Must be always last */ + COND_LAST + }; - /* The sorted queue with the Event_timed objects */ - QUEUE queue; - - uint mutex_last_locked_at_line; - uint mutex_last_unlocked_at_line; - const char* mutex_last_locked_in_func; - const char* mutex_last_unlocked_in_func; - enum enum_cond_vars cond_waiting_on; + uint mutex_last_locked_at_line_nr; + uint mutex_last_unlocked_at_line_nr; + const char* mutex_last_locked_in_func_name; + const char* mutex_last_unlocked_in_func_name; + int cond_waiting_on; bool mutex_scheduler_data_locked; - /* helper functions for working with mutexes & conditionals */ - void - lock_data(const char *func, uint line); - void - unlock_data(const char *func, uint line); + static const char * const cond_vars_names[COND_LAST]; - int - cond_wait(enum enum_cond_vars, pthread_mutex_t *mutex); + pthread_cond_t cond_vars[COND_LAST]; private: /* Prevent use of these */ diff --git a/sql/events.cc b/sql/events.cc index 28c57d6b493..09d5ee21a4f 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -559,6 +559,7 @@ void Events::init_mutexes() { db_repository= new Event_db_repository; + Event_scheduler::create_instance(); Event_scheduler::init_mutexes(); } |