diff options
author | unknown <andrey@lmy004.> | 2006-07-05 17:12:50 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-05 17:12:50 +0200 |
commit | 4a3c079bc0b5be47567ba2d99d58ea48bd7ce6d3 (patch) | |
tree | b703fe1427d5c31cc528ae515b4514eeb6c9dce8 /sql | |
parent | 73c795e6e2efad2b89a1b7444825a4c64d0a1b28 (diff) | |
download | mariadb-git-4a3c079bc0b5be47567ba2d99d58ea48bd7ce6d3.tar.gz |
WL#3337 (Event scheduler new architecture)
Cleaned up the code a bit. Fixed few leaks.
This code still does not load events on server startup
from disk. The problem is that there is a need for a THD instance, which
does not exist during server boot. This will be solved soon.
Still Event_timed is used both for the memory queue and for exectution.
This will be changed according to WL#3337 probably in the next commit.
sql/event_data_objects.cc:
Strip unneeded stuff from class Event_timed
Event_timed is still used for the queue and execution.
That will be changed in the next commit.
sql/event_data_objects.h:
Strip unneeded stuff from class Event_timed
Event_timed is still used for the queue and execution.
That will be changed in the next commit.
sql/event_db_repository.cc:
Cosmetics.
Add a new method load_named_event_job, to be made complete in the
next commit. It will load from disk an instance of Event_job_data to
be used during execution.
sql/event_db_repository.h:
find_event does not need MEM_ROOT anymore
because the memory is allocated on Event's own root.
sql/event_queue.cc:
Remove dead code.
Move dumping of the queue to separate method.
Make critical sections in create_event & update_event
as small as possible - load the new event outside of the section
and free the object also outside of it.
sql/event_queue.h:
init -> init_queue -> easier for ctags
deinit -> deinit_queue -> easier for ctags
sql/event_scheduler.cc:
empty this file
sql/event_scheduler.h:
empty this file
sql/event_scheduler_ng.cc:
add back DBUG_RETURN(0) in thread handlers.
We don't stop running events when stopping the scheduler. Therefore
remove this method now. If it is needed later it can be added back.
sql/event_scheduler_ng.h:
Remove stop_all_running_threads()
init -> init_scheduler
deinit -> deinit_scheduler
easier for ctags
sql/events.cc:
Cosmetics
sql/events.h:
Cosmetics
sql/set_var.cc:
Remove references to dead code
sql/sql_parse.cc:
Reorganize a bit.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/event_data_objects.cc | 195 | ||||
-rw-r--r-- | sql/event_data_objects.h | 25 | ||||
-rw-r--r-- | sql/event_db_repository.cc | 84 | ||||
-rw-r--r-- | sql/event_db_repository.h | 12 | ||||
-rw-r--r-- | sql/event_queue.cc | 249 | ||||
-rw-r--r-- | sql/event_queue.h | 28 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 1537 | ||||
-rw-r--r-- | sql/event_scheduler.h | 187 | ||||
-rw-r--r-- | sql/event_scheduler_ng.cc | 104 | ||||
-rw-r--r-- | sql/event_scheduler_ng.h | 8 | ||||
-rw-r--r-- | sql/events.cc | 58 | ||||
-rw-r--r-- | sql/events.h | 8 | ||||
-rw-r--r-- | sql/set_var.cc | 4 | ||||
-rw-r--r-- | sql/sql_parse.cc | 31 |
14 files changed, 243 insertions, 2287 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index 97db443e08d..0e984bd4c7a 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -530,18 +530,14 @@ Event_parse_data::init_ends(THD *thd, Item *new_ends) Event_timed::Event_timed() */ -Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0), - running(0), thread_id(0), status_changed(false), +Event_timed::Event_timed():status_changed(false), last_executed_changed(false), expression(0), created(0), modified(0), on_completion(Event_timed::ON_COMPLETION_DROP), status(Event_timed::ENABLED), sphead(0), - sql_mode(0), dropped(false), - free_sphead_on_delete(true), flags(0) + sql_mode(0), dropped(false), flags(0) { - pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST); - pthread_cond_init(&this->COND_finished, NULL); init(); } @@ -555,46 +551,8 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0), Event_timed::~Event_timed() { - deinit_mutexes(); free_root(&mem_root, MYF(0)); - - if (free_sphead_on_delete) - free_sp(); -} - - -/* - Destructor - - SYNOPSIS - Event_timed::deinit_mutexes() -*/ - -void -Event_timed::deinit_mutexes() -{ - pthread_mutex_destroy(&this->LOCK_running); - pthread_cond_destroy(&this->COND_finished); -} - - -/* - Checks whether the event is running - - SYNOPSIS - Event_timed::is_running() -*/ - -bool -Event_timed::is_running() -{ - bool ret; - - VOID(pthread_mutex_lock(&this->LOCK_running)); - ret= running; - VOID(pthread_mutex_unlock(&this->LOCK_running)); - - return ret; + free_sp(); } @@ -1253,7 +1211,7 @@ Event_timed::update_fields(THD *thd) Open_tables_state backup; int ret; - DBUG_ENTER("Event_timed::update_time_fields"); + DBUG_ENTER("Event_timed::update_fields"); DBUG_PRINT("enter", ("name: %*s", name.length, name.str)); @@ -1382,7 +1340,7 @@ Event_timed::get_create_event(THD *thd, String *buf) Executes the event (the underlying sp_head object); SYNOPSIS - evex_fill_row() + Event_timed::execute() thd THD mem_root If != NULL use it to compile the event on it @@ -1607,149 +1565,6 @@ done: } -extern pthread_attr_t connection_attrib; - -/* - Checks whether is possible and forks a thread. Passes self as argument. - - RETURN VALUE - EVENT_EXEC_STARTED OK - EVENT_EXEC_ALREADY_EXEC Thread not forked, already working - EVENT_EXEC_CANT_FORK Unable to spawn thread (error) -*/ - -int -Event_timed::spawn_now(void * (*thread_func)(void*), void *arg) -{ - THD *thd= current_thd; - int ret= EVENT_EXEC_STARTED; - DBUG_ENTER("Event_timed::spawn_now"); - DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str)); - - VOID(pthread_mutex_lock(&this->LOCK_running)); - - DBUG_PRINT("info", ("SCHEDULER: execute_at of %s is %lld", name.str, - TIME_to_ulonglong_datetime(&execute_at))); - mark_last_executed(thd); - if (compute_next_execution_time()) - { - sql_print_error("SCHEDULER: Error while computing time of %s.%s . " - "Disabling after execution.", dbname.str, name.str); - status= DISABLED; - } - DBUG_PRINT("evex manager", ("[%10s] next exec at [%llu]", name.str, - TIME_to_ulonglong_datetime(&execute_at))); - /* - 1. For one-time event : year is > 0 and expression is 0 - 2. For recurring, expression is != -=> check execute_at_null in this case - */ - if ((execute_at.year && !expression) || execute_at_null) - { - sql_print_information("SCHEDULER: [%s.%s of %s] no more executions " - "after this one", dbname.str, name.str, - definer.str); - flags |= EVENT_EXEC_NO_MORE | EVENT_FREE_WHEN_FINISHED; - } - - update_fields(thd); - - if (!in_spawned_thread) - { - pthread_t th; - in_spawned_thread= true; - - if (pthread_create(&th, &connection_attrib, thread_func, arg)) - { - DBUG_PRINT("info", ("problem while spawning thread")); - ret= EVENT_EXEC_CANT_FORK; - in_spawned_thread= false; - } - } - else - { - DBUG_PRINT("info", ("already in spawned thread. skipping")); - ret= EVENT_EXEC_ALREADY_EXEC; - } - VOID(pthread_mutex_unlock(&this->LOCK_running)); - - DBUG_RETURN(ret); -} - - -bool -Event_timed::spawn_thread_finish(THD *thd) -{ - bool should_free; - DBUG_ENTER("Event_timed::spawn_thread_finish"); - VOID(pthread_mutex_lock(&LOCK_running)); - in_spawned_thread= false; - DBUG_PRINT("info", ("Sending COND_finished for thread %d", thread_id)); - thread_id= 0; - if (dropped) - drop(thd); - pthread_cond_broadcast(&COND_finished); - should_free= flags & EVENT_FREE_WHEN_FINISHED; - VOID(pthread_mutex_unlock(&LOCK_running)); - DBUG_RETURN(should_free); -} - - -/* - Kills a running event - SYNOPSIS - Event_timed::kill_thread() - - RETURN VALUE - 0 OK - -1 EVEX_CANT_KILL - !0 Error -*/ - -int -Event_timed::kill_thread(THD *thd) -{ - int ret= 0; - DBUG_ENTER("Event_timed::kill_thread"); - pthread_mutex_lock(&LOCK_running); - DBUG_PRINT("info", ("thread_id=%lu", thread_id)); - - if (thread_id == thd->thread_id) - { - /* - We don't kill ourselves in cases like : - alter event e_43 do alter event e_43 do set @a = 4 because - we will never receive COND_finished. - */ - DBUG_PRINT("info", ("It's not safe to kill ourselves in self altering queries")); - ret= EVEX_CANT_KILL; - } - else if (thread_id && !(ret= kill_one_thread(thd, thread_id, false))) - { - thd->enter_cond(&COND_finished, &LOCK_running, "Waiting for finished"); - DBUG_PRINT("info", ("Waiting for COND_finished from thread %d", thread_id)); - while (thread_id) - pthread_cond_wait(&COND_finished, &LOCK_running); - - DBUG_PRINT("info", ("Got COND_finished")); - /* This will implicitly unlock LOCK_running. Hence we return before that */ - thd->exit_cond(""); - - DBUG_RETURN(0); - } - else if (!thread_id && in_spawned_thread) - { - /* - Because the manager thread waits for the forked thread to update thread_id - this situation is impossible. - */ - DBUG_ASSERT(0); - } - pthread_mutex_unlock(&LOCK_running); - DBUG_PRINT("exit", ("%d", ret)); - DBUG_RETURN(ret); -} - - /* Checks whether two events have the same name diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h index 5ae5c7e81ab..a5aaf0e66fa 100644 --- a/sql/event_data_objects.h +++ b/sql/event_data_objects.h @@ -63,12 +63,6 @@ class Event_timed { Event_timed(const Event_timed &); /* Prevent use of these */ void operator=(Event_timed &); - my_bool in_spawned_thread; - ulong locked_by_thread_id; - my_bool running; - ulong thread_id; - pthread_mutex_t LOCK_running; - pthread_cond_t COND_finished; bool status_changed; bool last_executed_changed; @@ -118,7 +112,6 @@ public: ulong sql_mode; bool dropped; - bool free_sphead_on_delete; uint flags;//all kind of purposes static void *operator new(size_t size) @@ -146,9 +139,6 @@ public: void init(); - void - deinit_mutexes(); - int load_from_row(TABLE *table); @@ -172,24 +162,9 @@ public: int compile(THD *thd, MEM_ROOT *mem_root); - - bool - is_running(); - - int - spawn_now(void * (*thread_func)(void*), void *arg); - - bool - spawn_thread_finish(THD *thd); void free_sp(); - - int - kill_thread(THD *thd); - - void - set_thread_id(ulong tid) { thread_id= tid; } }; diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index 074e05e5d8f..a7daed113bb 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -207,7 +207,7 @@ evex_fill_row(THD *thd, TABLE *table, Event_parse_data *et, my_bool is_update) table->field[ET_FIELD_STARTS]->set_notnull(); table->field[ET_FIELD_STARTS]-> store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME); - } + } if (!et->ends_null) { @@ -374,8 +374,7 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table, ret= read_record_info.read_record(&read_record_info); if (ret == 0) ret= copy_event_to_schema_table(thd, schema_table, event_table); - } - while (ret == 0); + } while (ret == 0); DBUG_PRINT("info", ("Scan finished. ret=%d", ret)); end_read_record(&read_record_info); @@ -464,8 +463,7 @@ Event_db_repository::fill_schema_events(THD *thd, TABLE_LIST *tables, char *db) int Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, - Event_timed **ett, - TABLE *tbl, MEM_ROOT *root) + Event_timed **ett, TABLE *tbl) { TABLE *table; int ret; @@ -505,7 +503,7 @@ done: if (ret) { delete et; - et= 0; + et= NULL; } /* don't close the table if we haven't opened it ourselves */ if (!tbl && table) @@ -518,7 +516,6 @@ done: int Event_db_repository::init_repository() { - init_alloc_root(&repo_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); return 0; } @@ -526,7 +523,6 @@ Event_db_repository::init_repository() void Event_db_repository::deinit_repository() { - free_root(&repo_root, MYF(0)); } @@ -731,7 +727,8 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data, parse_data->name.str)); DBUG_PRINT("info", ("check existance of an event with the same name")); - if (!evex_db_find_event_by_name(thd, parse_data->dbname, parse_data->name, table)) + if (!evex_db_find_event_by_name(thd, parse_data->dbname, + parse_data->name, table)) { if (create_if_not) { @@ -1026,14 +1023,12 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db, */ if (db.length > table->field[ET_FIELD_DB]->field_length || name.length > table->field[ET_FIELD_NAME]->field_length) - DBUG_RETURN(EVEX_KEY_NOT_FOUND); table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin); table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin); - key_copy(key, table->record[0], table->key_info, - table->key_info->key_length); + key_copy(key, table->record[0], table->key_info, table->key_info->key_length); if (table->file->index_read_idx(table->record[0], 0, key, table->key_info->key_length, @@ -1125,7 +1120,7 @@ Event_db_repository::drop_events_by_field(THD *thd, the table, compiles and inserts it into the cache. SYNOPSIS - Event_scheduler::load_named_event() + Event_db_repository::load_named_event_timed() thd THD etn The name of the event to load and compile on scheduler's root etn_new The loaded event @@ -1136,20 +1131,21 @@ Event_db_repository::drop_events_by_field(THD *thd, */ int -Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, - Event_timed **etn_new) +Event_db_repository::load_named_event_timed(THD *thd, LEX_STRING dbname, + LEX_STRING name, + Event_timed **etn_new) { int ret= 0; MEM_ROOT *tmp_mem_root; Event_timed *et_loaded= NULL; Open_tables_state backup; - DBUG_ENTER("Event_db_repository::load_named_event"); + DBUG_ENTER("Event_db_repository::load_named_event_timed"); DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str)); thd->reset_n_backup_open_tables_state(&backup); /* No need to use my_error() here because db_find_event() has done it */ - ret= find_event(thd, dbname, name, &et_loaded, NULL, &repo_root); + ret= find_event(thd, dbname, name, &et_loaded, NULL); thd->restore_backup_open_tables_state(&backup); /* In this case no memory was allocated so we don't need to clean */ if (ret) @@ -1171,3 +1167,57 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na DBUG_RETURN(OP_OK); } + + +/* + Looks for a named event in mysql.event and then loads it from + the table, compiles and inserts it into the cache. + + SYNOPSIS + Event_db_repository::load_named_event_job() + thd THD + etn The name of the event to load and compile on scheduler's root + etn_new The loaded event + + RETURN VALUE + NULL Error during compile or the event is non-enabled. + otherwise Address +*/ + +int +Event_db_repository::load_named_event_job(THD *thd, LEX_STRING dbname, + LEX_STRING name, + Event_job_data **etn_new) +{ + int ret= 0; + MEM_ROOT *tmp_mem_root; + Event_timed *et_loaded= NULL; + Open_tables_state backup; + + DBUG_ENTER("Event_db_repository::load_named_event_job"); + DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str)); +#if 0 + thd->reset_n_backup_open_tables_state(&backup); + /* No need to use my_error() here because db_find_event() has done it */ + ret= find_event(thd, dbname, name, &et_loaded, NULL); + thd->restore_backup_open_tables_state(&backup); + /* In this case no memory was allocated so we don't need to clean */ + if (ret) + DBUG_RETURN(OP_LOAD_ERROR); + + if (et_loaded->status != Event_timed::ENABLED) + { + /* + We don't load non-enabled events. + In db_find_event() `et_new` was allocated on the heap and not on + scheduler_root therefore we delete it here. + */ + delete et_loaded; + DBUG_RETURN(OP_DISABLED_EVENT); + } + + et_loaded->compute_next_execution_time(); + *etn_new= et_loaded; +#endif + DBUG_RETURN(OP_OK); +} diff --git a/sql/event_db_repository.h b/sql/event_db_repository.h index e1c64c8aded..c000247cff2 100644 --- a/sql/event_db_repository.h +++ b/sql/event_db_repository.h @@ -56,6 +56,7 @@ fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */); class Event_timed; class Event_parse_data; class Event_queue_element; +class Event_job_data; class Event_db_repository { @@ -88,10 +89,15 @@ public: int find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **ett, - TABLE *tbl, MEM_ROOT *root); + TABLE *tbl); int - load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **etn_new); + load_named_event_timed(THD *thd, LEX_STRING dbname, LEX_STRING name, + Event_timed **etn_new); + + int + load_named_event_job(THD *thd, LEX_STRING dbname, LEX_STRING name, + Event_job_data **etn_new); int find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table); @@ -116,8 +122,6 @@ private: static bool check_system_tables(THD *thd); - MEM_ROOT repo_root; - /* Prevent use of these */ Event_db_repository(const Event_db_repository &); void operator=(Event_db_repository &); diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 44920b29c16..b42372ba6bd 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -90,34 +90,29 @@ Event_queue::Event_queue() RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk + OP_ALREADY_EXISTS Event already in the queue */ int -Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) +Event_queue::create_event(THD *thd, Event_parse_data *et) { int res; Event_timed *et_new; DBUG_ENTER("Event_queue::create_event"); DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue)); + res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new); LOCK_QUEUE_DATA(); - if (check_existence && find_event(et->dbname, et->name, FALSE)) - { - res= OP_ALREADY_EXISTS; - goto end; - } - - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) + if (!res) { DBUG_PRINT("info", ("new event in the queue %p", et_new)); queue_insert_safe(&queue, (byte *) et_new); - on_queue_change(); + notify_observers(); } else if (res == OP_DISABLED_EVENT) res= OP_OK; -end: UNLOCK_QUEUE_DATA(); + DBUG_RETURN(res); } @@ -129,104 +124,54 @@ end: Event_queue::update_event() thd Thread et The event to replace(add) into the queue - new_schema New schema - new_name New name + new_schema New schema, in case of RENAME TO + new_name New name, in case of RENAME TO RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk - OP_ALREADY_EXISTS Event already in the queue */ int Event_queue::update_event(THD *thd, Event_parse_data *et, - LEX_STRING *new_schema, - LEX_STRING *new_name) + LEX_STRING *new_schema, LEX_STRING *new_name) { - int res= OP_OK; - Event_timed *et_old, *et_new= NULL; - LEX_STRING old_schema, old_name; - - LINT_INIT(old_schema.str); - LINT_INIT(old_schema.length); - LINT_INIT(old_name.str); - LINT_INIT(old_name.length); + int res; + Event_timed *et_old= NULL, *et_new= NULL; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", thd, et, et->dbname.str, et->name.str, &LOCK_event_queue)); + res= db_repository-> + load_named_event_timed(thd, new_schema?*new_schema:et->dbname, + new_name? *new_name:et->name, &et_new); + + if (res && res != OP_DISABLED_EVENT) + goto end; + LOCK_QUEUE_DATA(); if (!(et_old= find_event(et->dbname, et->name, TRUE))) + { DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", et->dbname.str, et->name.str)); - - if (new_schema && new_name) - { - old_schema= et->dbname; - old_name= et->name; - et->dbname= *new_schema; - et->name= *new_name; } - if (!(res= db_repository-> - load_named_event(thd, et->dbname, et->name, &et_new))) + if (!res) { DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old)); queue_insert_safe(&queue, (byte *) et_new); - on_queue_change(); } else if (res == OP_DISABLED_EVENT) res= OP_OK; - - if (new_schema && new_name) - { - et->dbname= old_schema; - et->name= old_name; - } - DBUG_PRINT("info", ("res=%d", res)); UNLOCK_QUEUE_DATA(); - /* - Andrey: Is this comment still truthful ??? - - We don't move this code above because a potential kill_thread will call - THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, - which is LOCK_event_queue on which the COND_new_work in ::run() locks. - Hence, we try to acquire a lock which we have already acquired and we run - into an assert. Holding LOCK_event_queue however is not needed because - we don't touch any invariant of the scheduler anymore. ::drop_event() does - the same. - */ - if (et_old) - { - switch (et_old->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - DBUG_ASSERT(0); - } - } + notify_observers(); + + if (et_old) + delete et_old; +end: + DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); } @@ -256,40 +201,13 @@ Event_queue::drop_event(THD *thd, sp_name *name) LOCK_QUEUE_DATA(); if (!(et_old= find_event(name->m_db, name->m_name, TRUE))) DBUG_PRINT("info", ("No such event found, probably DISABLED")); - UNLOCK_QUEUE_DATA(); - - /* See comments in ::replace_event() why this is split in two parts. */ if (et_old) - { - switch ((res= et_old->kill_thread(thd))) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et_old->flags |= EVENT_FREE_WHEN_FINISHED; - break; - case 0: - /* - kill_thread() waits till the spawned thread finishes after it's - killed. Hence, we delete here memory which is no more referenced from - a running thread. - */ - delete et_old; - /* - We don't signal COND_new_work here because: - 1. Even if the dropped event is on top of the queue this will not - move another one to be executed before the time the one on the - top (but could be at the same second as the dropped one) - 2. If this was the last event on the queue, then pthread_cond_timedwait - in ::run() will finish and then see that the queue is empty and - call cond_wait(). Hence, no need to interrupt the blocked - ::run() thread. - */ - break; - default: - sql_print_error("SCHEDULER: Got unexpected error %d", res); - DBUG_ASSERT(0); - } - } + delete et_old; + /* + We don't signal here because the scheduler will catch the change + next time it wakes up. + */ DBUG_RETURN(FALSE); } @@ -361,7 +279,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); - uint i= 0, dropped= 0; + uint i= 0; while (i < queue.elements) { Event_timed *et= (Event_timed *) queue_element(&queue, i); @@ -375,32 +293,22 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, counter and the (i < queue.elements) condition is ok. */ queue_remove(&queue, i); - - /* See replace_event() */ - switch (et->kill_thread(thd)) { - case EVEX_CANT_KILL: - /* Don't delete but continue */ - et->flags |= EVENT_FREE_WHEN_FINISHED; - ++dropped; - break; - case 0: - delete et; - ++dropped; - break; - default: - DBUG_ASSERT(0); - } + delete et; } else i++; } - DBUG_PRINT("info", ("Dropped %lu", dropped)); /* - Don't send COND_new_work because no need to wake up the scheduler thread. - When it wakes next time up it will recalculate how much more it should - sleep if the top of the queue has been changed by this method. + We don't call notify_observers() . If we remove the top event: + 1. The queue is empty. The scheduler will wake up at some time and realize + that the queue is empty. If create_event() comes inbetween it will + signal the scheduler + 2. The queue is not empty, but the next event after the previous top, won't + be executed any time sooner than the element we removed. Hence, we may + not notify the scheduler and it will realize the change when it + wakes up from timedwait. */ - + DBUG_VOID_RETURN; } @@ -418,16 +326,14 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, >=0 Number of dropped events */ -int +void Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) { - int ret; DBUG_ENTER("Event_queue::drop_schema_events"); LOCK_QUEUE_DATA(); drop_matching_events(thd, schema, event_timed_db_equal); UNLOCK_QUEUE_DATA(); - - DBUG_RETURN(ret); + DBUG_VOID_RETURN; } @@ -744,13 +650,13 @@ Event_queue::deinit_mutexes() its state. SYNOPSIS - Event_queue::on_queue_change() + Event_queue::notify_observers() */ void -Event_queue::on_queue_change() +Event_queue::notify_observers() { - DBUG_ENTER("Event_queue::on_queue_change"); + DBUG_ENTER("Event_queue::notify_observers"); DBUG_PRINT("info", ("Signalling change of the queue")); scheduler->queue_changed(); DBUG_VOID_RETURN; @@ -761,7 +667,7 @@ Event_queue::on_queue_change() The implementation of full-fledged initialization. SYNOPSIS - Event_scheduler::init() + Event_queue::init() RETURN VALUE FALSE OK @@ -769,15 +675,16 @@ Event_queue::on_queue_change() */ bool -Event_queue::init(Event_db_repository *db_repo) +Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched) { int i= 0; bool ret= FALSE; - DBUG_ENTER("Event_queue::init"); + DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this=%p", this)); LOCK_QUEUE_DATA(); db_repository= db_repo; + scheduler= sched; if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 30 /*auto_extent*/)) @@ -803,9 +710,9 @@ end: void -Event_queue::deinit() +Event_queue::deinit_queue() { - DBUG_ENTER("Event_queue::deinit"); + DBUG_ENTER("Event_queue::deinit_queue"); LOCK_QUEUE_DATA(); empty_queue(); @@ -833,6 +740,8 @@ void Event_queue::empty_queue() { uint i; + DBUG_ENTER("Event_queue::empty_queue"); + DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); /* empty the queue */ for (i= 0; i < events_count_no_lock(); ++i) { @@ -840,6 +749,7 @@ Event_queue::empty_queue() delete et; } resize_queue(&queue, 0); + DBUG_VOID_RETURN; } @@ -864,6 +774,29 @@ Event_queue::top_changed() } +inline void +Event_queue::dbug_dump_queue(time_t now) +{ +#ifndef DBUG_OFF + Event_timed *et; + uint i; + DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); + for (i = 0; i < queue.elements; i++) + { + et= ((Event_timed*)queue_element(&queue, i)); + DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); + DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " + " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", + TIME_to_ulonglong_datetime(&et->execute_at), + TIME_to_ulonglong_datetime(&et->starts), + TIME_to_ulonglong_datetime(&et->ends), + et->expression, sec_since_epoch_TIME(&et->execute_at), now, + (int)(sec_since_epoch_TIME(&et->execute_at) - now), + sec_since_epoch_TIME(&et->execute_at) <= now)); + } +#endif +} + Event_timed * Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime) @@ -876,36 +809,22 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, LOCK_QUEUE_DATA(); do { int res; - Event_timed *et= NULL; if (!queue.elements) { abstime->tv_sec= 0; break; } - int i; - DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); - for (i = 0; i < queue.elements; i++) - { - et= ((Event_timed*)queue_element(&queue, i)); - DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); - DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " - " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", - TIME_to_ulonglong_datetime(&et->execute_at), - TIME_to_ulonglong_datetime(&et->starts), - TIME_to_ulonglong_datetime(&et->ends), - et->expression, sec_since_epoch_TIME(&et->execute_at), now, - (int)(sec_since_epoch_TIME(&et->execute_at) - now), - sec_since_epoch_TIME(&et->execute_at) <= now)); - } - et= ((Event_timed*)queue_element(&queue, 0)); + dbug_dump_queue(now); + + Event_timed *et= ((Event_timed*)queue_element(&queue, 0)); top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); if (top_time.tv_sec <= now) { DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; - if ((res= db_repository->load_named_event(thd, et->dbname, et->name, - &et_new))) + if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name, + &et_new))) { DBUG_ASSERT(0); break; diff --git a/sql/event_queue.h b/sql/event_queue.h index 1335100be21..d253e3c7597 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -38,15 +38,15 @@ public: deinit_mutexes(); bool - init(Event_db_repository *db_repo); + init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched); void - deinit(); + deinit_queue(); /* Methods for queue management follow */ int - create_event(THD *thd, Event_parse_data *et, bool check_existence); + create_event(THD *thd, Event_parse_data *et); int update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema, @@ -55,13 +55,9 @@ public: bool drop_event(THD *thd, sp_name *name); - int + void drop_schema_events(THD *thd, LEX_STRING schema); - int - drop_user_events(THD *thd, LEX_STRING *definer) - { DBUG_ASSERT(0); return 0;} - uint events_count(); @@ -89,7 +85,7 @@ public: void top_changed(); -///////////////protected +protected: Event_timed * find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); @@ -105,8 +101,6 @@ public: Event_db_repository *db_repository; - /* The sorted queue with the Event_timed objects */ - QUEUE queue; uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; @@ -122,10 +116,16 @@ public: unlock_data(const char *func, uint line); void - on_queue_change(); - + notify_observers(); + + void + dbug_dump_queue(time_t now); + Event_scheduler_ng *scheduler; -protected: + +//public: + /* The sorted queue with the Event_timed objects */ + QUEUE queue; }; diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index 3a9b988f92e..f1c7d8394e3 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -14,1540 +14,3 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "mysql_priv.h" -#include "events.h" -#include "event_data_objects.h" -#include "event_scheduler.h" -#include "event_db_repository.h" -#include "sp_head.h" -#include "event_queue.h" - - -#ifdef __GNUC__ -#if __GNUC__ >= 2 -#define SCHED_FUNC __FUNCTION__ -#endif -#else -#define SCHED_FUNC "<unknown>" -#endif - -#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) -#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) - - -Event_scheduler* -Event_scheduler::singleton= NULL; - - -#ifndef DBUG_OFF -static -LEX_STRING states_names[] = -{ - {(char*) STRING_WITH_LEN("UNINITIALIZED")}, - {(char*) STRING_WITH_LEN("INITIALIZED")}, - {(char*) STRING_WITH_LEN("COMMENCING")}, - {(char*) STRING_WITH_LEN("CANTSTART")}, - {(char*) STRING_WITH_LEN("RUNNING")}, - {(char*) STRING_WITH_LEN("SUSPENDED")}, - {(char*) STRING_WITH_LEN("IN_SHUTDOWN")} -}; -#endif - -const char * const -Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = -{ - "new work", - "started or stopped", - "suspend or resume" -}; - - -/* -Event_scheduler* -Event_scheduler::singleton= NULL; -*/ - - - -class Worker_thread_param -{ -public: - Event_timed *et; - pthread_mutex_t LOCK_started; - pthread_cond_t COND_started; - bool started; - - Worker_thread_param(Event_timed *etn):et(etn), started(FALSE) - { - pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST); - pthread_cond_init(&COND_started, NULL); - } - - ~Worker_thread_param() - { - pthread_mutex_destroy(&LOCK_started); - pthread_cond_destroy(&COND_started); - } -}; - - -/* - Prints the stack of infos, warnings, errors from thd to - the console so it can be fetched by the logs-into-tables and - checked later. - - SYNOPSIS - evex_print_warnings - thd - thread used during the execution of the event - et - the event itself -*/ - -static void -evex_print_warnings(THD *thd, Event_timed *et) -{ - MYSQL_ERROR *err; - DBUG_ENTER("evex_print_warnings"); - if (!thd->warn_list.elements) - DBUG_VOID_RETURN; - - char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; - char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; - String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); - prefix.length(0); - prefix.append("SCHEDULER: ["); - - append_identifier(thd, &prefix, et->definer_user.str, et->definer_user.length); - prefix.append('@'); - append_identifier(thd, &prefix, et->definer_host.str, et->definer_host.length); - prefix.append("][", 2); - append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); - prefix.append('.'); - append_identifier(thd,&prefix, et->name.str, et->name.length); - prefix.append("] ", 2); - - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); - while ((err= it++)) - { - String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); - /* set it to 0 or we start adding at the end. That's the trick ;) */ - err_msg.length(0); - err_msg.append(prefix); - err_msg.append(err->msg, strlen(err->msg), system_charset_info); - err_msg.append("]"); - DBUG_ASSERT(err->level < 3); - (sql_print_message_handlers[err->level])("%*s", err_msg.length(), - err_msg.c_ptr()); - } - DBUG_VOID_RETURN; -} - - -/* - Inits an scheduler thread handler, both the main and a worker - - SYNOPSIS - init_event_thread() - thd - the THD of the thread. Has to be allocated by the caller. - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. - - RETURN VALUE - 0 OK - -1 Error -*/ - -static int -init_event_thread(THD** t, enum enum_thread_type thread_type) -{ - THD *thd= *t; - thd->thread_stack= (char*)t; // remember where our stack is - DBUG_ENTER("init_event_thread"); - thd->client_capabilities= 0; - thd->security_ctx->master_access= 0; - thd->security_ctx->db_access= 0; - thd->security_ctx->host_or_ip= (char*)my_localhost; - my_net_init(&thd->net, 0); - thd->net.read_timeout= slave_net_timeout; - thd->slave_thread= 0; - thd->options|= OPTION_AUTO_IS_NULL; - thd->client_capabilities|= CLIENT_MULTI_RESULTS; - thd->real_id=pthread_self(); - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->thread_id= thread_id++; - threads.append(thd); - thread_count++; - thread_running++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - if (init_thr_lock() || thd->store_globals()) - { - thd->cleanup(); - DBUG_RETURN(-1); - } - -#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) - sigset_t set; - VOID(sigemptyset(&set)); // Get mask in use - VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); -#endif - - /* - Guarantees that we will see the thread in SHOW PROCESSLIST though its - vio is NULL. - */ - thd->system_thread= thread_type; - - thd->proc_info= "Initialized"; - thd->version= refresh_version; - thd->set_time(); - - DBUG_RETURN(0); -} - - -/* - Inits the main scheduler thread and then calls Event_scheduler::run() - of arg. - - SYNOPSIS - event_scheduler_thread() - arg void* ptr to Event_scheduler - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. - 3. The reason to have a proxy function is that it's not possible to - use a method as function to be executed in a spawned thread: - - our pthread_hander_t macro uses extern "C" - - separating thread setup from the real execution loop is also to be - considered good. - - RETURN VALUE - 0 OK -*/ - -pthread_handler_t -event_scheduler_thread(void *arg) -{ - /* needs to be first for thread_stack */ - THD *thd= NULL; - Event_scheduler *scheduler= (Event_scheduler *) arg; - - DBUG_ENTER("event_scheduler_thread"); - - my_thread_init(); - pthread_detach_this_thread(); - - /* note that constructor of THD uses DBUG_ ! */ - if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_SCHEDULER)) - { - sql_print_error("SCHEDULER: Cannot init manager event thread."); - scheduler->report_error_during_start(); - } - else - { - thd->security_ctx->set_user((char*)"event_scheduler"); - - sql_print_information("SCHEDULER: Manager thread booting"); - if (Event_scheduler::get_instance()->event_queue->check_system_tables(thd)) - scheduler->report_error_during_start(); - else - scheduler->run(thd); - - /* - NOTE: Don't touch `scheduler` after this point because we have notified - the - thread which shuts us down that we have finished cleaning. In this - very moment a new scheduler thread could be started and a crash is - not welcome. - */ - } - - /* - If we cannot create THD then don't decrease because we haven't touched - thread_count and thread_running in init_event_thread() which was never - called. In init_event_thread() thread_count and thread_running are - always increased even in the case the method returns an error. - */ - if (thd) - { - thd->proc_info= "Clearing"; - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); - pthread_mutex_lock(&LOCK_thread_count); - thread_count--; - thread_running--; - delete thd; - pthread_mutex_unlock(&LOCK_thread_count); - } - my_thread_end(); - DBUG_RETURN(0); // Can't return anything here -} - - -/* - Function that executes an event in a child thread. Setups the - environment for the event execution and cleans after that. - - SYNOPSIS - event_worker_thread() - arg The Event_timed object to be processed - - RETURN VALUE - 0 OK -*/ - -pthread_handler_t -event_worker_thread(void *arg) -{ - THD *thd; /* needs to be first for thread_stack */ - Worker_thread_param *param= (Worker_thread_param *) arg; - Event_timed *event= param->et; - int ret; - bool startup_error= FALSE; - Security_context *save_ctx; - /* this one is local and not needed after exec */ - Security_context security_ctx; - - DBUG_ENTER("event_worker_thread"); - DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str)); - - my_thread_init(); - pthread_detach_this_thread(); - - if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_WORKER)) - { - sql_print_error("SCHEDULER: Startup failure."); - startup_error= TRUE; - event->spawn_thread_finish(thd); - } - else - event->set_thread_id(thd->thread_id); - - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); - /* - If we don't change it before we send the signal back, then an intermittent - DROP EVENT will take LOCK_scheduler_data and try to kill this thread, because - event->thread_id is already real. However, because thd->security_ctx->user - is not initialized then a crash occurs in kill_one_thread(). Thus, we have - to change the context before sending the signal. We are under - LOCK_scheduler_data being held by Event_scheduler::run() -> ::execute_top(). - */ - thd->change_security_context(event->definer_user, event->definer_host, - event->dbname, &security_ctx, &save_ctx); - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); - - /* Signal the scheduler thread that we have started successfully */ - pthread_mutex_lock(¶m->LOCK_started); - param->started= TRUE; - pthread_cond_signal(¶m->COND_started); - pthread_mutex_unlock(¶m->LOCK_started); - - if (!startup_error) - { - thd->init_for_queries(); - thd->enable_slow_log= TRUE; - - event->set_thread_id(thd->thread_id); - sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id); - - ret= event->execute(thd, thd->mem_root); - evex_print_warnings(thd, event); - sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d", - event->dbname.str, event->name.str, - event->definer.str, ret); - if (ret == EVEX_COMPILE_ERROR) - sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", - event->dbname.str, event->name.str, - event->definer.str); - else if (ret == EVEX_MICROSECOND_UNSUP) - sql_print_information("SCHEDULER: MICROSECOND is not supported"); - - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); - - /* If true is returned, we are expected to free it */ - if (event->spawn_thread_finish(thd)) - { - DBUG_PRINT("info", ("Freeing object pointer")); - delete event; - } - } - - if (thd) - { - thd->proc_info= "Clearing"; - DBUG_ASSERT(thd->net.buff != 0); - /* - Free it here because net.vio is NULL for us => THD::~THD will check it - and won't call net_end(&net); See also replication code. - */ - net_end(&thd->net); - DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id)); - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thread_count--; - thread_running--; - delete thd; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - } - - my_thread_end(); - DBUG_RETURN(0); // Can't return anything here -} - - - - -/* - Constructor of class Event_scheduler. - - SYNOPSIS - Event_scheduler::Event_scheduler() -*/ - -Event_scheduler::Event_scheduler() -{ - thread_id= 0; - mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0; - mutex_last_unlocked_in_func= mutex_last_locked_in_func= ""; - cond_waiting_on= COND_NONE; - mutex_scheduler_data_locked= FALSE; - state= UNINITIALIZED; - start_scheduler_suspended= FALSE; - LOCK_scheduler_data= &LOCK_data; -} - - - -/* - Returns the singleton instance of the class. - - SYNOPSIS - Event_scheduler::create_instance() - - RETURN VALUE - address -*/ - -void -Event_scheduler::create_instance(Event_queue *queue) -{ - singleton= new Event_scheduler(); - singleton->event_queue= queue; -} - -/* - Returns the singleton instance of the class. - - SYNOPSIS - Event_scheduler::get_instance() - - RETURN VALUE - address -*/ - -Event_scheduler* -Event_scheduler::get_instance() -{ - DBUG_ENTER("Event_scheduler::get_instance"); - DBUG_RETURN(singleton); -} - - -/* - The implementation of full-fledged initialization. - - SYNOPSIS - Event_scheduler::init() - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -bool -Event_scheduler::init(Event_db_repository *db_repo) -{ - int i= 0; - bool ret= FALSE; - DBUG_ENTER("Event_scheduler::init"); - DBUG_PRINT("enter", ("this=%p", this)); - - LOCK_SCHEDULER_DATA(); - init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - for (;i < COND_LAST; i++) - if (pthread_cond_init(&cond_vars[i], NULL)) - { - sql_print_error("SCHEDULER: Unable to initalize conditions"); - ret= TRUE; - goto end; - } - state= INITIALIZED; -end: - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(ret); -} - - -/* - Frees all memory allocated by the scheduler object. - - SYNOPSIS - Event_scheduler::destroy() - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -void -Event_scheduler::destroy() -{ - DBUG_ENTER("Event_scheduler"); - LOCK_SCHEDULER_DATA(); - switch (state) { - case UNINITIALIZED: - break; - case INITIALIZED: - int i; - for (i= 0; i < COND_LAST; i++) - pthread_cond_destroy(&cond_vars[i]); - state= UNINITIALIZED; - break; - default: - sql_print_error("SCHEDULER: Destroying while state is %d", state); - /* I trust my code but ::safe() > ::sorry() */ - DBUG_ASSERT(0); - break; - } - UNLOCK_SCHEDULER_DATA(); - - DBUG_VOID_RETURN; -} - - -extern pthread_attr_t connection_attrib; - - -/* - Starts the event scheduler - - SYNOPSIS - Event_scheduler::start() - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -bool -Event_scheduler::start() -{ - bool ret= FALSE; - pthread_t th; - DBUG_ENTER("Event_scheduler::start"); - - LOCK_SCHEDULER_DATA(); - /* If already working or starting don't make another attempt */ - DBUG_ASSERT(state == INITIALIZED); - if (state > INITIALIZED) - { - DBUG_PRINT("info", ("scheduler is already running or starting")); - ret= TRUE; - goto end; - } - - /* - Now if another thread calls start it will bail-out because the branch - above will be executed. Thus no two or more child threads will be forked. - If the child thread cannot start for some reason then `state` is set - to CANTSTART and COND_started is also signaled. In this case we - set `state` back to INITIALIZED so another attempt to start the scheduler - can be made. - */ - state= COMMENCING; - /* Fork */ - if (pthread_create(&th, &connection_attrib, event_scheduler_thread, - (void*)this)) - { - DBUG_PRINT("error", ("cannot create a new thread")); - state= INITIALIZED; - ret= TRUE; - goto end; - } - - /* Wait till the child thread has booted (w/ or wo success) */ - while (!(state == SUSPENDED || state == RUNNING) && state != CANTSTART) - cond_wait(COND_started_or_stopped, LOCK_scheduler_data); - - /* - If we cannot start for some reason then don't prohibit further attempts. - Set back to INITIALIZED. - */ - if (state == CANTSTART) - { - state= INITIALIZED; - ret= TRUE; - goto end; - } - -end: - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(ret); -} - - -/* - Starts the event scheduler in suspended mode. - - SYNOPSIS - Event_scheduler::start_suspended() - - RETURN VALUE - TRUE OK - FALSE Error -*/ - -bool -Event_scheduler::start_suspended() -{ - DBUG_ENTER("Event_scheduler::start_suspended"); - start_scheduler_suspended= TRUE; - DBUG_RETURN(start()); -} - - - -/* - Report back that we cannot start. Used for ocasions where - we can't go into ::run() and have to report externally. - - SYNOPSIS - Event_scheduler::report_error_during_start() -*/ - -inline void -Event_scheduler::report_error_during_start() -{ - DBUG_ENTER("Event_scheduler::report_error_during_start"); - - LOCK_SCHEDULER_DATA(); - state= CANTSTART; - DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); - pthread_cond_signal(&cond_vars[COND_started_or_stopped]); - UNLOCK_SCHEDULER_DATA(); - - DBUG_VOID_RETURN; -} - - -/* - The internal loop of the event scheduler - - SYNOPSIS - Event_scheduler::run() - thd Thread - - RETURN VALUE - FALSE OK - TRUE Failure -*/ - -bool -Event_scheduler::run(THD *thd) -{ - int ret; - struct timespec abstime; - DBUG_ENTER("Event_scheduler::run"); - DBUG_PRINT("enter", ("thd=%p", thd)); - - LOCK_SCHEDULER_DATA(); - ret= event_queue->load_events_from_db(thd); - - if (!ret) - { - thread_id= thd->thread_id; - state= start_scheduler_suspended? SUSPENDED:RUNNING; - start_scheduler_suspended= FALSE; - } - else - state= CANTSTART; - - DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); - pthread_cond_signal(&cond_vars[COND_started_or_stopped]); - if (ret) - { - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(TRUE); - } - if (!check_n_suspend_if_needed(thd)) - UNLOCK_SCHEDULER_DATA(); - - sql_print_information("SCHEDULER: Manager thread started with id %lu", - thd->thread_id); - abstime.tv_nsec= 0; - while ((state == SUSPENDED || state == RUNNING)) - { - Event_timed *et; - - LOCK_SCHEDULER_DATA(); - if (check_n_wait_for_non_empty_queue(thd)) - continue; - - /* On TRUE data is unlocked, go back to the beginning */ - if (check_n_suspend_if_needed(thd)) - continue; - - /* Guaranteed locked here */ - if (state == IN_SHUTDOWN || shutdown_in_progress) - { - UNLOCK_SCHEDULER_DATA(); - break; - } - DBUG_ASSERT(state == RUNNING); - -// et= (Event_timed *)queue_top(&event_queue->queue); - et= event_queue->get_top(); - - /* Skip disabled events */ - if (et->status != Event_timed::ENABLED) - { - /* - It could be a one-timer scheduled for a time, already in the past when the - scheduler was suspended. - */ - sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue", - et->dbname.length, et->dbname.str, et->name.length, - et->name.str); - queue_remove(&event_queue->queue, 0); - /* ToDo: check this again */ - if (et->dropped) - et->drop(thd); - delete et; - UNLOCK_SCHEDULER_DATA(); - continue; - } - thd->proc_info= (char *)"Computing"; - DBUG_PRINT("evex manager",("computing time to sleep till next exec")); - /* Timestamp is in UTC */ - abstime.tv_sec= sec_since_epoch_TIME(&et->execute_at); - - thd->end_time(); - if (abstime.tv_sec > thd->query_start()) - { - /* Event trigger time is in the future */ - thd->proc_info= (char *)"Sleep"; - DBUG_PRINT("info", ("Going to sleep. Should wakeup after approx %d secs", - abstime.tv_sec - thd->query_start())); - DBUG_PRINT("info", ("Entering condition because waiting for activation")); - /* - Use THD::enter_cond()/exit_cond() or we won't be able to kill a - sleeping thread. Though ::stop() can do it by sending COND_new_work - an user can't by just issuing 'KILL x'; . In the latter case - pthread_cond_timedwait() will wait till `abstime`. - "Sleeping until next time" - */ - thd->enter_cond(&cond_vars[COND_new_work],LOCK_scheduler_data,"Sleeping"); - - pthread_cond_timedwait(&cond_vars[COND_new_work], LOCK_scheduler_data, - &abstime); - - DBUG_PRINT("info", ("Manager woke up. state is %d", state)); - /* - If we get signal we should recalculate the whether it's the right time - because there could be : - 1. Spurious wake-up - 2. The top of the queue was changed (new one becase of add/drop/replace) - */ - /* This will do implicit UNLOCK_SCHEDULER_DATA() */ - thd->exit_cond(""); - } - else - { - thd->proc_info= (char *)"Executing"; - /* - Execute the event. An error may occur if a thread cannot be forked. - In this case stop the manager. - We should enter ::execute_top() with locked LOCK_scheduler_data. - */ - int ret= execute_top(thd, et); - UNLOCK_SCHEDULER_DATA(); - if (ret) - break; - } - } - - thd->proc_info= (char *)"Cleaning"; - - LOCK_SCHEDULER_DATA(); - /* - It's possible that a user has used (SQL)COM_KILL. Hence set the appropriate - state because it is only set by ::stop(). - */ - if (state != IN_SHUTDOWN) - { - DBUG_PRINT("info", ("We got KILL but the but not from ::stop()")); - state= IN_SHUTDOWN; - } - UNLOCK_SCHEDULER_DATA(); - - sql_print_information("SCHEDULER: Shutting down"); - - thd->proc_info= (char *)"Cleaning queue"; - clean_memory(thd); - THD_CHECK_SENTRY(thd); - - /* free mamager_root memory but don't destroy the root */ - thd->proc_info= (char *)"Cleaning memory root"; - free_root(&scheduler_root, MYF(0)); - THD_CHECK_SENTRY(thd); - - /* - We notify the waiting thread which shutdowns us that we have cleaned. - There are few more instructions to be executed in this pthread but - they don't affect manager structures thus it's safe to signal already - at this point. - */ - LOCK_SCHEDULER_DATA(); - thd->proc_info= (char *)"Sending shutdown signal"; - DBUG_PRINT("info", ("Sending COND_started_or_stopped")); - if (state == IN_SHUTDOWN) - pthread_cond_signal(&cond_vars[COND_started_or_stopped]); - - state= INITIALIZED; - /* - We set it here because ::run() can stop not only because of ::stop() - call but also because of `KILL x` - */ - thread_id= 0; - sql_print_information("SCHEDULER: Stopped"); - UNLOCK_SCHEDULER_DATA(); - - /* We have modified, we set back */ - thd->query= NULL; - thd->query_length= 0; - - DBUG_RETURN(FALSE); -} - - -/* - Executes the top element of the queue. Auxiliary method for ::run(). - - SYNOPSIS - Event_scheduler::execute_top() - - RETURN VALUE - FALSE OK - TRUE Failure - - NOTE - NO locking is done. EXPECTED is that the caller should have locked - the queue (w/ LOCK_scheduler_data). -*/ - -bool -Event_scheduler::execute_top(THD *thd, Event_timed *et) -{ - int spawn_ret_code; - bool ret= FALSE; - DBUG_ENTER("Event_scheduler::execute_top"); - DBUG_PRINT("enter", ("thd=%p", thd)); - - /* Is it good idea to pass a stack address ?*/ - Worker_thread_param param(et); - - pthread_mutex_lock(¶m.LOCK_started); - /* - We don't lock LOCK_scheduler_data fpr workers_increment() because it's a - pre-requisite for calling the current_method. - */ - switch ((spawn_ret_code= et->spawn_now(event_worker_thread, ¶m))) { - case EVENT_EXEC_CANT_FORK: - /* - We don't lock LOCK_scheduler_data here because it's a pre-requisite - for calling the current_method. - */ - sql_print_error("SCHEDULER: Problem while trying to create a thread"); - ret= TRUE; - break; - case EVENT_EXEC_ALREADY_EXEC: - /* - We don't lock LOCK_scheduler_data here because it's a pre-requisite - for calling the current_method. - */ - sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", - et->dbname.str, et->name.str); - if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) - event_queue->remove_top(); - else - event_queue->top_changed(); - break; - default: - DBUG_ASSERT(!spawn_ret_code); - if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) - event_queue->remove_top(); - else - event_queue->top_changed(); - /* - We don't lock LOCK_scheduler_data here because it's a pre-requisite - for calling the current_method. - */ - if (likely(!spawn_ret_code)) - { - /* Wait the forked thread to start */ - do { - pthread_cond_wait(¶m.COND_started, ¶m.LOCK_started); - } while (!param.started); - } - /* - param was allocated on the stack so no explicit delete as well as - in this moment it's no more used in the spawned thread so it's safe - to be deleted. - */ - break; - } - pthread_mutex_unlock(¶m.LOCK_started); - /* `param` is on the stack and will be destructed by the compiler */ - - DBUG_RETURN(ret); -} - - -/* - Cleans the scheduler's queue. Auxiliary method for ::run(). - - SYNOPSIS - Event_scheduler::clean_queue() - thd Thread -*/ - -void -Event_scheduler::clean_memory(THD *thd) -{ - CHARSET_INFO *scs= system_charset_info; - uint i; - DBUG_ENTER("Event_scheduler::clean_queue"); - DBUG_PRINT("enter", ("thd=%p", thd)); - - LOCK_SCHEDULER_DATA(); - stop_all_running_events(thd); - UNLOCK_SCHEDULER_DATA(); - - sql_print_information("SCHEDULER: Emptying the queue"); - - event_queue->empty_queue(); - - DBUG_VOID_RETURN; -} - - -/* - Stops all running events - - SYNOPSIS - Event_scheduler::stop_all_running_events() - thd Thread - - NOTE - LOCK_scheduler data must be acquired prior to call to this method -*/ - -void -Event_scheduler::stop_all_running_events(THD *thd) -{ - CHARSET_INFO *scs= system_charset_info; - uint i; - DYNAMIC_ARRAY running_threads; - THD *tmp; - DBUG_ENTER("Event_scheduler::stop_all_running_events"); - DBUG_PRINT("enter", ("workers_count=%d", workers_count())); - - my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); - - bool had_super= FALSE; - VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; - if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) - push_dynamic(&running_threads, (gptr) &tmp->thread_id); - } - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - /* We need temporarily SUPER_ACL to be able to kill our offsprings */ - if (!(thd->security_ctx->master_access & SUPER_ACL)) - thd->security_ctx->master_access|= SUPER_ACL; - else - had_super= TRUE; - - char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; - char int_buff[STRING_BUFFER_USUAL_SIZE]; - String tmp_string(tmp_buff, sizeof(tmp_buff), scs); - String int_string(int_buff, sizeof(int_buff), scs); - tmp_string.length(0); - - for (i= 0; i < running_threads.elements; ++i) - { - int ret; - ulong thd_id= *dynamic_element(&running_threads, i, ulong*); - - int_string.set((longlong) thd_id,scs); - tmp_string.append(int_string); - if (i < running_threads.elements - 1) - tmp_string.append(' '); - - if ((ret= kill_one_thread(thd, thd_id, FALSE))) - { - sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); - break; - } - } - if (running_threads.elements) - sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); - - if (!had_super) - thd->security_ctx->master_access &= ~SUPER_ACL; - - delete_dynamic(&running_threads); - - sql_print_information("SCHEDULER: Waiting for worker threads to finish"); - - while (workers_count()) - my_sleep(100000); - - DBUG_VOID_RETURN; -} - - -/* - Stops the event scheduler - - SYNOPSIS - Event_scheduler::stop() - - RETURN VALUE - OP_OK OK - OP_CANT_KILL Error during stopping of manager thread - OP_NOT_RUNNING Manager not working - - NOTE - The caller must have acquited LOCK_scheduler_data. -*/ - -int -Event_scheduler::stop() -{ - THD *thd= current_thd; - DBUG_ENTER("Event_scheduler::stop"); - DBUG_PRINT("enter", ("thd=%p", current_thd)); - - LOCK_SCHEDULER_DATA(); - if (!(state == SUSPENDED || state == RUNNING)) - { - /* - One situation to be here is if there was a start that forked a new - thread but the new thread did not acquire yet LOCK_scheduler_data. - Hence, in this case return an error. - */ - DBUG_PRINT("info", ("manager not running but %d. doing nothing", state)); - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(OP_NOT_RUNNING); - } - state= IN_SHUTDOWN; - - DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); - sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); - - /* - Sending the COND_new_work to ::run() is a way to get this working without - race conditions. If we use kill_one_thread() it will call THD::awake() and - because in ::run() both THD::enter_cond()/::exit_cond() are used, - THD::awake() will try to lock LOCK_scheduler_data. If we UNLOCK it before, - then the pthread_cond_signal(COND_started_or_stopped) could be signaled in - ::run() and we can miss the signal before we relock. A way is to use - another mutex for this shutdown procedure but better not. - */ - pthread_cond_signal(&cond_vars[COND_new_work]); - /* Or we are suspended - then we should wake up */ - pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); - - /* Guarantee we don't catch spurious signals */ - sql_print_information("SCHEDULER: Waiting the manager thread to reply"); - while (state != INITIALIZED) - { - DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " - "thread. Current value of state is %d . " - "workers count=%d", state, workers_count())); - cond_wait(COND_started_or_stopped, LOCK_scheduler_data); - } - DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); - UNLOCK_SCHEDULER_DATA(); - - DBUG_RETURN(OP_OK); -} - - -/* - Suspends or resumes the scheduler. - SUSPEND - it won't execute any event till resumed. - RESUME - it will resume if suspended. - - SYNOPSIS - Event_scheduler::suspend_or_resume() - - RETURN VALUE - OP_OK OK -*/ - -int -Event_scheduler::suspend_or_resume( - enum Event_scheduler::enum_suspend_or_resume action) -{ - DBUG_ENTER("Event_scheduler::suspend_or_resume"); - DBUG_PRINT("enter", ("action=%d", action)); - - LOCK_SCHEDULER_DATA(); - - if ((action == SUSPEND && state == SUSPENDED) || - (action == RESUME && state == RUNNING)) - { - DBUG_PRINT("info", ("Either trying to suspend suspended or resume " - "running scheduler. Doing nothing.")); - } - else - { - /* Wake the main thread up if he is asleep */ - DBUG_PRINT("info", ("Sending signal")); - if (action==SUSPEND) - { - state= SUSPENDED; - pthread_cond_signal(&cond_vars[COND_new_work]); - } - else - { - state= RUNNING; - pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); - } - DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume")); - cond_wait(COND_suspend_or_resume, LOCK_scheduler_data); - DBUG_PRINT("info", ("Got response")); - } - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(OP_OK); -} - - -/* - Returns the number of executing events. - - SYNOPSIS - Event_scheduler::workers_count() -*/ - -uint -Event_scheduler::workers_count() -{ - THD *tmp; - uint count= 0; - - DBUG_ENTER("Event_scheduler::workers_count"); - VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; - if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) - ++count; - } - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - DBUG_PRINT("exit", ("%d", count)); - DBUG_RETURN(count); -} - - -/* - Checks and suspends if needed - - SYNOPSIS - Event_scheduler::check_n_suspend_if_needed() - thd Thread - - RETURN VALUE - FALSE Not suspended, we haven't slept - TRUE We were suspended. LOCK_scheduler_data is unlocked. - - NOTE - The caller should have locked LOCK_scheduler_data! - The mutex will be unlocked in case this function returns TRUE -*/ - -bool -Event_scheduler::check_n_suspend_if_needed(THD *thd) -{ - bool was_suspended= FALSE; - DBUG_ENTER("Event_scheduler::check_n_suspend_if_needed"); - if (thd->killed && !shutdown_in_progress) - { - state= SUSPENDED; - thd->killed= THD::NOT_KILLED; - } - if (state == SUSPENDED) - { - thd->enter_cond(&cond_vars[COND_suspend_or_resume], LOCK_scheduler_data, - "Suspended"); - /* Send back signal to the thread that asked us to suspend operations */ - pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); - sql_print_information("SCHEDULER: Suspending operations"); - was_suspended= TRUE; - } - while (state == SUSPENDED) - { - cond_wait(COND_suspend_or_resume, LOCK_scheduler_data); - DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume")); - if (state != SUSPENDED) - { - pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); - sql_print_information("SCHEDULER: Resuming operations"); - } - } - if (was_suspended) - { - event_queue->recalculate_queue(thd); - /* This will implicitly unlock LOCK_scheduler_data */ - thd->exit_cond(""); - } - DBUG_RETURN(was_suspended); -} - - -/* - Checks for empty queue and waits till new element gets in - - SYNOPSIS - Event_scheduler::check_n_wait_for_non_empty_queue() - thd Thread - - RETURN VALUE - FALSE Did not wait - LOCK_scheduler_data still locked. - TRUE Waited - LOCK_scheduler_data unlocked. - - NOTE - The caller should have locked LOCK_scheduler_data! -*/ - -bool -Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) -{ - bool slept= FALSE; - DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); - DBUG_PRINT("enter", ("q.elements=%lu state=%s", - event_queue->events_count_no_lock(), states_names[state])); - - if (!event_queue->events_count_no_lock()) - thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data, - "Empty queue, sleeping"); - - /* Wait in a loop protecting against catching spurious signals */ - while (!event_queue->events_count_no_lock() && state == RUNNING) - { - slept= TRUE; - DBUG_PRINT("info", ("Entering condition because of empty queue")); - cond_wait(COND_new_work, LOCK_scheduler_data); - DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d", - state)); - /* - exit_cond does implicit mutex_UNLOCK, we needed it locked if - 1. we loop again - 2. end the current loop and start doing calculations - */ - } - if (slept) - thd->exit_cond(""); - - DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", - event_queue->events_count_no_lock(), states_names[state], thd->killed)); - - DBUG_RETURN(slept); -} - - -/* - Returns the current state of the scheduler - - SYNOPSIS - Event_scheduler::get_state() -*/ - -enum Event_scheduler::enum_state -Event_scheduler::get_state() -{ - enum Event_scheduler::enum_state ret; - DBUG_ENTER("Event_scheduler::get_state"); - /* lock_data & unlock_data are not static */ - pthread_mutex_lock(singleton->LOCK_scheduler_data); - ret= singleton->state; - pthread_mutex_unlock(singleton->LOCK_scheduler_data); - DBUG_RETURN(ret); -} - - -/* - Returns whether the scheduler was initialized. - - SYNOPSIS - Event_scheduler::initialized() - - RETURN VALUE - FALSE Was not initialized so far - TRUE Was initialized -*/ - -bool -Event_scheduler::initialized() -{ - DBUG_ENTER("Event_scheduler::initialized"); - DBUG_RETURN(Event_scheduler::get_state() != UNINITIALIZED); -} - - - - -/* - Dumps some data about the internal status of the scheduler. - - SYNOPSIS - Event_scheduler::dump_internal_status() - thd THD - - RETURN VALUE - 0 OK - 1 Error -*/ - -int -Event_scheduler::dump_internal_status(THD *thd) -{ - DBUG_ENTER("dump_internal_status"); -#ifndef DBUG_OFF - CHARSET_INFO *scs= system_charset_info; - Protocol *protocol= thd->protocol; - List<Item> field_list; - int ret; - char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; - char int_buff[STRING_BUFFER_USUAL_SIZE]; - String tmp_string(tmp_buff, sizeof(tmp_buff), scs); - String int_string(int_buff, sizeof(int_buff), scs); - tmp_string.length(0); - int_string.length(0); - - field_list.push_back(new Item_empty_string("Name", 20)); - field_list.push_back(new Item_empty_string("Value",20)); - if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | - Protocol::SEND_EOF)) - DBUG_RETURN(1); - - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("state"), scs); - protocol->store(states_names[singleton->state].str, - states_names[singleton->state].length, - scs); - - ret= protocol->write(); - /* - If not initialized - don't show anything else. get_instance() - will otherwise implicitly initialize it. We don't want that. - */ - if (singleton->state >= INITIALIZED) - { - /* last locked at*/ - /* - The first thing to do, or get_instance() will overwrite the values. - mutex_last_locked_at_line / mutex_last_unlocked_at_line - */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("last locked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - singleton->mutex_last_locked_in_func, - singleton->mutex_last_locked_at_line)); - protocol->store(&tmp_string); - ret= protocol->write(); - - /* last unlocked at*/ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("last unlocked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - singleton->mutex_last_unlocked_in_func, - singleton->mutex_last_unlocked_at_line)); - protocol->store(&tmp_string); - ret= protocol->write(); - - /* waiting on */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("waiting on condition"), scs); - tmp_string.length(scs->cset-> - snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s", - (singleton->cond_waiting_on != COND_NONE) ? - cond_vars_names[singleton->cond_waiting_on]: - "NONE")); - protocol->store(&tmp_string); - ret= protocol->write(); - - Event_scheduler *scheduler= get_instance(); - - /* workers_count */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("workers_count"), scs); - int_string.set((longlong) scheduler->workers_count(), scs); - protocol->store(&int_string); - ret= protocol->write(); - - /* queue.elements */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("queue.elements"), scs); - int_string.set((longlong) scheduler->event_queue->events_count_no_lock(), scs); - protocol->store(&int_string); - ret= protocol->write(); - - /* scheduler_data_locked */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); - int_string.set((longlong) scheduler->mutex_scheduler_data_locked, scs); - protocol->store(&int_string); - ret= protocol->write(); - } - send_eof(thd); -#endif - DBUG_RETURN(0); -} - - -/* - Wrapper for pthread_mutex_lock - - SYNOPSIS - Event_scheduler::lock_data() - mutex Mutex to lock - line The line number on which the lock is done - - RETURN VALUE - Error code of pthread_mutex_lock() -*/ - -void -Event_scheduler::lock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler::lock_mutex"); - DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", - &LOCK_scheduler_data, func, line)); - pthread_mutex_lock(LOCK_scheduler_data); - mutex_last_locked_in_func= func; - mutex_last_locked_at_line= line; - mutex_scheduler_data_locked= TRUE; - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_mutex_unlock - - SYNOPSIS - Event_scheduler::unlock_data() - mutex Mutex to unlock - line The line number on which the unlock is done -*/ - -void -Event_scheduler::unlock_data(const char *func, uint line) -{ - DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); - DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", - LOCK_scheduler_data, func, line)); - mutex_last_unlocked_at_line= line; - mutex_scheduler_data_locked= FALSE; - mutex_last_unlocked_in_func= func; - pthread_mutex_unlock(LOCK_scheduler_data); - DBUG_VOID_RETURN; -} - - -/* - Wrapper for pthread_cond_wait - - SYNOPSIS - Event_scheduler::cond_wait() - cond Conditional to wait for - mutex Mutex of the conditional - - RETURN VALUE - Error code of pthread_cond_wait() -*/ - -int -Event_scheduler::cond_wait(int cond, pthread_mutex_t *mutex) -{ - int ret; - DBUG_ENTER("Event_scheduler::cond_wait"); - DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); - ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); - cond_waiting_on= COND_NONE; - DBUG_RETURN(ret); -} - - -/* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_scheduler::queue_changed() -*/ - -void -Event_scheduler::queue_changed() -{ - DBUG_ENTER("Event_scheduler::queue_changed"); - DBUG_PRINT("info", ("Sending COND_new_work")); - pthread_cond_signal(&cond_vars[COND_new_work]); - DBUG_VOID_RETURN; -} - - -/* - Inits mutexes. - - SYNOPSIS - Event_scheduler::init_mutexes() -*/ - -void -Event_scheduler::init_mutexes() -{ - pthread_mutex_init(singleton->LOCK_scheduler_data, MY_MUTEX_INIT_FAST); -} - - -/* - Destroys mutexes. - - SYNOPSIS - Event_queue::destroy_mutexes() -*/ - -void -Event_scheduler::destroy_mutexes() -{ - pthread_mutex_destroy(singleton->LOCK_scheduler_data); -} diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index 7d02e98d4fe..acd0debe391 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -16,191 +16,4 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -class sp_name; -class Event_timed; -class Event_db_repository; -class Event_queue; - -class THD; - -int -events_init(); - -void -events_shutdown(); - -#include "event_queue.h" -#include "event_scheduler.h" - -class Event_scheduler -{ -public: - enum enum_state - { - UNINITIALIZED= 0, - INITIALIZED, - COMMENCING, - CANTSTART, - RUNNING, - SUSPENDED, - IN_SHUTDOWN - }; - - enum enum_suspend_or_resume - { - SUSPEND= 1, - RESUME= 2 - }; - - /* This is the current status of the life-cycle of the scheduler. */ - enum enum_state state; - - - static void - create_instance(Event_queue *queue); - - static void - init_mutexes(); - - static void - destroy_mutexes(); - - /* Singleton access */ - static Event_scheduler* - get_instance(); - - bool - init(Event_db_repository *db_repo); - - void - destroy(); - - /* State changing methods follow */ - - bool - start(); - - int - stop(); - - bool - start_suspended(); - - /* - Need to be public because has to be called from the function - passed to pthread_create. - */ - bool - run(THD *thd); - - int - suspend_or_resume(enum enum_suspend_or_resume action); -/* - static void - init_mutexes(); - - static void - destroy_mutexes(); -*/ - void - report_error_during_start(); - - /* Information retrieving methods follow */ - - enum enum_state - get_state(); - - bool - initialized(); - - static int - dump_internal_status(THD *thd); - - /* helper functions for working with mutexes & conditionals */ - void - lock_data(const char *func, uint line); - - void - unlock_data(const char *func, uint line); - - int - cond_wait(int cond, pthread_mutex_t *mutex); - - void - queue_changed(); - - Event_queue *event_queue; - -protected: - - uint - workers_count(); - - /* helper functions */ - bool - execute_top(THD *thd, Event_timed *et); - - void - clean_memory(THD *thd); - - void - stop_all_running_events(THD *thd); - - - bool - check_n_suspend_if_needed(THD *thd); - - bool - check_n_wait_for_non_empty_queue(THD *thd); - - /* Singleton DP is used */ - Event_scheduler(); - - pthread_mutex_t LOCK_data; - pthread_mutex_t *LOCK_scheduler_data; - - /* The MEM_ROOT of the object */ - MEM_ROOT scheduler_root; - - /* Set to start the scheduler in suspended state */ - bool start_scheduler_suspended; - - /* - Holds the thread id of the executor thread or 0 if the executor is not - running. It is used by ::shutdown() to know which thread to kill with - kill_one_thread(). The latter wake ups a thread if it is waiting on a - conditional variable and sets thd->killed to non-zero. - */ - ulong thread_id; - - enum enum_cond_vars - { - COND_NONE= -1, - COND_new_work= 0, - COND_started_or_stopped, - COND_suspend_or_resume, - /* Must be always last */ - COND_LAST - }; - - uint mutex_last_locked_at_line; - uint mutex_last_unlocked_at_line; - const char* mutex_last_locked_in_func; - const char* mutex_last_unlocked_in_func; - int cond_waiting_on; - bool mutex_scheduler_data_locked; - - static const char * const cond_vars_names[COND_LAST]; - - pthread_cond_t cond_vars[COND_LAST]; - - /* Singleton instance */ - static Event_scheduler *singleton; - -private: - /* Prevent use of these */ - Event_scheduler(const Event_scheduler &); - void operator=(Event_scheduler &); -}; - #endif /* _EVENT_SCHEDULER_H_ */ diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc index 9dc3bb26bc7..b256ef8411f 100644 --- a/sql/event_scheduler_ng.cc +++ b/sql/event_scheduler_ng.cc @@ -212,6 +212,7 @@ end: pthread_mutex_unlock(&LOCK_thread_count); my_thread_end(); + DBUG_RETURN(0); // Against gcc warnings } @@ -296,26 +297,22 @@ end: delete event; my_thread_end(); + DBUG_RETURN(0); // Against gcc warnings } bool -Event_scheduler_ng::init(Event_queue *q) +Event_scheduler_ng::init_scheduler(Event_queue *q) { thread_id= 0; state= INITIALIZED; - /* init memory root */ - queue= q; - return FALSE; } void -Event_scheduler_ng::deinit() -{ -} +Event_scheduler_ng::deinit_scheduler() {} void @@ -477,7 +474,6 @@ Event_scheduler_ng::run(THD *thd) pthread_cond_signal(&COND_state); error: state= INITIALIZED; - stop_all_running_events(thd); UNLOCK_SCHEDULER_DATA(); sql_print_information("SCHEDULER: Stopped"); @@ -561,97 +557,17 @@ Event_scheduler_ng::workers_count() /* - Stops all running events - - SYNOPSIS - Event_scheduler::stop_all_running_events() - thd Thread - - NOTE - LOCK_scheduler data must be acquired prior to call to this method -*/ - -void -Event_scheduler_ng::stop_all_running_events(THD *thd) -{ - CHARSET_INFO *scs= system_charset_info; - uint i; - DYNAMIC_ARRAY running_threads; - THD *tmp; - DBUG_ENTER("Event_scheduler::stop_all_running_events"); - DBUG_PRINT("enter", ("workers_count=%d", workers_count())); - - my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); - - bool had_super= FALSE; - VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; - if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) - push_dynamic(&running_threads, (gptr) &tmp->thread_id); - } - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - /* We need temporarily SUPER_ACL to be able to kill our offsprings */ - if (!(thd->security_ctx->master_access & SUPER_ACL)) - thd->security_ctx->master_access|= SUPER_ACL; - else - had_super= TRUE; - - char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; - char int_buff[STRING_BUFFER_USUAL_SIZE]; - String tmp_string(tmp_buff, sizeof(tmp_buff), scs); - String int_string(int_buff, sizeof(int_buff), scs); - tmp_string.length(0); - - for (i= 0; i < running_threads.elements; ++i) - { - int ret; - ulong thd_id= *dynamic_element(&running_threads, i, ulong*); - - int_string.set((longlong) thd_id,scs); - tmp_string.append(int_string); - if (i < running_threads.elements - 1) - tmp_string.append(' '); - - if ((ret= kill_one_thread(thd, thd_id, FALSE))) - { - sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); - break; - } - } - if (running_threads.elements) - sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); - - if (!had_super) - thd->security_ctx->master_access &= ~SUPER_ACL; - - delete_dynamic(&running_threads); - - sql_print_information("SCHEDULER: Waiting for worker threads to finish"); - - while (workers_count()) - my_sleep(100000); - - DBUG_VOID_RETURN; -} - - -/* Signals the main scheduler thread that the queue has changed its state. SYNOPSIS - Event_scheduler::queue_changed() + Event_scheduler_ng::queue_changed() */ void Event_scheduler_ng::queue_changed() { - DBUG_ENTER("Event_scheduler::queue_changed"); + DBUG_ENTER("Event_scheduler_ng::queue_changed"); DBUG_PRINT("info", ("Sending COND_state")); pthread_cond_signal(&COND_state); DBUG_VOID_RETURN; @@ -662,8 +578,7 @@ void Event_scheduler_ng::lock_data(const char *func, uint line) { DBUG_ENTER("Event_scheduler_ng::lock_mutex"); - DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", - &LOCK_scheduler_state, func, line)); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); pthread_mutex_lock(&LOCK_scheduler_state); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; @@ -675,9 +590,8 @@ Event_scheduler_ng::lock_data(const char *func, uint line) void Event_scheduler_ng::unlock_data(const char *func, uint line) { - DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex"); - DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", - &LOCK_scheduler_state, func, line)); + DBUG_ENTER("Event_scheduler_ng::unlock_mutex"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); mutex_last_unlocked_at_line= line; mutex_scheduler_data_locked= FALSE; mutex_last_unlocked_in_func= func; diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h index b250923d23e..41642161fc6 100644 --- a/sql/event_scheduler_ng.h +++ b/sql/event_scheduler_ng.h @@ -48,10 +48,10 @@ public: run(THD *thd); bool - init(Event_queue *queue); + init_scheduler(Event_queue *queue); void - deinit(); + deinit_scheduler(); void init_mutexes(); @@ -78,9 +78,6 @@ private: bool execute_top(THD *thd, Event_timed *job_data); - void - stop_all_running_events(THD *thd); - /* helper functions for working with mutexes & conditionals */ void lock_data(const char *func, uint line); @@ -104,7 +101,6 @@ private: pthread_cond_t COND_state; Event_queue *queue; - Event_db_repository *db_repository; uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; diff --git a/sql/events.cc b/sql/events.cc index e4b6de965f7..d1bbb6be884 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -17,10 +17,10 @@ #include "mysql_priv.h" #include "events.h" #include "event_data_objects.h" -#include "event_scheduler.h" #include "event_db_repository.h" -#include "sp_head.h" +#include "event_queue.h" #include "event_scheduler_ng.h" +#include "sp_head.h" /* TODO list : @@ -48,6 +48,21 @@ Warning: */ +/* + If the user (un)intentionally removes an event directly from mysql.event + the following sequence has to be used to be able to remove the in-memory + counterpart. + 1. CREATE EVENT the_name ON SCHEDULE EVERY 1 SECOND DISABLE DO SELECT 1; + 2. DROP EVENT the_name + + In other words, the first one will create a row in mysql.event . In the + second step because there will be a line, disk based drop will pass and + the scheduler will remove the memory counterpart. The reason is that + in-memory queue does not check whether the event we try to drop from memory + is disabled. Disabled events are not kept in-memory because they are not + eligible for execution. +*/ + const char *event_scheduler_state_names[]= { "OFF", "0", "ON", "1", "SUSPEND", "2", NullS }; @@ -284,17 +299,15 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type, */ int -Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options, +Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists, uint *rows_affected) { int ret; DBUG_ENTER("Events::create_event"); - if (!(ret= db_repository-> - create_event(thd, parse_data, - create_options & HA_LEX_CREATE_IF_NOT_EXISTS, - rows_affected))) + if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists, + rows_affected))) { - if ((ret= event_queue->create_event(thd, parse_data, true))) + if ((ret= event_queue->create_event(thd, parse_data))) my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } /* No need to close the table, it will be closed in sql_parse::do_command */ @@ -350,9 +363,10 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, SYNOPSIS Events::drop_event() thd THD - name event's name - drop_if_exists if set and the event not existing => warning onto the stack - rows_affected affected number of rows is returned heres + name Event's name + if_exists When set and the event does not exist => warning onto + the stack + rows_affected Affected number of rows is returned heres RETURN VALUE 0 OK @@ -360,15 +374,13 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, */ int -Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists, - uint *rows_affected) +Events::drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected) { int ret; - DBUG_ENTER("Events::drop_event"); - if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, - drop_if_exists, rows_affected))) + if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, if_exists, + rows_affected))) { if ((ret= event_queue->drop_event(thd, name))) my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); @@ -401,7 +413,7 @@ Events::show_create_event(THD *thd, sp_name *spn) DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str)); thd->reset_n_backup_open_tables_state(&backup); - ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL, thd->mem_root); + ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL); thd->restore_backup_open_tables_state(&backup); if (!ret) @@ -472,7 +484,7 @@ Events::drop_schema_events(THD *thd, char *db) DBUG_ENTER("evex_drop_db_events"); DBUG_PRINT("enter", ("dropping events from %s", db)); - ret= event_queue->drop_schema_events(thd, db_lex); + event_queue->drop_schema_events(thd, db_lex); ret= db_repository->drop_schema_events(thd, db_lex); DBUG_RETURN(ret); @@ -500,9 +512,8 @@ Events::init() Event_db_repository *db_repo; DBUG_ENTER("Events::init"); db_repository->init_repository(); - event_queue->init(db_repository); - event_queue->scheduler= scheduler_ng; - scheduler_ng->init(event_queue); + event_queue->init_queue(db_repository, scheduler_ng); + scheduler_ng->init_scheduler(event_queue); /* it should be an assignment! */ if (opt_event_scheduler) @@ -532,8 +543,9 @@ Events::deinit() DBUG_ENTER("Events::deinit"); scheduler_ng->stop(); - scheduler_ng->deinit(); - event_queue->deinit(); + scheduler_ng->deinit_scheduler(); + + event_queue->deinit_queue(); db_repository->deinit_repository(); DBUG_VOID_RETURN; diff --git a/sql/events.h b/sql/events.h index 357312b44d1..bb9fe7a8fc5 100644 --- a/sql/events.h +++ b/sql/events.h @@ -75,7 +75,7 @@ public: get_instance(); int - create_event(THD *thd, Event_parse_data *parse_data, uint create_options, + create_event(THD *thd, Event_parse_data *parse_data, bool if_exists, uint *rows_affected); int @@ -83,7 +83,7 @@ public: uint *rows_affected); int - drop_event(THD *thd, sp_name *name, bool drop_if_exists, uint *rows_affected); + drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected); int drop_schema_events(THD *thd, char *db); @@ -105,9 +105,9 @@ public: int dump_internal_status(THD *thd); + Event_queue *event_queue; + Event_scheduler_ng *scheduler_ng; Event_db_repository *db_repository; - Event_queue *event_queue; - Event_scheduler_ng *scheduler_ng; private: /* Singleton DP is used */ diff --git a/sql/set_var.cc b/sql/set_var.cc index 1f55d9ea3cf..15f6ccc2089 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -57,7 +57,6 @@ #include <myisam.h> #include <my_dir.h> -#include "event_scheduler.h" #include "events.h" /* WITH_BERKELEY_STORAGE_ENGINE */ @@ -3894,7 +3893,6 @@ bool sys_var_event_scheduler::update(THD *thd, set_var *var) { int res; - Event_scheduler *scheduler= Event_scheduler::get_instance(); /* here start the thread if not running. */ DBUG_ENTER("sys_var_event_scheduler::update"); if (Events::opt_event_scheduler == 0) @@ -3927,8 +3925,6 @@ sys_var_event_scheduler::update(THD *thd, set_var *var) byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, LEX_STRING *base) { - Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (Events::opt_event_scheduler == 0) thd->sys_var_tmp.long_value= 0; else if (Events::get_instance()->is_started()) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 80e5fff5a58..a4987080b02 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3832,25 +3832,26 @@ end_with_restore_list: case SQLCOM_CREATE_EVENT: case SQLCOM_ALTER_EVENT: { - uint rows_affected= 1; + uint affected= 1; DBUG_ASSERT(lex->event_parse_data); switch (lex->sql_command) { case SQLCOM_CREATE_EVENT: - res= Events::get_instance()->create_event(thd, lex->event_parse_data, - (uint) lex->create_info.options, - &rows_affected); + res= Events::get_instance()-> + create_event(thd, lex->event_parse_data, + lex->create_info.options & HA_LEX_CREATE_IF_NOT_EXISTS, + &affected); break; case SQLCOM_ALTER_EVENT: - res= Events::get_instance()->update_event(thd, lex->event_parse_data, - lex->spname, &rows_affected); + res= Events::get_instance()-> + update_event(thd, lex->event_parse_data, lex->spname, &affected); break; default:; } - DBUG_PRINT("info", ("CREATE/ALTER/DROP returned error code=%d af_rows=%d", - res, rows_affected)); + DBUG_PRINT("info",("DDL error code=%d affected=%d", res, affected)); if (!res) - send_ok(thd, rows_affected); + send_ok(thd, affected); + /* Don't do it, if we are inside a SP */ if (!thd->spcont) { delete lex->sphead; @@ -3867,8 +3868,7 @@ end_with_restore_list: if (! lex->spname->m_db.str) { my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0)); - res= true; - break; + goto error; } if (check_access(thd, EVENT_ACL, lex->spname->m_db.str, 0, 0, 0, is_schema_db(lex->spname->m_db.str))) @@ -3885,11 +3885,10 @@ end_with_restore_list: res= Events::get_instance()->show_create_event(thd, lex->spname); else { - uint rows_affected= 1; - if (!(res= Events::get_instance()->drop_event(thd, lex->spname, - lex->drop_if_exists, - &rows_affected))) - send_ok(thd, rows_affected); + uint affected= 1; + if (!(res= Events::get_instance()-> + drop_event(thd, lex->spname, lex->drop_if_exists, &affected))) + send_ok(thd, affected); } break; } |