diff options
author | unknown <andrey@lmy004.> | 2006-07-04 18:44:35 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-04 18:44:35 +0200 |
commit | a5dfeb02e991e6e5e9e332443522de1bb4592df8 (patch) | |
tree | 4459403077605a188dd88b131d998654761070d2 /sql | |
parent | 377446fa3497ffbc0f2a17614d848bfb79f52662 (diff) | |
download | mariadb-git-a5dfeb02e991e6e5e9e332443522de1bb4592df8.tar.gz |
WL #3337 (Event scheduler new architecture)
Cut Nr. 8.
All tests pass.
Separated Event_scheduler into Event_queue and Event_scheduler.
Added new Event_scheduler_ng which is the new scheduler and is used
system-wide. Will be moved to the event_scheduler.cc in the future.
Using Event_timed in Event_queue as well as cloned during execution.
Next step is to have Event_worker_data which will be used during execution
and will take ::compile()/::execute() out of Event_timed.
mysql-test/r/events.result:
update result
mysql-test/r/events_bugs.result:
update result
mysql-test/r/ps_1general.result:
update result
mysql-test/r/skip_name_resolve.result:
update result
mysql-test/r/sp-threads.result:
update result
mysql-test/r/sp_notembedded.result:
update result
mysql-test/r/status.result:
update result
mysql-test/t/events_stress.test:
Make event_stress a bit longer
sql/Makefile.am:
Add new event_scheduler_ng.h/cc . These are only to be in the experimental
clone. Later their content will be moved to event_scheduler.h/cc
sql/event_data_objects.cc:
Allocate strings memory on own memory root, instead
on the schedulers. Thus don't "leak" memory. This should
fix bug#18683 memory leak in event scheduler
sql/event_data_objects.h:
add mem_root
add THD - this is only temporal, will be moved to class Event_job_data
once Event_job_data is responsible for the execution.
sql/event_db_repository.cc:
Remove unused code.
Cosmetic changes
sql/event_queue.cc:
Now use the Event_scheduler_ng (NextGen)
sql/event_queue.h:
Now use the Event_scheduler_ng (NextGen)
sql/event_scheduler.cc:
This file is no more used, but will be soon.
sql/event_scheduler.h:
This file is no more used but will be soon
sql/events.cc:
Now use the Event_scheduler_ng (NextGen)
sql/events.h:
Now use the Event_scheduler_ng (NextGen)
sql/mysqld.cc:
Make it again possible to kill the scheduler thread
sql/set_var.cc:
Now use the Event_scheduler_ng (NextGen)
sql/share/errmsg.txt:
Shorten the message.
sql/sql_show.cc:
Loading is on a own root, then don't use thd->mem_root
Diffstat (limited to 'sql')
-rw-r--r-- | sql/Makefile.am | 6 | ||||
-rw-r--r-- | sql/event_data_objects.cc | 47 | ||||
-rw-r--r-- | sql/event_data_objects.h | 33 | ||||
-rw-r--r-- | sql/event_db_repository.cc | 145 | ||||
-rw-r--r-- | sql/event_queue.cc | 169 | ||||
-rw-r--r-- | sql/event_queue.h | 30 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 302 | ||||
-rw-r--r-- | sql/event_scheduler.h | 31 | ||||
-rw-r--r-- | sql/event_scheduler_ng.cc | 686 | ||||
-rw-r--r-- | sql/event_scheduler_ng.h | 121 | ||||
-rw-r--r-- | sql/events.cc | 78 | ||||
-rw-r--r-- | sql/events.h | 13 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/set_var.cc | 26 | ||||
-rw-r--r-- | sql/share/errmsg.txt | 2 | ||||
-rw-r--r-- | sql/sql_show.cc | 2 |
16 files changed, 1183 insertions, 510 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am index 49f85b3921d..4d76cdb5080 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sql_array.h sql_cursor.h events.h \ sql_plugin.h authors.h sql_partition.h event_data_objects.h \ event_queue.h event_db_repository.h \ - partition_info.h partition_element.h event_scheduler.h \ + partition_info.h partition_element.h event_scheduler_ng.h \ contributors.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ gstream.cc spatial.cc sql_help.cc sql_cursor.cc \ tztime.cc my_time.c my_user.c my_decimal.cc\ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ - sp_cache.cc parse_file.cc sql_trigger.cc \ - event_scheduler.cc events.cc event_data_objects.cc \ + sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\ + event_scheduler_ng.cc events.cc event_data_objects.cc \ event_queue.cc event_db_repository.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index f4147d72c3d..97db443e08d 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -556,6 +556,7 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0), Event_timed::~Event_timed() { deinit_mutexes(); + free_root(&mem_root, MYF(0)); if (free_sphead_on_delete) free_sp(); @@ -622,6 +623,8 @@ Event_timed::init() definer_user.length= definer_host.length= 0; sql_mode= 0; + /* init memory root */ + init_alloc_root(&mem_root, 256, 512); DBUG_VOID_RETURN; } @@ -644,7 +647,7 @@ Event_timed::init() */ int -Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) +Event_timed::load_from_row(TABLE *table) { char *ptr; Event_timed *et; @@ -661,22 +664,22 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) if (table->s->fields != ET_FIELD_COUNT) goto error; - if ((et->dbname.str= get_field(mem_root, table->field[ET_FIELD_DB])) == NULL) + if ((et->dbname.str= get_field(&mem_root, table->field[ET_FIELD_DB])) == NULL) goto error; et->dbname.length= strlen(et->dbname.str); - if ((et->name.str= get_field(mem_root, table->field[ET_FIELD_NAME])) == NULL) + if ((et->name.str= get_field(&mem_root, table->field[ET_FIELD_NAME])) == NULL) goto error; et->name.length= strlen(et->name.str); - if ((et->body.str= get_field(mem_root, table->field[ET_FIELD_BODY])) == NULL) + if ((et->body.str= get_field(&mem_root, table->field[ET_FIELD_BODY])) == NULL) goto error; et->body.length= strlen(et->body.str); - if ((et->definer.str= get_field(mem_root, + if ((et->definer.str= get_field(&mem_root, table->field[ET_FIELD_DEFINER])) == NullS) goto error; et->definer.length= strlen(et->definer.str); @@ -688,10 +691,10 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) len= ptr - et->definer.str; - et->definer_user.str= strmake_root(mem_root, et->definer.str, len); + et->definer_user.str= strmake_root(&mem_root, et->definer.str, len); et->definer_user.length= len; len= et->definer.length - len - 1; //1 is because of @ - et->definer_host.str= strmake_root(mem_root, ptr + 1, len);/* 1:because of @*/ + et->definer_host.str= strmake_root(&mem_root, ptr + 1, len);/* 1:because of @*/ et->definer_host.length= len; et->starts_null= table->field[ET_FIELD_STARTS]->is_null(); @@ -737,21 +740,21 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) last_executed_changed= false; /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ - if ((ptr= get_field(mem_root, table->field[ET_FIELD_STATUS])) == NullS) + if ((ptr= get_field(&mem_root, table->field[ET_FIELD_STATUS])) == NullS) goto error; DBUG_PRINT("load_from_row", ("Event [%s] is [%s]", et->name.str, ptr)); et->status= (ptr[0]=='E'? Event_timed::ENABLED:Event_timed::DISABLED); /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ - if ((ptr= get_field(mem_root, + if ((ptr= get_field(&mem_root, table->field[ET_FIELD_ON_COMPLETION])) == NullS) goto error; et->on_completion= (ptr[0]=='D'? Event_timed::ON_COMPLETION_DROP: Event_timed::ON_COMPLETION_PRESERVE); - et->comment.str= get_field(mem_root, table->field[ET_FIELD_COMMENT]); + et->comment.str= get_field(&mem_root, table->field[ET_FIELD_COMMENT]); if (et->comment.str != NullS) et->comment.length= strlen(et->comment.str); else @@ -953,10 +956,10 @@ Event_timed::compute_next_execution_time() int tmp; DBUG_ENTER("Event_timed::compute_next_execution_time"); - DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu", + DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu this=%p", TIME_to_ulonglong_datetime(&starts), TIME_to_ulonglong_datetime(&ends), - TIME_to_ulonglong_datetime(&last_executed))); + TIME_to_ulonglong_datetime(&last_executed), this)); if (status == Event_timed::DISABLED) { @@ -1168,7 +1171,8 @@ Event_timed::compute_next_execution_time() goto ret; } ret: - DBUG_PRINT("info", ("ret=0")); + DBUG_PRINT("info", ("ret=0 execute_at=%llu", + TIME_to_ulonglong_datetime(&execute_at))); DBUG_RETURN(false); err: DBUG_PRINT("info", ("ret=1")); @@ -1392,6 +1396,7 @@ Event_timed::get_create_event(THD *thd, String *buf) int Event_timed::execute(THD *thd, MEM_ROOT *mem_root) { + Security_context *save_ctx; /* this one is local and not needed after exec */ Security_context security_ctx; int ret= 0; @@ -1400,14 +1405,8 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]", dbname.str, name.str, (int) expression)); - VOID(pthread_mutex_lock(&this->LOCK_running)); - if (running) - { - VOID(pthread_mutex_unlock(&this->LOCK_running)); - DBUG_RETURN(-100); - } - running= true; - VOID(pthread_mutex_unlock(&this->LOCK_running)); + thd->change_security_context(definer_user, definer_host, dbname, + &security_ctx, &save_ctx); if (!sphead && (ret= compile(thd, mem_root))) goto done; @@ -1434,14 +1433,11 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) definer_host.str, dbname.str)); ret= -99; } - - VOID(pthread_mutex_lock(&this->LOCK_running)); - running= false; /* Will compile every time a new sp_head on different root */ free_sp(); - VOID(pthread_mutex_unlock(&this->LOCK_running)); done: + thd->restore_security_context(save_ctx); /* 1. Don't cache sphead if allocated on another mem_root 2. Don't call security_ctx.destroy() because this will free our dbname.str @@ -1807,3 +1803,4 @@ event_timed_identifier_equal(LEX_STRING db, LEX_STRING name, Event_timed *b) return !sortcmp_lex_string(name, b->name, system_charset_info) && !sortcmp_lex_string(db, b->dbname, system_charset_info); } + diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h index d8df8dd1e6c..5ae5c7e81ab 100644 --- a/sql/event_data_objects.h +++ b/sql/event_data_objects.h @@ -72,8 +72,11 @@ class Event_timed bool status_changed; bool last_executed_changed; + + MEM_ROOT mem_root; public: + THD *thd; enum enum_status { ENABLED = 1, @@ -147,7 +150,7 @@ public: deinit_mutexes(); int - load_from_row(MEM_ROOT *mem_root, TABLE *table); + load_from_row(TABLE *table); bool compute_next_execution_time(); @@ -264,9 +267,33 @@ public: }; -class Event_queue_element : public Event_timed +class Event_job_data { +public: + LEX_STRING dbname; + LEX_STRING name; + sp_head *sphead; + LEX_STRING definer; + LEX_STRING body; + ulong sql_mode; -}; + THD *thd; + + Event_job_data(){} + ~Event_job_data(){} + int + execute(); + +private: + int + load_from_disk(); + + int + compile(); + + + Event_job_data(const Event_job_data &); /* Prevent use of these */ + void operator=(Event_job_data &); +}; #endif /* _EVENT_DATA_OBJECTS_H_ */ diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index 8886992c839..074e05e5d8f 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -129,136 +129,10 @@ TABLE_FIELD_W_TYPE event_table_fields[ET_FIELD_COUNT] = { SYNOPSIS evex_fill_row() - thd THD - table the row to fill out - et Event's data - - RETURN VALUE - 0 - OK - EVEX_GENERAL_ERROR - bad data - EVEX_GET_FIELD_FAILED - field count does not match. table corrupted? - - DESCRIPTION - Used both when an event is created and when it is altered. -*/ - -static int -evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update) -{ - CHARSET_INFO *scs= system_charset_info; - enum enum_events_table_field field_num; - - DBUG_ENTER("evex_fill_row"); - - DBUG_PRINT("info", ("dbname=[%s]", et->dbname.str)); - DBUG_PRINT("info", ("name =[%s]", et->name.str)); - DBUG_PRINT("info", ("body =[%s]", et->body.str)); - - if (table->field[field_num= ET_FIELD_DEFINER]-> - store(et->definer.str, et->definer.length, scs)) - goto err_truncate; - - if (table->field[field_num= ET_FIELD_DB]-> - store(et->dbname.str, et->dbname.length, scs)) - goto err_truncate; - - if (table->field[field_num= ET_FIELD_NAME]-> - store(et->name.str, et->name.length, scs)) - goto err_truncate; - - /* both ON_COMPLETION and STATUS are NOT NULL thus not calling set_notnull()*/ - table->field[ET_FIELD_ON_COMPLETION]-> - store((longlong)et->on_completion, true); - - table->field[ET_FIELD_STATUS]->store((longlong)et->status, true); - - /* - Change the SQL_MODE only if body was present in an ALTER EVENT and of course - always during CREATE EVENT. - */ - if (et->body.str) - { - table->field[ET_FIELD_SQL_MODE]-> - store((longlong)thd->variables.sql_mode, true); - - if (table->field[field_num= ET_FIELD_BODY]-> - store(et->body.str, et->body.length, scs)) - goto err_truncate; - } - - if (et->expression) - { - table->field[ET_FIELD_INTERVAL_EXPR]->set_notnull(); - table->field[ET_FIELD_INTERVAL_EXPR]->store((longlong)et->expression, true); - - table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_notnull(); - /* - In the enum (C) intervals start from 0 but in mysql enum valid values start - from 1. Thus +1 offset is needed! - */ - table->field[ET_FIELD_TRANSIENT_INTERVAL]-> - store((longlong)et->interval+1, true); - - table->field[ET_FIELD_EXECUTE_AT]->set_null(); - - if (!et->starts_null) - { - table->field[ET_FIELD_STARTS]->set_notnull(); - table->field[ET_FIELD_STARTS]-> - store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME); - } - - if (!et->ends_null) - { - table->field[ET_FIELD_ENDS]->set_notnull(); - table->field[ET_FIELD_ENDS]-> - store_time(&et->ends, MYSQL_TIMESTAMP_DATETIME); - } - } - else if (et->execute_at.year) - { - table->field[ET_FIELD_INTERVAL_EXPR]->set_null(); - table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_null(); - table->field[ET_FIELD_STARTS]->set_null(); - table->field[ET_FIELD_ENDS]->set_null(); - - table->field[ET_FIELD_EXECUTE_AT]->set_notnull(); - table->field[ET_FIELD_EXECUTE_AT]-> - store_time(&et->execute_at, MYSQL_TIMESTAMP_DATETIME); - } - else - { - DBUG_ASSERT(is_update); - /* - it is normal to be here when the action is update - this is an error if the action is create. something is borked - */ - } - - ((Field_timestamp *)table->field[ET_FIELD_MODIFIED])->set_time(); - - if (et->comment.str) - { - if (table->field[field_num= ET_FIELD_COMMENT]-> - store(et->comment.str, et->comment.length, scs)) - goto err_truncate; - } - - DBUG_RETURN(0); -err_truncate: - my_error(ER_EVENT_DATA_TOO_LONG, MYF(0), table->field[field_num]->field_name); - DBUG_RETURN(EVEX_GENERAL_ERROR); -} - - -/* - Puts some data common to CREATE and ALTER EVENT into a row. - - SYNOPSIS - evex_fill_row() - thd THD - table the row to fill out - et Event's data + thd THD + table The row to fill out + et Event's data + is_update CREATE EVENT or ALTER EVENT RETURN VALUE 0 - OK @@ -596,7 +470,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, TABLE *table; int ret; Event_timed *et= NULL; - DBUG_ENTER("db_find_event"); + DBUG_ENTER("Event_db_repository::find_event"); DBUG_PRINT("enter", ("name: %*s", name.length, name.str)); if (tbl) @@ -621,7 +495,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, 2)::load_from_row() is silent on error therefore we emit error msg here */ - if ((ret= et->load_from_row(root, table))) + if ((ret= et->load_from_row(table))) { my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event"); goto done; @@ -722,7 +596,7 @@ evex_check_params(THD *thd, Event_parse_data *parse_data) const char *pos= NULL; Item *bad_item; - DBUG_ENTER("evex_check_timing_params"); + DBUG_ENTER("evex_check_params"); DBUG_PRINT("info", ("execute_at=0x%d expr=0x%d starts=0x%d ends=0x%d", parse_data->item_execute_at, parse_data->item_expression, @@ -1212,7 +1086,7 @@ Event_db_repository::drop_events_by_field(THD *thd, TABLE *table; Open_tables_state backup; READ_RECORD read_record_info; - DBUG_ENTER("drop_events_from_table_by_field"); + DBUG_ENTER("Event_db_repository::drop_events_by_field"); DBUG_PRINT("enter", ("field=%d field_value=%s", field, field_value.str)); if (open_event_table(thd, TL_WRITE, &table)) @@ -1270,7 +1144,7 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na Event_timed *et_loaded= NULL; Open_tables_state backup; - DBUG_ENTER("Event_scheduler::load_and_compile_event"); + DBUG_ENTER("Event_db_repository::load_named_event"); DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str)); thd->reset_n_backup_open_tables_state(&backup); @@ -1297,4 +1171,3 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na DBUG_RETURN(OP_OK); } - diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 32c5a076a62..44920b29c16 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -16,7 +16,7 @@ #include "mysql_priv.h" #include "events.h" -#include "event_scheduler.h" +#include "event_scheduler_ng.h" #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" @@ -35,10 +35,6 @@ #define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__) -Event_scheduler* -Event_queue::singleton= NULL; - - /* Compares the execute_at members of 2 Event_timed instances. Used as callback for the prioritized queue when shifting @@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) goto end; } - /* We need to load the event on scheduler_root */ if (!(res= db_repository-> load_named_event(thd, et->dbname, et->name, &et_new))) { + DBUG_PRINT("info", ("new event in the queue %p", et_new)); queue_insert_safe(&queue, (byte *) et_new); on_queue_change(); } @@ -130,7 +126,7 @@ end: Updates an event from the scheduler queue SYNOPSIS - Event_scheduler::update_event() + Event_queue::update_event() thd Thread et The event to replace(add) into the queue new_schema New schema @@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, et->dbname= *new_schema; et->name= *new_name; } - /* - We need to load the event (it's strings but on the object itself) - on scheduler_root. et_new could be NULL : - 1. Error occured - 2. If the replace is DISABLED, we don't load it into the queue. - */ + if (!(res= db_repository-> load_named_event(thd, et->dbname, et->name, &et_new))) { + DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old)); queue_insert_safe(&queue, (byte *) et_new); on_queue_change(); } @@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, /* - Drops an event from the scheduler queue + Drops an event from the queue SYNOPSIS Event_queue::drop_event() @@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name) } - - /* - Searches for an event in the scheduler queue + Searches for an event in the queue SYNOPSIS Event_queue::find_event() @@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) comparator The function to use for comparing RETURN VALUE - -1 Scheduler not working >=0 Number of dropped events NOTE @@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, db The schema name RETURN VALUE - -1 Scheduler not working >=0 Number of dropped events */ @@ -459,8 +447,7 @@ void Event_queue::lock_data(const char *func, uint line) { DBUG_ENTER("Event_queue::lock_mutex"); - DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", - &LOCK_event_queue, func, line)); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); pthread_mutex_lock(&LOCK_event_queue); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; @@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line) void Event_queue::unlock_data(const char *func, uint line) { - DBUG_ENTER("Event_queue::UNLOCK_mutex"); - DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", - &LOCK_event_queue, func, line)); + DBUG_ENTER("Event_queue::unlock_mutex"); + DBUG_PRINT("enter", ("func=%s line=%u", func, line)); mutex_last_unlocked_at_line= line; mutex_queue_data_locked= FALSE; mutex_last_unlocked_in_func= func; @@ -510,7 +496,7 @@ Event_queue::events_count() LOCK_QUEUE_DATA(); n= queue.elements; UNLOCK_QUEUE_DATA(); - + DBUG_PRINT("info", ("n=%u", n)); DBUG_RETURN(n); } @@ -529,7 +515,7 @@ uint Event_queue::events_count_no_lock() { uint n; - DBUG_ENTER("Event_scheduler::events_count_no_lock"); + DBUG_ENTER("Event_queue::events_count_no_lock"); n= queue.elements; @@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd) } DBUG_PRINT("info", ("Loading event from row.")); - if ((ret= et->load_from_row(&scheduler_root, table))) + if ((ret= et->load_from_row(table))) { clean_the_queue= TRUE; sql_print_error("SCHEDULER: Error while loading from mysql.event. " @@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd) void Event_queue::init_mutexes() { - pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST); } @@ -743,13 +729,13 @@ Event_queue::init_mutexes() Destroys mutexes. SYNOPSIS - Event_queue::destroy_mutexes() + Event_queue::deinit_mutexes() */ void -Event_queue::destroy_mutexes() +Event_queue::deinit_mutexes() { - pthread_mutex_destroy(&singleton->LOCK_event_queue); + pthread_mutex_destroy(&LOCK_event_queue); } @@ -765,8 +751,8 @@ void Event_queue::on_queue_change() { DBUG_ENTER("Event_queue::on_queue_change"); - DBUG_PRINT("info", ("Sending COND_new_work")); - singleton->queue_changed(); + DBUG_PRINT("info", ("Signalling change of the queue")); + scheduler->queue_changed(); DBUG_VOID_RETURN; } @@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo) { int i= 0; bool ret= FALSE; - DBUG_ENTER("Event_scheduler::init"); + DBUG_ENTER("Event_queue::init"); DBUG_PRINT("enter", ("this=%p", this)); LOCK_QUEUE_DATA(); db_repository= db_repo; - /* init memory root */ - init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, event_timed_compare_q, NULL, 30 /*auto_extent*/)) @@ -824,8 +808,8 @@ Event_queue::deinit() DBUG_ENTER("Event_queue::deinit"); LOCK_QUEUE_DATA(); + empty_queue(); delete_queue(&queue); - free_root(&scheduler_root, MYF(0)); UNLOCK_QUEUE_DATA(); DBUG_VOID_RETURN; @@ -835,7 +819,7 @@ Event_queue::deinit() void Event_queue::recalculate_queue(THD *thd) { - int i; + uint i; for (i= 0; i < queue.elements; i++) { ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); @@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd) void Event_queue::empty_queue() { - int i; + uint i; /* empty the queue */ for (i= 0; i < events_count_no_lock(); ++i) { Event_timed *et= (Event_timed *) queue_element(&queue, i); - et->free_sp(); delete et; } resize_queue(&queue, 0); } + + +Event_timed* +Event_queue::get_top() +{ + return (Event_timed *)queue_top(&queue); +} + + +void +Event_queue::remove_top() +{ + queue_remove(&queue, 0);// 0 is top, internally 1 +} + + +void +Event_queue::top_changed() +{ + queue_replaced(&queue); +} + + +Event_timed * +Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, + struct timespec *abstime) +{ + struct timespec top_time; + Event_timed *et_new= NULL; + DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); + DBUG_PRINT("enter", ("thd=%p now=%d", thd, now)); + abstime->tv_nsec= 0; + LOCK_QUEUE_DATA(); + do { + int res; + Event_timed *et= NULL; + if (!queue.elements) + { + abstime->tv_sec= 0; + break; + } + int i; + DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); + for (i = 0; i < queue.elements; i++) + { + et= ((Event_timed*)queue_element(&queue, i)); + DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str)); + DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " + " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", + TIME_to_ulonglong_datetime(&et->execute_at), + TIME_to_ulonglong_datetime(&et->starts), + TIME_to_ulonglong_datetime(&et->ends), + et->expression, sec_since_epoch_TIME(&et->execute_at), now, + (int)(sec_since_epoch_TIME(&et->execute_at) - now), + sec_since_epoch_TIME(&et->execute_at) <= now)); + } + et= ((Event_timed*)queue_element(&queue, 0)); + top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at); + + if (top_time.tv_sec <= now) + { + DBUG_PRINT("info", ("Ready for execution")); + abstime->tv_sec= 0; + if ((res= db_repository->load_named_event(thd, et->dbname, et->name, + &et_new))) + { + DBUG_ASSERT(0); + break; + } + + et->mark_last_executed(thd); + if (et->compute_next_execution_time()) + et->status= Event_timed::DISABLED; + DBUG_PRINT("info", ("event's status is %d", et->status)); + + et->update_fields(thd); + if (((et->execute_at.year && !et->expression) || et->execute_at_null) || + (et->status == Event_timed::DISABLED)) + { + DBUG_PRINT("info", ("removing from the queue")); + if (et->dropped) + et->drop(thd); + delete et; + queue_remove(&queue, 0); + } + else + queue_replaced(&queue); + } + else + { + abstime->tv_sec= top_time.tv_sec; + DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, + abstime->tv_sec)); + } + } while (0); + UNLOCK_QUEUE_DATA(); + + DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new, + abstime->tv_sec)); + if (et_new) + DBUG_PRINT("info", ("db=%s name=%s definer=%s " + "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str, + et_new->definer.str, + TIME_to_ulonglong_datetime(&et_new->execute_at))); + DBUG_RETURN(et_new); +} diff --git a/sql/event_queue.h b/sql/event_queue.h index 8c11d7a2042..1335100be21 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -19,22 +19,23 @@ class sp_name; class Event_timed; class Event_db_repository; +class Event_job_data; class THD; typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*); -class Event_scheduler; +class Event_scheduler_ng; class Event_queue { public: Event_queue(); - static void + void init_mutexes(); - static void - destroy_mutexes(); + void + deinit_mutexes(); bool init(Event_db_repository *db_repo); @@ -76,6 +77,18 @@ public: void empty_queue(); + Event_timed * + get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime); + + Event_timed* + get_top(); + + void + remove_top(); + + void + top_changed(); + ///////////////protected Event_timed * find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); @@ -92,9 +105,6 @@ public: Event_db_repository *db_repository; - /* The MEM_ROOT of the object */ - MEM_ROOT scheduler_root; - /* The sorted queue with the Event_timed objects */ QUEUE queue; @@ -111,11 +121,11 @@ public: void unlock_data(const char *func, uint line); - static void + void on_queue_change(); + + Event_scheduler_ng *scheduler; protected: - /* Singleton instance */ - static Event_scheduler *singleton; }; diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index fb60ce8ae6d..3a9b988f92e 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -20,224 +20,9 @@ #include "event_scheduler.h" #include "event_db_repository.h" #include "sp_head.h" - -/* - ToDo: - 1. Talk to Alik to get a check for configure.in for my_time_t and time_t - 2. Look at guardian.h|cc to see its life cycle, has similarities. -*/ +#include "event_queue.h" -/* - The scheduler is implemented as class Event_scheduler. Only one instance is - kept during the runtime of the server, by implementing the Singleton DP. - Object instance is always there because the memory is allocated statically - and initialized when the OS loader loads mysqld. This initialization is - bare. Extended initialization is done during the call to - Event_scheduler::init() in Events::init(). The reason for that late initialization - is that some subsystems needed to boot the Scheduler are not available at - earlier stages of the mysqld boot procedure. Events::init() is called in - mysqld.cc . If the mysqld is started with --event-scheduler=0 then - no initialization takes place and the scheduler is unavailable during this - server run. The server should be started with --event-scheduler=1 to have - the scheduler initialized and able to execute jobs. This starting alwa - s implies that the jobs execution will start immediately. If the server - is started with --event-scheduler=2 then the scheduler is started in suspended - state. Default state, if --event-scheduler is not specified is 2. - - The scheduler only manages execution of the events. Their creation, - alteration and deletion is delegated to other routines found in event.cc . - These routines interact with the scheduler : - - CREATE EVENT -> Event_scheduler::create_event() - - ALTER EVENT -> Event_scheduler::update_event() - - DROP EVENT -> Event_scheduler::drop_event() - - There is one mutex in the single Event_scheduler object which controls - the simultaneous access to the objects invariants. Using one lock makes - it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is - initialized in Event_scheduler::init(). Which in turn is called by the - Facade class Events in event.cc, coming from init_thread_environment() from - mysqld.cc -> no concurrency at this point. It's destroyed in - Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc . - - The full initialization is done in Event_scheduler::init() called from - Events::init(). It's done before any requests coming in, so this is a - guarantee for not having concurrency. - - The scheduler is started with Event_scheduler::start() and stopped with - Event_scheduler::stop(). When the scheduler starts it loads all events - from mysql.event table. Unfortunately, there is a race condition between - the event disk management functions and the scheduler ones - (add/replace/drop_event & load_events_from_db()), because the operations - do not happen under one global lock but the disk operations are guarded - by the MYISAM lock on mysql.event. In the same time, the queue operations - are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during - server startup and stopped()-ed during server shutdown (in Events::shutdown() - called by kill_server() in mysqld.cc) these races does not exist. - - Since the user may want to temporarily inhibit execution of events the - scheduler can be suspended and then it can be forced to resume its - operations. The API call to perform these is - Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) . - When the scheduler is suspended the main scheduler thread, which ATM - happens to have thread_id 1, locks on a condition COND_suspend_or_resume. - When this is signal is sent for the reverse operation the main scheduler - loops continues to roll and execute events. - - When the scheduler is suspended all add/replace/drop_event() operations - work as expected and the modify the queue but no events execution takes - place. - - In contrast to the previous scheduler implementation, found in - event_executor.cc, the start, shutdown, suspend and resume are synchronous - operations. As a whole all operations are synchronized and no busy waits - are used except in stop_all_running_events(), which waits until all - running event worker threads have finished. It would have been nice to - use a conditional on which this method will wait and the last thread to - finish would signal it but this implies subclassing THD. - - The scheduler does not keep a counter of how many event worker threads are - running, at any specific moment, because this will copy functionality - already existing in the server. Namely, all THDs are registered in the - global `threads` array. THD has member variable system_thread which - identifies the type of thread. Connection threads being NON_SYSTEM_THREAD, - all other have their enum value. Important for the scheduler are - SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER. - - Class THD subclasses class ilink, which is the linked list of all threads. - When a THD instance is destroyed it's being removed from threads, thus - no manual intervention is needed. On the contrary registering is manual - with threads.append() . Traversing the threads array every time a subclass - of THD, for instance if we would have had THD_scheduler_worker to see - how many events we have and whether the scheduler is shutting down will - take much time and lead to a deadlock. stop_all_running_events() is called - under LOCK_scheduler_data. If the THD_scheduler_worker was aware of - the single Event_scheduler instance it will try to check - Event_scheduler::state but for this it would need to acquire - LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a - busy wait. - - DROP DATABASE DDL should drop all events defined in a specific schema. - DROP USER also should drop all events who has as definer the user being - dropped (this one is not addressed at the moment but a hook exists). For - this specific needs Event_scheduler::drop_matching_events() is - implemented. Which expects a callback to be applied on every object in - the queue. Thus events that match specific schema or user, will be - removed from the queue. The exposed interface is : - - Event_scheduler::drop_schema_events() - - Event_scheduler::drop_user_events() - - This bulk dropping happens under LOCK_scheduler_data, thus no two or - more threads can execute it in parallel. However, DROP DATABASE is also - synchronized, currently, in the server thus this does not impact the - overall performance. In addition, DROP DATABASE is not that often - executed DDL. - - Though the interface to the scheduler is only through the public methods - of class Event_scheduler, there are currently few functions which are - used during its operations. Namely : - - static evex_print_warnings() - After every event execution all errors/warnings are dumped, so the user - can see in case of a problem what the problem was. - - - static init_event_thread() - This function is both used by event_scheduler_thread() and - event_worker_thread(). It initializes the THD structure. The - initialization looks pretty similar to the one in slave.cc done for the - replication threads. However, though the similarities it cannot be - factored out to have one routine. - - - static event_scheduler_thread() - Because our way to register functions to be used by the threading library - does not allow usage of static methods this function is used to start the - scheduler in it. It does THD initialization and then calls - Event_scheduler::run(). - - - static event_worker_thread() - With already stated the reason for not being able to use methods, this - function executes the worker threads. - - The execution of events is, to some extent, synchronized to inhibit race - conditions when Event_timed::thread_id is being updated with the thread_id of - the THD in which the event is being executed. The thread_id is in the - Event_timed object because we need to be able to kill quickly a specific - event during ALTER/DROP EVENT without traversing the global `threads` array. - However, this makes the scheduler's code more complicated. The event worker - thread is started by Event_timed::spawn_now(), which in turn calls - pthread_create(). The thread_id which will be associated in init_event_thread - is not known in advance thus the registering takes place in - event_worker_thread(). This registering has to be synchronized under - LOCK_scheduler_data, so no kill_event() on a object in - replace_event/drop_event/drop_matching_events() could take place. - - This synchronization is done through class Worker_thread_param that is - local to this file. Event_scheduler::execute_top() is called under - LOCK_scheduler_data. This method : - 1. Creates an instance of Worker_thread_param on the stack - 2. Locks Worker_thread_param::LOCK_started - 3. Calls Event_timed::spawn_now() which in turn creates a new thread. - 4. Locks on Worker_thread_param::COND_started_or_stopped and waits till the - worker thread send signal. The code is spurious wake-up safe because - Worker_thread_param::started is checked. - 5. The worker thread initializes its THD, then sets Event_timed::thread_id, - sets Worker_thread_param::started to TRUE and sends back - Worker_thread_param::COND_started. From this moment on, the event - is being executed and could be killed by using Event_timed::thread_id. - When Event_timed::spawn_thread_finish() is called in the worker thread, - it sets thread_id to 0. From this moment on, the worker thread should not - touch the Event_timed instance. - - - The life-cycle of the server is a FSA. - enum enum_state Event_scheduler::state keeps the state of the scheduler. - - The states are: - - |---UNINITIALIZED - | - | |------------------> IN_SHUTDOWN - --> INITIALIZED -> COMMENCING ---> RUNNING ----------| - ^ ^ | | ^ | - | |- CANTSTART <--| | |- SUSPENDED <-| - |______________________________| - - - UNINITIALIZED :The object is created and only the mutex is initialized - - INITIALIZED :All member variables are initialized - - COMMENCING :The scheduler is starting, no other attempt to start - should succeed before the state is back to INITIALIZED. - - CANTSTART :Set by the ::run() method in case it can't start for some - reason. In this case the connection thread that tries to - start the scheduler sees that some error has occurred and - returns an error to the user. Finally, the connection - thread sets the state to INITIALIZED, so further attempts - to start the scheduler could be made. - - RUNNING :The scheduler is running. New events could be added, - dropped, altered. The scheduler could be stopped. - - SUSPENDED :Like RUNNING but execution of events does not take place. - Operations on the memory queue are possible. - - IN_SHUTDOWN :The scheduler is shutting down, due to request by setting - the global event_scheduler to 0/FALSE, or because of a - KILL command sent by a user to the master thread. - - In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA() - are used for (un)locking purposes. They are used to save the programmer - from typing everytime - lock_data(__FUNCTION__, __LINE__); - All locking goes through Event_scheduler::lock_data() and ::unlock_data(). - These two functions then record in variables where for last time - LOCK_scheduler_data was locked and unlocked (two different variables). In - multithreaded environment, in some cases they make no sense but are useful for - inspecting deadlocks without having the server debug log turned on and the - server is still running. - - The same strategy is used for conditional variables. - Event_scheduler::cond_wait() is invoked from all places with parameter - an enum enum_cond_vars. In this manner, it's possible to inspect the last - on which condition the last call to cond_wait() was waiting. If the server - was started with debug trace switched on, the trace file also holds information - about conditional variables used. -*/ - #ifdef __GNUC__ #if __GNUC__ >= 2 #define SCHED_FUNC __FUNCTION__ @@ -250,6 +35,10 @@ #define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) +Event_scheduler* +Event_scheduler::singleton= NULL; + + #ifndef DBUG_OFF static LEX_STRING states_names[] = @@ -462,7 +251,7 @@ event_scheduler_thread(void *arg) thd->security_ctx->set_user((char*)"event_scheduler"); sql_print_information("SCHEDULER: Manager thread booting"); - if (Event_scheduler::check_system_tables(thd)) + if (Event_scheduler::get_instance()->event_queue->check_system_tables(thd)) scheduler->report_error_during_start(); else scheduler->run(thd); @@ -625,13 +414,13 @@ event_worker_thread(void *arg) Event_scheduler::Event_scheduler() { thread_id= 0; - mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0; - mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= ""; + mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0; + mutex_last_unlocked_in_func= mutex_last_locked_in_func= ""; cond_waiting_on= COND_NONE; mutex_scheduler_data_locked= FALSE; state= UNINITIALIZED; start_scheduler_suspended= FALSE; - LOCK_scheduler_data= &LOCK_event_queue; + LOCK_scheduler_data= &LOCK_data; } @@ -647,9 +436,10 @@ Event_scheduler::Event_scheduler() */ void -Event_scheduler::create_instance() +Event_scheduler::create_instance(Event_queue *queue) { singleton= new Event_scheduler(); + singleton->event_queue= queue; } /* @@ -689,8 +479,8 @@ Event_scheduler::init(Event_db_repository *db_repo) DBUG_ENTER("Event_scheduler::init"); DBUG_PRINT("enter", ("this=%p", this)); - Event_queue::init(db_repo); LOCK_SCHEDULER_DATA(); + init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); for (;i < COND_LAST; i++) if (pthread_cond_init(&cond_vars[i], NULL)) { @@ -720,7 +510,6 @@ void Event_scheduler::destroy() { DBUG_ENTER("Event_scheduler"); - Event_queue::deinit(); LOCK_SCHEDULER_DATA(); switch (state) { case UNINITIALIZED: @@ -879,7 +668,7 @@ Event_scheduler::run(THD *thd) DBUG_PRINT("enter", ("thd=%p", thd)); LOCK_SCHEDULER_DATA(); - ret= load_events_from_db(thd); + ret= event_queue->load_events_from_db(thd); if (!ret) { @@ -923,8 +712,9 @@ Event_scheduler::run(THD *thd) } DBUG_ASSERT(state == RUNNING); - et= (Event_timed *)queue_top(&queue); - +// et= (Event_timed *)queue_top(&event_queue->queue); + et= event_queue->get_top(); + /* Skip disabled events */ if (et->status != Event_timed::ENABLED) { @@ -935,7 +725,7 @@ Event_scheduler::run(THD *thd) sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue", et->dbname.length, et->dbname.str, et->name.length, et->name.str); - queue_remove(&queue, 0); + queue_remove(&event_queue->queue, 0); /* ToDo: check this again */ if (et->dropped) et->drop(thd); @@ -1095,16 +885,16 @@ Event_scheduler::execute_top(THD *thd, Event_timed *et) sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", et->dbname.str, et->name.str); if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) - queue_remove(&queue, 0);// 0 is top, internally 1 + event_queue->remove_top(); else - queue_replaced(&queue); + event_queue->top_changed(); break; default: DBUG_ASSERT(!spawn_ret_code); if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) - queue_remove(&queue, 0);// 0 is top, internally 1 + event_queue->remove_top(); else - queue_replaced(&queue); + event_queue->top_changed(); /* We don't lock LOCK_scheduler_data here because it's a pre-requisite for calling the current_method. @@ -1152,7 +942,7 @@ Event_scheduler::clean_memory(THD *thd) sql_print_information("SCHEDULER: Emptying the queue"); - empty_queue(); + event_queue->empty_queue(); DBUG_VOID_RETURN; } @@ -1432,7 +1222,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd) } if (was_suspended) { - recalculate_queue(thd); + event_queue->recalculate_queue(thd); /* This will implicitly unlock LOCK_scheduler_data */ thd->exit_cond(""); } @@ -1461,14 +1251,14 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) bool slept= FALSE; DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); DBUG_PRINT("enter", ("q.elements=%lu state=%s", - events_count_no_lock(), states_names[state])); + event_queue->events_count_no_lock(), states_names[state])); - if (!events_count_no_lock()) + if (!event_queue->events_count_no_lock()) thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data, "Empty queue, sleeping"); /* Wait in a loop protecting against catching spurious signals */ - while (!events_count_no_lock() && state == RUNNING) + while (!event_queue->events_count_no_lock() && state == RUNNING) { slept= TRUE; DBUG_PRINT("info", ("Entering condition because of empty queue")); @@ -1485,7 +1275,7 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) thd->exit_cond(""); DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", - events_count_no_lock(), states_names[state], thd->killed)); + event_queue->events_count_no_lock(), states_names[state], thd->killed)); DBUG_RETURN(slept); } @@ -1627,7 +1417,7 @@ Event_scheduler::dump_internal_status(THD *thd) /* queue.elements */ protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("queue.elements"), scs); - int_string.set((longlong) scheduler->events_count_no_lock(), scs); + int_string.set((longlong) scheduler->event_queue->events_count_no_lock(), scs); protocol->store(&int_string); ret= protocol->write(); @@ -1663,8 +1453,8 @@ Event_scheduler::lock_data(const char *func, uint line) DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", &LOCK_scheduler_data, func, line)); pthread_mutex_lock(LOCK_scheduler_data); - mutex_last_locked_in_func_name= func; - mutex_last_locked_at_line_nr= line; + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; mutex_scheduler_data_locked= TRUE; DBUG_VOID_RETURN; } @@ -1685,9 +1475,9 @@ Event_scheduler::unlock_data(const char *func, uint line) DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", LOCK_scheduler_data, func, line)); - mutex_last_unlocked_at_line_nr= line; + mutex_last_unlocked_at_line= line; mutex_scheduler_data_locked= FALSE; - mutex_last_unlocked_in_func_name= func; + mutex_last_unlocked_in_func= func; pthread_mutex_unlock(LOCK_scheduler_data); DBUG_VOID_RETURN; } @@ -1733,3 +1523,31 @@ Event_scheduler::queue_changed() pthread_cond_signal(&cond_vars[COND_new_work]); DBUG_VOID_RETURN; } + + +/* + Inits mutexes. + + SYNOPSIS + Event_scheduler::init_mutexes() +*/ + +void +Event_scheduler::init_mutexes() +{ + pthread_mutex_init(singleton->LOCK_scheduler_data, MY_MUTEX_INIT_FAST); +} + + +/* + Destroys mutexes. + + SYNOPSIS + Event_queue::destroy_mutexes() +*/ + +void +Event_scheduler::destroy_mutexes() +{ + pthread_mutex_destroy(singleton->LOCK_scheduler_data); +} diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index b4007d88976..7d02e98d4fe 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -19,6 +19,7 @@ class sp_name; class Event_timed; class Event_db_repository; +class Event_queue; class THD; @@ -31,7 +32,7 @@ events_shutdown(); #include "event_queue.h" #include "event_scheduler.h" -class Event_scheduler : public Event_queue +class Event_scheduler { public: enum enum_state @@ -56,7 +57,13 @@ public: static void - create_instance(); + create_instance(Event_queue *queue); + + static void + init_mutexes(); + + static void + destroy_mutexes(); /* Singleton access */ static Event_scheduler* @@ -122,6 +129,8 @@ public: void queue_changed(); + Event_queue *event_queue; + protected: uint @@ -147,9 +156,11 @@ protected: /* Singleton DP is used */ Event_scheduler(); - + pthread_mutex_t LOCK_data; pthread_mutex_t *LOCK_scheduler_data; - + + /* The MEM_ROOT of the object */ + MEM_ROOT scheduler_root; /* Set to start the scheduler in suspended state */ bool start_scheduler_suspended; @@ -172,18 +183,20 @@ protected: COND_LAST }; - uint mutex_last_locked_at_line_nr; - uint mutex_last_unlocked_at_line_nr; - const char* mutex_last_locked_in_func_name; - const char* mutex_last_unlocked_in_func_name; + uint mutex_last_locked_at_line; + uint mutex_last_unlocked_at_line; + const char* mutex_last_locked_in_func; + const char* mutex_last_unlocked_in_func; int cond_waiting_on; bool mutex_scheduler_data_locked; - static const char * const cond_vars_names[COND_LAST]; pthread_cond_t cond_vars[COND_LAST]; + /* Singleton instance */ + static Event_scheduler *singleton; + private: /* Prevent use of these */ Event_scheduler(const Event_scheduler &); diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc new file mode 100644 index 00000000000..9dc3bb26bc7 --- /dev/null +++ b/sql/event_scheduler_ng.cc @@ -0,0 +1,686 @@ +/* Copyright (C) 2004-2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" +#include "events.h" +#include "event_data_objects.h" +#include "event_scheduler_ng.h" +#include "event_queue.h" + +#ifdef __GNUC__ +#if __GNUC__ >= 2 +#define SCHED_FUNC __FUNCTION__ +#endif +#else +#define SCHED_FUNC "<unknown>" +#endif + +#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) +#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) + +extern pthread_attr_t connection_attrib; + +struct scheduler_param +{ + THD *thd; + Event_scheduler_ng *scheduler; +}; + +struct scheduler_param scheduler_param_value; + + + +static +LEX_STRING scheduler_states_names[] = +{ + { C_STRING_WITH_LEN("INITIALIZED")}, + { C_STRING_WITH_LEN("RUNNING")}, + { C_STRING_WITH_LEN("STOPPING")} +}; + + +class Worker_thread_param +{ +public: + Event_timed *et; + pthread_mutex_t LOCK_started; + pthread_cond_t COND_started; + bool started; + + Worker_thread_param(Event_timed *etn):et(etn), started(FALSE) + { + pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_started, NULL); + } + + ~Worker_thread_param() + { + pthread_mutex_destroy(&LOCK_started); + pthread_cond_destroy(&COND_started); + } +}; + + +/* + Prints the stack of infos, warnings, errors from thd to + the console so it can be fetched by the logs-into-tables and + checked later. + + SYNOPSIS + evex_print_warnings + thd - thread used during the execution of the event + et - the event itself +*/ + +static void +evex_print_warnings(THD *thd, Event_timed *et) +{ + MYSQL_ERROR *err; + DBUG_ENTER("evex_print_warnings"); + if (!thd->warn_list.elements) + DBUG_VOID_RETURN; + + char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; + char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; + String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); + prefix.length(0); + prefix.append("SCHEDULER: ["); + + append_identifier(thd, &prefix, et->definer.str, et->definer.length); + prefix.append("][", 2); + append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); + prefix.append('.'); + append_identifier(thd,&prefix, et->name.str, et->name.length); + prefix.append("] ", 2); + + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + while ((err= it++)) + { + String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); + /* set it to 0 or we start adding at the end. That's the trick ;) */ + err_msg.length(0); + err_msg.append(prefix); + err_msg.append(err->msg, strlen(err->msg), system_charset_info); + err_msg.append("]"); + DBUG_ASSERT(err->level < 3); + (sql_print_message_handlers[err->level])("%*s", err_msg.length(), + err_msg.c_ptr()); + } + DBUG_VOID_RETURN; +} + + +/* + Inits an scheduler thread handler, both the main and a worker + + SYNOPSIS + init_event_thread() + thd - the THD of the thread. Has to be allocated by the caller. + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + + RETURN VALUE + 0 OK + -1 Error +*/ + +static int +init_scheduler_thread(THD* thd) +{ + DBUG_ENTER("init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->master_access= 0; + thd->security_ctx->db_access= 0; + thd->security_ctx->host_or_ip= (char*)my_localhost; + thd->security_ctx->set_user((char*)"event_scheduler"); + my_net_init(&thd->net, 0); + thd->net.read_timeout= slave_net_timeout; + thd->slave_thread= 0; + thd->options|= OPTION_AUTO_IS_NULL; + thd->client_capabilities|= CLIENT_MULTI_RESULTS; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->thread_id= thread_id++; + threads.append(thd); + thread_count++; + thread_running++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + /* + Guarantees that we will see the thread in SHOW PROCESSLIST though its + vio is NULL. + */ + thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER; + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + + DBUG_RETURN(0); +} + + +pthread_handler_t +event_scheduler_ng_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd= (THD *)(*(struct scheduler_param *) arg).thd; + + thd->thread_stack= (char *)&thd; // remember where our stack is + DBUG_ENTER("event_scheduler_ng_thread"); + + my_thread_init(); + pthread_detach_this_thread(); + thd->real_id=pthread_self(); + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + goto end; + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + ((struct scheduler_param *) arg)->scheduler->run(thd); + +end: + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); + DBUG_PRINT("exit", ("Scheduler thread finishing")); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + + my_thread_end(); +} + + +/* + Function that executes an event in a child thread. Setups the + environment for the event execution and cleans after that. + + SYNOPSIS + event_worker_ng_thread() + arg The Event_timed object to be processed + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_worker_ng_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd; + Event_timed *event= (Event_timed *)arg; + int ret; + + thd= event->thd; + thd->thread_stack= (char *) &thd; + + DBUG_ENTER("event_worker_thread"); + DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str)); + + my_thread_init(); + pthread_detach_this_thread(); + thd->real_id=pthread_self(); + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + goto end; + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals)); +#endif + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id); + + thd->init_for_queries(); + thd->enable_slow_log= TRUE; + + ret= event->execute(thd, thd->mem_root); + + evex_print_warnings(thd, event); + + sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d", + event->dbname.str, event->name.str, + event->definer.str, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + event->dbname.str, event->name.str, + event->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) + sql_print_information("SCHEDULER: MICROSECOND is not supported"); + + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + +end: + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + /* + Free it here because net.vio is NULL for us => THD::~THD will check it + and won't call net_end(&net); See also replication code. + */ + net_end(&thd->net); + DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id)); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thread_count--; + thread_running--; + delete thd; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + delete event; + + my_thread_end(); +} + + +bool +Event_scheduler_ng::init(Event_queue *q) +{ + thread_id= 0; + state= INITIALIZED; + /* init memory root */ + + queue= q; + + return FALSE; +} + + +void +Event_scheduler_ng::deinit() +{ +} + + +void +Event_scheduler_ng::init_mutexes() +{ + pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_state, NULL); +} + + +void +Event_scheduler_ng::deinit_mutexes() +{ + pthread_mutex_destroy(&LOCK_scheduler_state); + pthread_cond_destroy(&COND_state); +} + + +bool +Event_scheduler_ng::start() +{ + THD *new_thd= NULL; + bool ret= FALSE; + pthread_t th; + DBUG_ENTER("Event_scheduler_ng::start"); + + LOCK_SCHEDULER_DATA(); + if (state > INITIALIZED) + goto end; + + if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) + { + sql_print_error("SCHEDULER: Cannot init manager event thread."); + ret= TRUE; + goto end; + } + + scheduler_param_value.thd= new_thd; + scheduler_param_value.scheduler= this; + + if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread, + (void*)&scheduler_param_value)) + { + DBUG_PRINT("error", ("cannot create a new thread")); + state= INITIALIZED; + ret= TRUE; + } + + state= RUNNING; +end: + UNLOCK_SCHEDULER_DATA(); + + if (ret && new_thd) + { + new_thd->proc_info= "Clearing"; + DBUG_ASSERT(new_thd->net.buff != 0); + net_end(&new_thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete new_thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + DBUG_RETURN(ret); +} + + +bool +Event_scheduler_ng::stop() +{ + THD *thd= current_thd; + DBUG_ENTER("Event_scheduler_ng::stop"); + DBUG_PRINT("enter", ("thd=%p", current_thd)); + + LOCK_SCHEDULER_DATA(); + if (state != RUNNING) + goto end; + + state= STOPPING; + + DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); + sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); + + pthread_cond_signal(&COND_state); + + /* Guarantee we don't catch spurious signals */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); + do { + DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " + "thread. Current value of state is %s . " + "workers count=%d", scheduler_states_names[state].str, + workers_count())); + /* thd could be 0x0, when shutting down */ + pthread_cond_wait(&COND_state, &LOCK_scheduler_state); + } while (state == STOPPING); + DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(FALSE); +} + + +bool +Event_scheduler_ng::run(THD *thd) +{ + struct timespec abstime; + Event_timed *job_data; + + LOCK_SCHEDULER_DATA(); + + thread_id= thd->thread_id; + sql_print_information("SCHEDULER: Manager thread started with id %lu", + thread_id); + while (state == RUNNING) + { + thd->end_time(); + /* Gets a minimized version */ + job_data= queue->get_top_for_execution_if_time(thd, thd->query_start(), + &abstime); + DBUG_PRINT("info", ("get_top returned job_data=%p now=%d abs_time.tv_sec=%d", + job_data, thd->query_start(), abstime.tv_sec)); + if (!job_data && !abstime.tv_sec) + { + thd->enter_cond(&COND_state, &LOCK_scheduler_state, + "Waiting on empty queue"); + pthread_cond_wait(&COND_state, &LOCK_scheduler_state); + thd->exit_cond(""); + DBUG_PRINT("info", ("Woke up. Got COND_state")); + LOCK_SCHEDULER_DATA(); + } + else if (abstime.tv_sec) + { + thd->enter_cond(&COND_state, &LOCK_scheduler_state, + "Waiting for next activation"); + pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, &abstime); + /* + If we get signal we should recalculate the whether it's the right time + because there could be : + 1. Spurious wake-up + 2. The top of the queue was changed (new one becase of create/update) + */ + /* This will do implicit UNLOCK_SCHEDULER_DATA() */ + thd->exit_cond(""); + DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); + LOCK_SCHEDULER_DATA(); + } + else + { + int res; + UNLOCK_SCHEDULER_DATA(); + res= execute_top(thd, job_data); + LOCK_SCHEDULER_DATA(); + if (res) + break; + } + DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); + } + DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); + pthread_cond_signal(&COND_state); +error: + state= INITIALIZED; + stop_all_running_events(thd); + UNLOCK_SCHEDULER_DATA(); + sql_print_information("SCHEDULER: Stopped"); + + return FALSE; +} + + +bool +Event_scheduler_ng::execute_top(THD *thd, Event_timed *job_data) +{ + THD *new_thd; + pthread_t th; + DBUG_ENTER("Event_scheduler_ng::execute_top"); + if (!(new_thd= new THD) || init_scheduler_thread(new_thd)) + goto error; + + /* Major failure */ + job_data->thd= new_thd; + DBUG_PRINT("info", ("Starting new thread for %s@%s", + job_data->dbname.str, job_data->name.str)); + if (pthread_create(&th, &connection_attrib, event_worker_ng_thread, job_data)) + goto error; + + DBUG_RETURN(FALSE); + +error: + if (new_thd) + { + new_thd->proc_info= "Clearing"; + DBUG_ASSERT(new_thd->net.buff != 0); + net_end(&new_thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete new_thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + DBUG_RETURN(TRUE); +} + + +enum Event_scheduler_ng::enum_state +Event_scheduler_ng::get_state() +{ + enum Event_scheduler_ng::enum_state ret; + LOCK_SCHEDULER_DATA(); + ret= state; + UNLOCK_SCHEDULER_DATA(); + return ret; +} + + +int +Event_scheduler_ng::dump_internal_status(THD *thd) +{ + return 1; + +} + + +uint +Event_scheduler_ng::workers_count() +{ + THD *tmp; + uint count= 0; + + DBUG_ENTER("Event_scheduler_ng::workers_count"); + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + ++count; + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + DBUG_PRINT("exit", ("%d", count)); + DBUG_RETURN(count); +} + + +/* + Stops all running events + + SYNOPSIS + Event_scheduler::stop_all_running_events() + thd Thread + + NOTE + LOCK_scheduler data must be acquired prior to call to this method +*/ + +void +Event_scheduler_ng::stop_all_running_events(THD *thd) +{ + CHARSET_INFO *scs= system_charset_info; + uint i; + DYNAMIC_ARRAY running_threads; + THD *tmp; + DBUG_ENTER("Event_scheduler::stop_all_running_events"); + DBUG_PRINT("enter", ("workers_count=%d", workers_count())); + + my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); + + bool had_super= FALSE; + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + push_dynamic(&running_threads, (gptr) &tmp->thread_id); + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + /* We need temporarily SUPER_ACL to be able to kill our offsprings */ + if (!(thd->security_ctx->master_access & SUPER_ACL)) + thd->security_ctx->master_access|= SUPER_ACL; + else + had_super= TRUE; + + char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + + for (i= 0; i < running_threads.elements; ++i) + { + int ret; + ulong thd_id= *dynamic_element(&running_threads, i, ulong*); + + int_string.set((longlong) thd_id,scs); + tmp_string.append(int_string); + if (i < running_threads.elements - 1) + tmp_string.append(' '); + + if ((ret= kill_one_thread(thd, thd_id, FALSE))) + { + sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); + break; + } + } + if (running_threads.elements) + sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); + + if (!had_super) + thd->security_ctx->master_access &= ~SUPER_ACL; + + delete_dynamic(&running_threads); + + sql_print_information("SCHEDULER: Waiting for worker threads to finish"); + + while (workers_count()) + my_sleep(100000); + + DBUG_VOID_RETURN; +} + + +/* + Signals the main scheduler thread that the queue has changed + its state. + + SYNOPSIS + Event_scheduler::queue_changed() +*/ + +void +Event_scheduler_ng::queue_changed() +{ + DBUG_ENTER("Event_scheduler::queue_changed"); + DBUG_PRINT("info", ("Sending COND_state")); + pthread_cond_signal(&COND_state); + DBUG_VOID_RETURN; +} + + +void +Event_scheduler_ng::lock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler_ng::lock_mutex"); + DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", + &LOCK_scheduler_state, func, line)); + pthread_mutex_lock(&LOCK_scheduler_state); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_scheduler_data_locked= TRUE; + DBUG_VOID_RETURN; +} + + +void +Event_scheduler_ng::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex"); + DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", + &LOCK_scheduler_state, func, line)); + mutex_last_unlocked_at_line= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + pthread_mutex_unlock(&LOCK_scheduler_state); + DBUG_VOID_RETURN; +} diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h new file mode 100644 index 00000000000..b250923d23e --- /dev/null +++ b/sql/event_scheduler_ng.h @@ -0,0 +1,121 @@ +#ifndef _EVENT_SCHEDULER_NG_H_ +#define _EVENT_SCHEDULER_NG_H_ +/* Copyright (C) 2004-2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +class Event_timed; +class Event_queue; + +class Event_scheduler_ng +{ +public: + Event_scheduler_ng(){} + ~Event_scheduler_ng(){} + + enum enum_state + { + INITIALIZED = 0, + RUNNING, + STOPPING + }; + + /* State changing methods follow */ + + bool + start(); + + bool + stop(); + + /* + Need to be public because has to be called from the function + passed to pthread_create. + */ + bool + run(THD *thd); + + bool + init(Event_queue *queue); + + void + deinit(); + + void + init_mutexes(); + + void + deinit_mutexes(); + + /* Information retrieving methods follow */ + + enum enum_state + get_state(); + + void + queue_changed(); + + static int + dump_internal_status(THD *thd); + +private: + uint + workers_count(); + + /* helper functions */ + bool + execute_top(THD *thd, Event_timed *job_data); + + void + stop_all_running_events(THD *thd); + + /* helper functions for working with mutexes & conditionals */ + void + lock_data(const char *func, uint line); + + void + unlock_data(const char *func, uint line); + + pthread_mutex_t LOCK_scheduler_state; + + /* This is the current status of the life-cycle of the scheduler. */ + enum enum_state state; + + /* + Holds the thread id of the executor thread or 0 if the scheduler is not + running. It is used by ::shutdown() to know which thread to kill with + kill_one_thread(). The latter wake ups a thread if it is waiting on a + conditional variable and sets thd->killed to non-zero. + */ + ulong thread_id; + + pthread_cond_t COND_state; + + Event_queue *queue; + Event_db_repository *db_repository; + + uint mutex_last_locked_at_line; + uint mutex_last_unlocked_at_line; + const char* mutex_last_locked_in_func; + const char* mutex_last_unlocked_in_func; + bool mutex_scheduler_data_locked; + +private: + /* Prevent use of these */ + Event_scheduler_ng(const Event_scheduler_ng &); + void operator=(Event_scheduler_ng &); +}; + +#endif /* _EVENT_SCHEDULER_NG_H_ */ diff --git a/sql/events.cc b/sql/events.cc index 09d5ee21a4f..e4b6de965f7 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -20,6 +20,7 @@ #include "event_scheduler.h" #include "event_db_repository.h" #include "sp_head.h" +#include "event_scheduler_ng.h" /* TODO list : @@ -293,9 +294,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options create_options & HA_LEX_CREATE_IF_NOT_EXISTS, rows_affected))) { - Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (scheduler->initialized() && - (ret= scheduler->create_event(thd, parse_data, true))) + if ((ret= event_queue->create_event(thd, parse_data, true))) my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } /* No need to close the table, it will be closed in sql_parse::do_command */ @@ -336,11 +335,9 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, */ if (!(ret= db_repository->update_event(thd, parse_data, new_name))) { - Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (scheduler->initialized() && - (ret= scheduler->update_event(thd, parse_data, - new_name? &new_name->m_db: NULL, - new_name? &new_name->m_name: NULL))) + if ((ret= event_queue->update_event(thd, parse_data, + new_name? &new_name->m_db: NULL, + new_name? &new_name->m_name: NULL))) my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } DBUG_RETURN(ret); @@ -373,8 +370,7 @@ Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists, if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, drop_if_exists, rows_affected))) { - Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (scheduler->initialized() && (ret= scheduler->drop_event(thd, name))) + if ((ret= event_queue->drop_event(thd, name))) my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } DBUG_RETURN(ret); @@ -476,8 +472,7 @@ Events::drop_schema_events(THD *thd, char *db) DBUG_ENTER("evex_drop_db_events"); DBUG_PRINT("enter", ("dropping events from %s", db)); - Event_scheduler *scheduler= Event_scheduler::get_instance(); - ret= scheduler->drop_schema_events(thd, db_lex); + ret= event_queue->drop_schema_events(thd, db_lex); ret= db_repository->drop_schema_events(thd, db_lex); DBUG_RETURN(ret); @@ -505,16 +500,18 @@ Events::init() Event_db_repository *db_repo; DBUG_ENTER("Events::init"); db_repository->init_repository(); + event_queue->init(db_repository); + event_queue->scheduler= scheduler_ng; + scheduler_ng->init(event_queue); /* it should be an assignment! */ if (opt_event_scheduler) { - Event_scheduler *scheduler= Event_scheduler::get_instance(); DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2); - DBUG_RETURN(scheduler->init(db_repository) || - (opt_event_scheduler == 1? scheduler->start(): - scheduler->start_suspended())); + if (opt_event_scheduler == 1) + DBUG_RETURN(scheduler_ng->start()); } + DBUG_RETURN(0); } @@ -534,13 +531,9 @@ Events::deinit() { DBUG_ENTER("Events::deinit"); - Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (scheduler->initialized()) - { - scheduler->stop(); - scheduler->destroy(); - } - + scheduler_ng->stop(); + scheduler_ng->deinit(); + event_queue->deinit(); db_repository->deinit_repository(); DBUG_VOID_RETURN; @@ -559,8 +552,12 @@ void Events::init_mutexes() { db_repository= new Event_db_repository; - Event_scheduler::create_instance(); - Event_scheduler::init_mutexes(); + + event_queue= new Event_queue; + event_queue->init_mutexes(); + + scheduler_ng= new Event_scheduler_ng(); + scheduler_ng->init_mutexes(); } @@ -574,9 +571,11 @@ Events::init_mutexes() void Events::destroy_mutexes() { - Event_scheduler::destroy_mutexes(); + event_queue->deinit_mutexes(); + scheduler_ng->deinit_mutexes(); + + delete scheduler_ng; delete db_repository; - db_repository= NULL; } @@ -595,7 +594,7 @@ Events::destroy_mutexes() int Events::dump_internal_status(THD *thd) { - return Event_scheduler::dump_internal_status(thd); + return Event_scheduler_ng::dump_internal_status(thd); } @@ -633,3 +632,26 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */) } DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db)); } + + +bool +Events::start_execution_of_events() +{ + DBUG_ENTER("Events::start_execution_of_events"); + DBUG_RETURN(scheduler_ng->start()); +} + + +bool +Events::stop_execution_of_events() +{ + DBUG_ENTER("Events::stop_execution_of_events"); + DBUG_RETURN(scheduler_ng->stop()); +} + +bool +Events::is_started() +{ + DBUG_ENTER("Events::is_started"); + DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING); +} diff --git a/sql/events.h b/sql/events.h index 1239cf58c7d..357312b44d1 100644 --- a/sql/events.h +++ b/sql/events.h @@ -19,6 +19,8 @@ class sp_name; class Event_parse_data; class Event_db_repository; +class Event_queue; +class Event_scheduler_ng; /* Return codes */ enum enum_events_error_code @@ -60,6 +62,15 @@ public: void destroy_mutexes(); + bool + start_execution_of_events(); + + bool + stop_execution_of_events(); + + bool + is_started(); + static Events* get_instance(); @@ -95,6 +106,8 @@ public: dump_internal_status(THD *thd); Event_db_repository *db_repository; + Event_queue *event_queue; + Event_scheduler_ng *scheduler_ng; private: /* Singleton DP is used */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index a27be384ee2..e0c0243b301 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -864,7 +864,7 @@ static void close_connections(void) DBUG_PRINT("quit",("Informing thread %ld that it's time to die", tmp->thread_id)); /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->slave_thread || tmp->system_thread == SYSTEM_THREAD_EVENT_SCHEDULER) + if (tmp->slave_thread) continue; tmp->killed= THD::KILL_CONNECTION; diff --git a/sql/set_var.cc b/sql/set_var.cc index 6c3606c9150..1f55d9ea3cf 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -58,6 +58,7 @@ #include <my_dir.h> #include "event_scheduler.h" +#include "events.h" /* WITH_BERKELEY_STORAGE_ENGINE */ extern bool berkeley_shared_data; @@ -3896,26 +3897,29 @@ sys_var_event_scheduler::update(THD *thd, set_var *var) Event_scheduler *scheduler= Event_scheduler::get_instance(); /* here start the thread if not running. */ DBUG_ENTER("sys_var_event_scheduler::update"); - - DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value)); - if (!scheduler->initialized()) + if (Events::opt_event_scheduler == 0) { my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--event-scheduler=0"); - DBUG_RETURN(true); + DBUG_RETURN(TRUE); } + DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value)); + if (var->save_result.ulonglong_value < 1 || var->save_result.ulonglong_value > 2) { char buf[64]; my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "event_scheduler", llstr(var->save_result.ulonglong_value, buf)); - DBUG_RETURN(true); + DBUG_RETURN(TRUE); } - if ((res= scheduler->suspend_or_resume(var->save_result.ulonglong_value == 1? - Event_scheduler::RESUME : - Event_scheduler::SUSPEND))) - my_error(ER_EVENT_SET_VAR_ERROR, MYF(0), (uint) res); + if (var->save_result.ulonglong_value == 1) + res= Events::get_instance()->start_execution_of_events(); + else + res= Events::get_instance()->stop_execution_of_events(); + + if (res) + my_error(ER_EVENT_SET_VAR_ERROR, MYF(0)); DBUG_RETURN((bool) res); } @@ -3925,9 +3929,9 @@ byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, { Event_scheduler *scheduler= Event_scheduler::get_instance(); - if (!scheduler->initialized()) + if (Events::opt_event_scheduler == 0) thd->sys_var_tmp.long_value= 0; - else if (scheduler->get_state() == Event_scheduler::RUNNING) + else if (Events::get_instance()->is_started()) thd->sys_var_tmp.long_value= 1; else thd->sys_var_tmp.long_value= 2; diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index ac4f2dd9237..40596d85f56 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -5831,7 +5831,7 @@ ER_DUP_ENTRY_AUTOINCREMENT_CASE ER_EVENT_MODIFY_QUEUE_ERROR eng "Internal scheduler error %d" ER_EVENT_SET_VAR_ERROR - eng "Error during starting/stopping of the scheduler. Error code %u" + eng "Error during starting/stopping of the scheduler." ER_PARTITION_MERGE_ERROR eng "%s handler cannot be used in partitioned tables" swe "%s kan inte användas i en partitionerad tabell" diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 86a9c380ee1..52bd8da3785 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -4177,7 +4177,7 @@ copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table) restore_record(sch_table, s->default_values); - if (et.load_from_row(thd->mem_root, event_table)) + if (et.load_from_row(event_table)) { my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0)); DBUG_RETURN(1); |