diff options
author | unknown <andrey@lmy004.> | 2006-02-16 00:43:11 +0100 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-02-16 00:43:11 +0100 |
commit | fea4742db5b5f5531b0e2d30ccee7883f54b0e80 (patch) | |
tree | 7ade9988cce658b1efc550ee4228f4bef4a4d6d8 | |
parent | 7088b39da895cf22a2b71d21a8313cbf0dda3760 (diff) | |
download | mariadb-git-fea4742db5b5f5531b0e2d30ccee7883f54b0e80.tar.gz |
fix for bug#16406 (Events: DROP DATABASE doesn't automatically drop events)
WL#1034
- This changeset also changes the executor so its quite more stable now.
Stressing test case added that executes ~800 events per second and dropping
hundreds of events at once using DROP DATABASE.
(with fixes after review of JimW)
(with fixes after review of Serg)
mysql-test/r/events.result:
update results after TRIGGER_ACL was added
mysql-test/t/events.test:
-redundant line
sql/event.cc:
Implemented evex_db_drop_events() which drops all events
from a specific database. Needed for SQLCOM_DROP_DATABASE
sql/event.h:
- protect the event better (see the changes to event_executor.cc
and event.cc). An event object could be used in a spawned thread
before it's executed but till now the object is marked as being
executed when the anonymous sp_head is executed. However, there are
timeframes before and after that during which the event is not marked
as executed and other thread may delete the object -> so we end with
a nirvana pointer.
sql/event_executor.cc:
- extract some of the code executed in the main thread to a function. Too long
functions are bad for the overview.
- prepend all information/error messages to the console with "SCHEDULER:" for
better overview, and easied searching in the log tables.
sql/event_priv.h:
- change the name, of evex_db_find_event_by_name() and don't
used C++ features like function overloading
- define consts for result returned from event_timed::spawn_now()
sql/event_timed.cc:
- add few methods related to event execution.
now the event spawns the worker thread and
passes itself as parameter. This way it locks itself for exectution
first and then spawning -> no race condition. When the worker thread
has finished working with the reference it calls back
event_timed::spawn_thread_finish() to unlock itself.
sql/sql_db.cc:
- call evex_drop_db_events() on DROP DATABASE
-rw-r--r-- | mysql-test/r/events.result | 2 | ||||
-rw-r--r-- | mysql-test/r/events_stress.result | 46 | ||||
-rw-r--r-- | mysql-test/t/events.test | 1 | ||||
-rw-r--r-- | mysql-test/t/events_stress.test | 80 | ||||
-rw-r--r-- | sql/event.cc | 231 | ||||
-rw-r--r-- | sql/event.h | 54 | ||||
-rw-r--r-- | sql/event_executor.cc | 277 | ||||
-rw-r--r-- | sql/event_priv.h | 12 | ||||
-rw-r--r-- | sql/event_timed.cc | 140 | ||||
-rw-r--r-- | sql/sql_db.cc | 2 |
10 files changed, 710 insertions, 135 deletions
diff --git a/mysql-test/r/events.result b/mysql-test/r/events.result index 41f944ab089..4c7b9faec05 100644 --- a/mysql-test/r/events.result +++ b/mysql-test/r/events.result @@ -64,7 +64,7 @@ SHOW GRANTS; Grants for ev_test@localhost GRANT USAGE ON *.* TO 'ev_test'@'localhost' GRANT ALL PRIVILEGES ON `events_test`.* TO 'ev_test'@'localhost' -GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, REFERENCES, INDEX, ALTER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE ON `events_test2`.* TO 'ev_test'@'localhost' +GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, REFERENCES, INDEX, ALTER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, TRIGGER ON `events_test2`.* TO 'ev_test'@'localhost' "Here comes an error:"; SHOW EVENTS; ERROR 42000: Access denied for user 'ev_test'@'localhost' to database 'events_test2' diff --git a/mysql-test/r/events_stress.result b/mysql-test/r/events_stress.result new file mode 100644 index 00000000000..9f95cfad75d --- /dev/null +++ b/mysql-test/r/events_stress.result @@ -0,0 +1,46 @@ +CREATE DATABASE IF NOT EXISTS events_test; +CREATE DATABASE events_test2; +USE events_test2; +CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +USE events_test; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +3 +DROP DATABASE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +0 +"Now testing stability - dropping db -> events while they are running" +CREATE DATABASE events_test2; +USE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +1000 +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +0 +CREATE DATABASE events_test3; +USE events_test3; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; +COUNT(*) +950 +CREATE DATABASE events_test4; +USE events_test4; +CREATE DATABASE events_test2; +USE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +COUNT(*) +1050 +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +DROP DATABASE events_test3; +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test4; +SET GLOBAL event_scheduler=1; +USE events_test; +DROP DATABASE events_test; diff --git a/mysql-test/t/events.test b/mysql-test/t/events.test index be24d490393..dff7f48f4d5 100644 --- a/mysql-test/t/events.test +++ b/mysql-test/t/events.test @@ -109,7 +109,6 @@ drop event one_event; - create event e_26 on schedule at '2017-01-01 00:00:00' disable do set @a = 5; select db, name, body, definer, convert_tz(execute_at, 'UTC', 'SYSTEM'), on_completion from mysql.event; drop event e_26; diff --git a/mysql-test/t/events_stress.test b/mysql-test/t/events_stress.test new file mode 100644 index 00000000000..f6eed79425c --- /dev/null +++ b/mysql-test/t/events_stress.test @@ -0,0 +1,80 @@ +CREATE DATABASE IF NOT EXISTS events_test; +# +# DROP DATABASE test start (bug #16406) +# +CREATE DATABASE events_test2; +USE events_test2; +CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; +USE events_test; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +DROP DATABASE events_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; + +--echo "Now testing stability - dropping db -> events while they are running" +CREATE DATABASE events_test2; +USE events_test2; +--disable_query_log +let $1= 1000; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SET GLOBAL event_scheduler=1; +--sleep 4 +DROP DATABASE events_test2; + +SET GLOBAL event_scheduler=0; +--sleep 2 +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +CREATE DATABASE events_test3; +USE events_test3; +--disable_query_log +let $1= 950; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; +--sleep 3 +CREATE DATABASE events_test4; +USE events_test4; +--disable_query_log +let $1= 860; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log + + +CREATE DATABASE events_test2; +USE events_test2; +--disable_query_log +let $1= 1050; +while ($1) +{ + eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + dec $1; +} +--enable_query_log +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +--sleep 6 +DROP DATABASE events_test2; +SET GLOBAL event_scheduler=0; +DROP DATABASE events_test3; +SET GLOBAL event_scheduler=1; +DROP DATABASE events_test4; +SET GLOBAL event_scheduler=1; +USE events_test; +# +# DROP DATABASE test end (bug #16406) +# +DROP DATABASE events_test; diff --git a/sql/event.cc b/sql/event.cc index abca622835a..063a4ef9333 100644 --- a/sql/event.cc +++ b/sql/event.cc @@ -165,12 +165,35 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) } + /* Find row in open mysql.event table representing event SYNOPSIS evex_db_find_event_aux() thd Thread context + et evet_timed object containing dbname, name & definer + table TABLE object for open mysql.event table. + + RETURN VALUE + 0 - Routine found + EVEX_KEY_NOT_FOUND - No routine with given name +*/ + +inline int +evex_db_find_event_aux(THD *thd, event_timed *et, TABLE *table) +{ + return evex_db_find_event_by_name(thd, et->dbname, et->name, + et->definer, table); +} + + +/* + Find row in open mysql.event table representing event + + SYNOPSIS + evex_db_find_event_by_name() + thd Thread context dbname Name of event's database rname Name of the event inside the db table TABLE object for open mysql.event table. @@ -181,13 +204,13 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) */ int -evex_db_find_event_aux(THD *thd, const LEX_STRING dbname, - const LEX_STRING ev_name, - const LEX_STRING user_name, - TABLE *table) +evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, + const LEX_STRING ev_name, + const LEX_STRING user_name, + TABLE *table) { byte key[MAX_KEY_LENGTH]; - DBUG_ENTER("evex_db_find_event_aux"); + DBUG_ENTER("evex_db_find_event_by_name"); DBUG_PRINT("enter", ("name: %.*s", ev_name.length, ev_name.str)); /* @@ -373,7 +396,7 @@ db_create_event(THD *thd, event_timed *et, my_bool create_if_not, } DBUG_PRINT("info", ("check existance of an event with the same name")); - if (!evex_db_find_event_aux(thd, et->dbname, et->name, et->definer, table)) + if (!evex_db_find_event_aux(thd, et, table)) { if (create_if_not) { @@ -511,7 +534,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name) goto err; } - if (!evex_db_find_event_aux(thd, new_name->m_db, new_name->m_name, + if (!evex_db_find_event_by_name(thd, new_name->m_db, new_name->m_name, et->definer, table)) { my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), new_name->m_name.str); @@ -524,8 +547,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name) overwrite the key and SE will tell us that it cannot find the already found row (copied into record[1] later */ - if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et->dbname, et->name, - et->definer, table)) + if (EVEX_KEY_NOT_FOUND == evex_db_find_event_aux(thd, et, table)) { my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), et->name.str); goto err; @@ -603,8 +625,8 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING definer, event_timed **ett, goto done; } - if ((ret= evex_db_find_event_aux(thd, name->m_db, name->m_name, definer, - table))) + if ((ret= evex_db_find_event_by_name(thd, name->m_db, name->m_name, definer, + table))) { my_error(ER_EVENT_DOES_NOT_EXIST, MYF(0), name->m_name.str); goto done; @@ -727,7 +749,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock, if (!sortcmp_lex_string(*name, et->name, system_charset_info) && !sortcmp_lex_string(*db, et->dbname, system_charset_info)) { - if (!et->is_running()) + if (et->can_spawn_now()) { DBUG_PRINT("evex_remove_from_cache", ("not running - free and delete")); et->free_sp(); @@ -887,7 +909,7 @@ evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists, goto done; } - if (!(ret= evex_db_find_event_aux(thd, et->dbname,et->name,et->definer,table))) + if (!(ret= evex_db_find_event_aux(thd, et, table))) { if ((ret= table->file->ha_delete_row(table->record[0]))) { @@ -923,3 +945,186 @@ done: DBUG_RETURN(ret); } + +/* + evex_drop_db_events - Drops all events in the selected database + + thd - Thread + db - ASCIIZ the name of the database + + Returns: + 0 - OK + 1 - Failed to delete a specific row + 2 - Got NULL while reading db name from a row + + Note: + The algo is the following + 1. Go through the in-memory cache, if the scheduler is working + and for every event whose dbname matches the database we drop + check whether is currently in execution: + - event_timed::can_spawn() returns true -> the event is not + being executed in a child thread. The reason not to use + event_timed::is_running() is that the latter shows only if + it is being executed, which is 99% of the time in the thread + but there are some initiliazations before and after the + anonymous SP is being called. So if we delete in this moment + -=> *boom*, so we have to check whether the thread has been + spawned and can_spawn() is the right method. + - event_timed::can_spawn() returns false -> being runned ATM + just set the flags so it should drop itself. + +*/ + +int +evex_drop_db_events(THD *thd, char *db) +{ + TABLE *table; + READ_RECORD read_record_info; + MYSQL_LOCK *lock; + int ret= 0; + int i; + LEX_STRING db_lex= {db, strlen(db)}; + + DBUG_ENTER("evex_drop_db_events"); + DBUG_PRINT("info",("dropping events from %s", db)); + + + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + + if ((ret= evex_open_event_table(thd, TL_WRITE, &table))) + { + sql_print_error("Table mysql.event is damaged."); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_RETURN(SP_OPEN_TABLE_FAILED); + } + + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + VOID(pthread_mutex_lock(&LOCK_evex_running)); + if (!evex_is_running) + goto skip_memory; + + for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) + { + event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); + if (sortcmp_lex_string(et->dbname, db_lex, system_charset_info)) + continue; + + if (et->can_spawn_now_n_lock(thd)) + { + DBUG_PRINT("info",("event %s not running - direct delete", et->name.str)); + if (!(ret= evex_db_find_event_aux(thd, et, table))) + { + DBUG_PRINT("info",("event %s found on disk", et->name.str)); + if ((ret= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error while deleting a row - dropping " + "a database. Skipping the rest."); + my_error(ER_EVENT_DROP_FAILED, MYF(0), et->name.str); + goto end; + } + DBUG_PRINT("info",("deleted event [%s] num [%d]. Time to free mem", + et->name.str, i)); + } + else if (ret == EVEX_KEY_NOT_FOUND) + { + sql_print_error("Expected to find event %s.%s of %s on disk-not there.", + et->dbname.str, et->name.str, et->definer.str); + } + et->free_sp(); + delete et; + et= 0; + /* no need to call et->spawn_unlock because we already cleaned et */ + } + else + { + DBUG_PRINT("info",("event %s is running. setting exec_no_more and dropped", + et->name.str)); + et->flags|= EVENT_EXEC_NO_MORE; + et->dropped= TRUE; + } + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + evex_queue_delete_element(&EVEX_EQ_NAME, i);// 1 is top + DBUG_PRINT("info",("%d elements in the queue", + evex_queue_num_elements(EVEX_EQ_NAME))); + /* + decrease so we start at the same position, there will be + less elements in the queue, it will still be ordered so on + next iteration it will be again i the current element or if + no more we finish. + */ + --i; + } + +skip_memory: + /* + The reasoning behind having two loops is the following: + If there was only one loop, the table-scan, then for every element which + matches, the queue in memory has to be searched to remove the element. + While if we go first over the queue and remove what's in there we have only + one pass over it and after finishing it, moving to table-scan for the disabled + events. This needs quite less time and means quite less locking on + LOCK_event_arrays. + */ + DBUG_PRINT("info",("Mem-cache checked, now going to db for disabled events")); + /* only enabled events are in memory, so we go now and delete the rest */ + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info)) && !ret) + { + char *et_db; + + if ((et_db= get_field(thd->mem_root, table->field[EVEX_FIELD_DB])) == NULL) + { + ret= 2; + break; + } + + LEX_STRING et_db_lex= {et_db, strlen(et_db)}; + if (!sortcmp_lex_string(et_db_lex, db_lex, system_charset_info)) + { + event_timed ett; + char *ptr; + + if ((ptr= get_field(thd->mem_root, table->field[EVEX_FIELD_STATUS])) + == NullS) + { + sql_print_error("Error while loading from mysql.event. " + "Table probably corrupted"); + goto end; + } + /* + When not running nothing is in memory so we have to clean + everything. + We don't delete EVENT_ENABLED events when the scheduler is running + because maybe this is an event which we asked to drop itself when + it is finished and it hasn't finished yet, so we don't touch it. + It will drop itself. The not running ENABLED events has been already + deleted from ha_delete_row() above in the loop over the QUEUE + (in case the executor is running). + 'D' stands for DISABLED, 'E' for ENABLED - it's an enum + */ + if ((evex_is_running && ptr[0] == 'D') || !evex_is_running) + { + DBUG_PRINT("info", ("Dropping %s.%s", et_db, ett.name.str)); + if ((ret= table->file->ha_delete_row(table->record[0]))) + { + my_error(ER_EVENT_DROP_FAILED, MYF(0), ett.name.str); + goto end; + } + } + } + } + DBUG_PRINT("info",("Disk checked for disabled events. Finishing.")); + +end: + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + end_read_record(&read_record_info); + + thd->version--; // Force close to free memory + + close_thread_tables(thd); + + DBUG_RETURN(ret); +} diff --git a/sql/event.h b/sql/event.h index 1fe5c8e5713..f9dea2d85e9 100644 --- a/sql/event.h +++ b/sql/event.h @@ -79,6 +79,8 @@ class event_timed { event_timed(const event_timed &); /* Prevent use of these */ void operator=(event_timed &); + my_bool in_spawned_thread; + ulong locked_by_thread_id; my_bool running; pthread_mutex_t LOCK_running; @@ -116,9 +118,10 @@ public: bool free_sphead_on_delete; uint flags;//all kind of purposes - event_timed():running(0), status_changed(false), last_executed_changed(false), - expression(0), created(0), modified(0), - on_completion(MYSQL_EVENT_ON_COMPLETION_DROP), + event_timed():in_spawned_thread(0),locked_by_thread_id(0), + running(0), status_changed(false), + last_executed_changed(false), expression(0), created(0), + modified(0), on_completion(MYSQL_EVENT_ON_COMPLETION_DROP), status(MYSQL_EVENT_ENABLED), sphead(0), dropped(false), free_sphead_on_delete(true), flags(0) @@ -197,8 +200,45 @@ public: return ret; } + + /* + Checks whether the object is being used in a spawned thread. + This method is for very basic checking. Use ::can_spawn_now_n_lock() + for most of the cases. + */ + + my_bool + can_spawn_now() + { + my_bool ret; + VOID(pthread_mutex_lock(&this->LOCK_running)); + ret= !in_spawned_thread; + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; + } + + /* + Checks whether this thread can lock the object for modification -> + preventing being spawned for execution, and locks if possible. + use ::can_spawn_now() only for basic checking because a race + condition may occur between the check and eventual modification (deletion) + of the object. + */ + + my_bool + can_spawn_now_n_lock(THD *thd); + + int + spawn_unlock(THD *thd); + + int + spawn_now(void * (*thread_func)(void*)); - void free_sp() + void + spawn_thread_finish(THD *thd); + + void + free_sp() { delete sphead; sphead= 0; @@ -221,7 +261,11 @@ evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists, int evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); -int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); +int +sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); + +int +evex_drop_db_events(THD *thd, char *db); int init_events(); diff --git a/sql/event_executor.cc b/sql/event_executor.cc index 7960f1e1758..7bcc8882fca 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -18,6 +18,11 @@ #include "event.h" #include "sp.h" +#define WAIT_STATUS_READY 0 +#define WAIT_STATUS_EMPTY_QUEUE 1 +#define WAIT_STATUS_NEW_TOP_EVENT 2 +#define WAIT_STATUS_STOP_EXECUTOR 3 + /* Make this define DBUG_FAULTY_THR to be able to put breakpoints inside @@ -165,18 +170,97 @@ init_event_thread(THD* thd) DBUG_RETURN(0); } + +/* + This function waits till the time next event in the queue should be + executed. + + Returns + WAIT_STATUS_READY There is an event to be executed right now + WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped. + WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled + on top. Restart ticking. + WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0; +*/ + +static int +executor_wait_till_next_event_exec(THD *thd) +{ + event_timed *et; + TIME time_now; + int t2sleep; + + DBUG_ENTER("executor_wait_till_next_event_exec"); + /* + now let's see how much time to sleep, we know there is at least 1 + element in the queue. + */ + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + DBUG_RETURN(1); + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + DBUG_ASSERT(et); + if (et->status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); + if (et->dropped) + et->drop(thd); + delete et; + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + sql_print_information("Event found disabled, dropping."); + DBUG_RETURN(1); + } + + DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); + // set the internal clock of thd + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); + t2sleep= evex_time_diff(&et->execute_at, &time_now); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + + DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); + if (t2sleep > 0) + { + /* + We sleep t2sleep seconds but we check every second whether this thread + has been killed, or there is a new candidate + */ + while (t2sleep-- && !thd->killed && event_executor_running_global_var && + evex_queue_num_elements(EVEX_EQ_NAME) && + (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) + { + DBUG_PRINT("evex main thread",("will sleep a bit more")); + my_sleep(1000000); + } + } + + int ret= 0; + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + ret= 1; + else if (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) != et) + ret= 2; + if (thd->killed && event_executor_running_global_var) + ret= 3; + + DBUG_RETURN(ret); +} + + pthread_handler_t event_executor_main(void *arg) { THD *thd; /* needs to be first for thread_stack */ - ulonglong iter_num= 0; uint i=0, j=0; my_ulonglong cnt= 0; + TIME time_now; DBUG_ENTER("event_executor_main"); DBUG_PRINT("event_executor_main", ("EVEX thread started")); - // init memory root init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); @@ -186,23 +270,24 @@ event_executor_main(void *arg) if (sizeof(my_time_t) != sizeof(time_t)) { - sql_print_error("sizeof(my_time_t) != sizeof(time_t) ." + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." "The scheduler will not work correctly. Stopping."); + DBUG_ASSERT(0); goto err_no_thd; } //TODO Andrey: Check for NULL if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create THD for event_executor_main"); + sql_print_error("SCHEDULER: Cannot create THD for the main thread."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is pthread_detach_this_thread(); - + if (init_event_thread(thd)) - goto err; + goto finish; // make this thread invisible it has no vio -> show processlist won't see thd->system_thread= 1; @@ -213,7 +298,7 @@ event_executor_main(void *arg) thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - DBUG_PRINT("EVEX main thread", ("Initing events_queuey")); + DBUG_PRINT("EVEX main thread", ("Initing events_queue")); /* eventually manifest that we are running, not to crashe because of @@ -229,15 +314,14 @@ event_executor_main(void *arg) thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); if (evex_load_events_from_db(thd)) - goto err; + goto finish; evex_main_thread_id= thd->thread_id; - sql_print_information("Scheduler thread started"); + sql_print_information("SCHEDULER: Main thread started"); while (!thd->killed) { TIME time_now; - my_time_t now; event_timed *et; cnt++; @@ -246,7 +330,7 @@ event_executor_main(void *arg) thd->proc_info = "Sleeping"; if (!event_executor_running_global_var) { - sql_print_information("Scheduler asked to stop."); + sql_print_information("SCHEDULER: Asked to stop."); break; } @@ -255,62 +339,30 @@ event_executor_main(void *arg) my_sleep(1000000);// sleep 1s continue; } - - { - int t2sleep; - /* - now let's see how much time to sleep, we know there is at least 1 - element in the queue. - */ - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - continue; - } - et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); - if (et->status == MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); - if (et->dropped) - et->drop(thd); - delete et; - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - sql_print_information("Event found disabled, dropping."); - continue; - } - - DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); - time((time_t *)&now); - my_tz_UTC->gmt_sec_to_TIME(&time_now, now); - t2sleep= evex_time_diff(&et->execute_at, &time_now); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - - DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); - if (t2sleep > 0) - { - /* - We sleep t2sleep seconds but we check every second whether this thread - has been killed, or there is a new candidate - */ - while (t2sleep-- && !thd->killed && event_executor_running_global_var && - evex_queue_num_elements(EVEX_EQ_NAME) && - (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) - { - DBUG_PRINT("evex main thread",("will sleep a bit more")); - my_sleep(1000000); - } - } - if (!event_executor_running_global_var) - { - sql_print_information("Scheduler asked to stop."); - break; - } + +restart_ticking: + switch (executor_wait_till_next_event_exec(thd)) { + case WAIT_STATUS_READY: // time to execute the event on top + DBUG_PRINT("evex main thread",("time to execute an event")); + break; + case WAIT_STATUS_EMPTY_QUEUE: // no more events + DBUG_PRINT("evex main thread",("no more events")); + continue; + break; + case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue + DBUG_PRINT("evex main thread",("restart ticking")); + goto restart_ticking; + case WAIT_STATUS_STOP_EXECUTOR: + sql_print_information("SCHEDULER: Asked to stop."); + goto finish; + break; + default: + DBUG_ASSERT(0); } - VOID(pthread_mutex_lock(&LOCK_event_arrays)); + thd->end_time(); + my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); if (!evex_queue_num_elements(EVEX_EQ_NAME)) { @@ -332,14 +384,13 @@ event_executor_main(void *arg) DBUG_PRINT("evex main thread",("it's right time")); if (et->status == MYSQL_EVENT_ENABLED) { - pthread_t th; - + int fork_ret_code; DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str, TIME_to_ulonglong_datetime(&et->execute_at))); et->mark_last_executed(thd); if (et->compute_next_execution_time()) { - sql_print_error("Error while computing time of %s.%s . " + sql_print_error("SCHEDULER: Error while computing time of %s.%s . " "Disabling after execution.", et->dbname.str, et->name.str); et->status= MYSQL_EVENT_DISABLED; @@ -348,13 +399,23 @@ event_executor_main(void *arg) TIME_to_ulonglong_datetime(&et->execute_at))); et->update_fields(thd); - ++iter_num; - DBUG_PRINT("info", (" Spawning a thread %d", iter_num)); #ifndef DBUG_FAULTY_THR - if (pthread_create(&th, NULL, event_executor_worker, (void*)et)) - { - sql_print_error("Problem while trying to create a thread"); - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); + thread_safe_increment(workers_count, &LOCK_workers_count); + switch ((fork_ret_code= et->spawn_now(event_executor_worker))) { + case EVENT_EXEC_CANT_FORK: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_error("SCHEDULER: Problem while trying to create a thread"); + UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish); + case EVENT_EXEC_ALREADY_EXEC: + thread_safe_decrement(workers_count, &LOCK_workers_count); + sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", + et->dbname.str, et->name.str); + break; + default: + DBUG_ASSERT(!fork_ret_code); + if (fork_ret_code) + thread_safe_decrement(workers_count, &LOCK_workers_count); + break; } #else event_executor_worker((void *) et); @@ -364,22 +425,21 @@ event_executor_main(void *arg) et->flags |= EVENT_EXEC_NO_MORE; if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) - evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1 else evex_queue_first_updated(&EVEX_EQ_NAME); } DBUG_PRINT("evex main thread",("unlocking")); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); }// while +finish: -err: // First manifest that this thread does not work and then destroy VOID(pthread_mutex_lock(&LOCK_evex_running)); evex_is_running= false; evex_main_thread_id= 0; VOID(pthread_mutex_unlock(&LOCK_evex_running)); - sql_print_information("Event scheduler stopping. Waiting for worker threads to finish."); /* TODO: A better will be with a conditional variable @@ -388,21 +448,33 @@ err: Read workers_count without lock, no need for locking. In the worst case we have to wait 1sec more. */ - while (workers_count) - my_sleep(1000000);// 1s + sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish."); + while (1) + { + VOID(pthread_mutex_lock(&LOCK_workers_count)); + if (!workers_count) + { + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + break; + } + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + my_sleep(1000000);// 1s + } /* - LEX_STRINGs reside in the memory root and will be destroyed with it. - Hence no need of delete but only freeing of SP + First we free all objects ... + Lock because a DROP DATABASE could be running in parallel and it locks on these */ - // First we free all objects ... + sql_print_information("SCHEDULER: Emptying the queue."); + VOID(pthread_mutex_lock(&LOCK_event_arrays)); for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) { event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); et->free_sp(); delete et; } - // ... then we can thras the whole queue at once + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + // ... then we can thrash the whole queue at once evex_queue_destroy(&EVEX_EQ_NAME); thd->proc_info = "Clearing"; @@ -426,7 +498,7 @@ err_no_thd: VOID(pthread_mutex_unlock(&LOCK_evex_running)); free_root(&evex_mem_root, MYF(0)); - sql_print_information("Event scheduler stopped."); + sql_print_information("SCHEDULER: Stopped."); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -444,9 +516,6 @@ event_executor_worker(void *event_void) MEM_ROOT worker_mem_root; DBUG_ENTER("event_executor_worker"); - VOID(pthread_mutex_lock(&LOCK_workers_count)); - ++workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); @@ -455,7 +524,7 @@ event_executor_worker(void *event_void) if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! { - sql_print_error("Cannot create a THD structure in a scheduler worker thread"); + sql_print_error("SCHEDULER: Cannot create a THD structure in an worker."); goto err_no_thd; } thd->thread_stack = (char*)&thd; // remember where our stack is @@ -495,30 +564,23 @@ event_executor_worker(void *event_void) int ret; DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]", event->dbname.str, event->name.str,(int) event->expression)); - sql_print_information(" EVEX EXECUTING event %s.%s [EXPR:%d]", + sql_print_information("SCHEDULER: Executing event %s.%s [EXPR:%d]", event->dbname.str, event->name.str,(int) event->expression); ret= event->execute(thd, &worker_mem_root); - sql_print_information(" EVEX EXECUTED event %s.%s [EXPR:%d]. RetCode=%d", + sql_print_information("SCHEDULER: Executed event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str, (int) event->expression, ret); if (ret == EVEX_COMPILE_ERROR) - sql_print_information(" EVEX COMPILE ERROR for event %s.%s", + sql_print_information("SCHEDULER:COMPILE ERROR for event %s.%s", event->dbname.str, event->name.str); DBUG_PRINT("info", (" EVEX EXECUTED event %s.%s [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str, (int) event->expression, ret)); } - if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("event_executor_worker", - ("%s exec no more. to drop=%d",event->name.str, event->dropped)); - if (event->dropped) - event->drop(thd); - delete event; - } + event->spawn_thread_finish(thd); thd->db= 0; @@ -548,10 +610,7 @@ err: err_no_thd: free_root(&worker_mem_root, MYF(0)); - - VOID(pthread_mutex_lock(&LOCK_workers_count)); - --workers_count; - VOID(pthread_mutex_unlock(&LOCK_workers_count)); + thread_safe_decrement(workers_count, &LOCK_workers_count); #ifndef DBUG_FAULTY_THR my_thread_end(); @@ -574,7 +633,7 @@ evex_load_events_from_db(THD *thd) if ((ret= evex_open_event_table(thd, TL_READ, &table))) { - sql_print_error("Table mysql.event is damaged."); + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); DBUG_RETURN(SP_OPEN_TABLE_FAILED); } @@ -594,7 +653,7 @@ evex_load_events_from_db(THD *thd) if ((ret= et->load_from_row(&evex_mem_root, table))) { - sql_print_error("Error while loading from mysql.event. " + sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); goto end; } @@ -606,11 +665,11 @@ evex_load_events_from_db(THD *thd) } DBUG_PRINT("evex_load_events_from_db", - ("Event %s loaded from row. Time to compile", et->name.str)); + ("Event %s loaded from row. Time to compile", et->name.str)); if ((ret= et->compile(thd, &evex_mem_root))) { - sql_print_error("Error while compiling %s.%s. Aborting load.", + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", et->dbname.str, et->name.str); goto end; } @@ -618,8 +677,8 @@ evex_load_events_from_db(THD *thd) // let's find when to be executed if (et->compute_next_execution_time()) { - sql_print_error("Error while computing execution time of %s.%s. Skipping", - et->dbname.str, et->name.str); + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); continue; } @@ -640,7 +699,7 @@ end: thd->version--; // Force close to free memory close_thread_tables(thd); - sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s"); + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); diff --git a/sql/event_priv.h b/sql/event_priv.h index 7d1cdbcd264..7bcb26aaed0 100644 --- a/sql/event_priv.h +++ b/sql/event_priv.h @@ -19,6 +19,10 @@ #include "mysql_priv.h" +#define EVENT_EXEC_STARTED 0 +#define EVENT_EXEC_ALREADY_EXEC 1 +#define EVENT_EXEC_CANT_FORK 2 + #define EVEX_USE_QUEUE #define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \ @@ -32,10 +36,10 @@ int my_time_compare(TIME *a, TIME *b); int -evex_db_find_event_aux(THD *thd, const LEX_STRING dbname, - const LEX_STRING rname, - const LEX_STRING definer, - TABLE *table); +evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, + const LEX_STRING ev_name, + const LEX_STRING user_name, + TABLE *table); int event_timed_compare_q(void *vptr, byte* a, byte *b); diff --git a/sql/event_timed.cc b/sql/event_timed.cc index 28d21089b74..19edaf345dd 100644 --- a/sql/event_timed.cc +++ b/sql/event_timed.cc @@ -879,11 +879,12 @@ event_timed::drop(THD *thd) TABLE *table; int ret= 0; DBUG_ENTER("event_timed::drop"); + DBUG_PRINT("info",("%s.%s", dbname.str, name.str)); if (evex_open_event_table(thd, TL_WRITE, &table)) DBUG_RETURN(-1); - if (evex_db_find_event_aux(thd, dbname, name, definer, table)) + if (evex_db_find_event_by_name(thd, dbname, name, definer, table)) DBUG_RETURN(-2); if ((ret= table->file->ha_delete_row(table->record[0]))) @@ -919,7 +920,7 @@ event_timed::update_fields(THD *thd) } - if ((ret= evex_db_find_event_aux(thd, dbname, name, definer, table))) + if ((ret= evex_db_find_event_by_name(thd, dbname, name, definer, table))) goto done; store_record(table,record[1]); @@ -1059,6 +1060,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) MEM_ROOT *tmp_mem_root= 0; LEX *old_lex= thd->lex, lex; char *old_db; + int old_db_length; event_timed *ett; sp_name *spn; char *old_query; @@ -1088,7 +1090,9 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root) old_query_len= thd->query_length; old_query= thd->query; old_db= thd->db; + old_db_length= thd->db_length; thd->db= dbname.str; + thd->db_length= dbname.length; thd->query= get_show_create_event(thd, &thd->query_length); DBUG_PRINT("event_timed::compile", ("query:%s",thd->query)); @@ -1148,3 +1152,135 @@ done: DBUG_RETURN(ret); } + +/* + Checks whether this thread can lock the object for modification -> + preventing being spawned for execution, and locks if possible. + use ::can_spawn_now() only for basic checking because a race + condition may occur between the check and eventual modification (deletion) + of the object. + + Returns + true - locked + false - cannot lock +*/ + +my_bool +event_timed::can_spawn_now_n_lock(THD *thd) +{ + my_bool ret= FALSE; + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + in_spawned_thread= TRUE; + ret= TRUE; + locked_by_thread_id= thd->thread_id; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; +} + + +extern pthread_attr_t connection_attrib; + +/* + Checks whether is possible and forks a thread. Passes self as argument. + + Returns + EVENT_EXEC_STARTED - OK + EVENT_EXEC_ALREADY_EXEC - Thread not forked, already working + EVENT_EXEC_CANT_FORK - Unable to spawn thread (error) +*/ + +int +event_timed::spawn_now(void * (*thread_func)(void*)) +{ + int ret= EVENT_EXEC_STARTED; + static uint exec_num= 0; + DBUG_ENTER("event_timed::spawn_now"); + DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str)); + + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + pthread_t th; + in_spawned_thread= true; + if (pthread_create(&th, &connection_attrib, thread_func, (void*)this)) + { + DBUG_PRINT("info", ("problem while spawning thread")); + ret= EVENT_EXEC_CANT_FORK; + in_spawned_thread= false; + } +#ifndef DBUG_OFF + else + { + sql_print_information("SCHEDULER: Started thread %d", ++exec_num); + DBUG_PRINT("info", ("thread spawned")); + } +#endif + } + else + { + DBUG_PRINT("info", ("already in spawned thread. skipping")); + ret= EVENT_EXEC_ALREADY_EXEC; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + + DBUG_RETURN(ret); +} + + +void +event_timed::spawn_thread_finish(THD *thd) +{ + DBUG_ENTER("event_timed::spawn_thread_finish"); + VOID(pthread_mutex_lock(&this->LOCK_running)); + in_spawned_thread= false; + if ((flags & EVENT_EXEC_NO_MORE) || status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("info", ("%s exec no more. to drop=%d", name.str, dropped)); + if (dropped) + drop(thd); + VOID(pthread_mutex_unlock(&this->LOCK_running)); + delete this; + DBUG_VOID_RETURN; + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + DBUG_VOID_RETURN; +} + + +/* + Unlocks the object after it has been locked with ::can_spawn_now_n_lock() + + Returns + 0 - ok + 1 - not locked by this thread + +*/ + + +int +event_timed::spawn_unlock(THD *thd) +{ + int ret= 0; + VOID(pthread_mutex_lock(&this->LOCK_running)); + if (!in_spawned_thread) + { + if (locked_by_thread_id == thd->thread_id) + { + in_spawned_thread= FALSE; + locked_by_thread_id= 0; + } + else + { + sql_print_error("A thread tries to unlock when he hasn't locked. " + "thread_id=%ld locked by %ld", + thd->thread_id, locked_by_thread_id); + DBUG_ASSERT(0); + ret= 1; + } + } + VOID(pthread_mutex_unlock(&this->LOCK_running)); + return ret; +} diff --git a/sql/sql_db.cc b/sql/sql_db.cc index 5ffa4fd76ed..2b026783ca5 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -20,6 +20,7 @@ #include "mysql_priv.h" #include <mysys_err.h> #include "sp.h" +#include "event.h" #include <my_dir.h> #include <m_ctype.h> #ifdef __WIN__ @@ -748,6 +749,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) exit: (void)sp_drop_db_routines(thd, db); /* QQ Ignore errors for now */ + (void)evex_drop_db_events(thd, db); /* QQ Ignore errors for now */ start_waiting_global_read_lock(thd); /* If this database was the client's selected database, we silently change the |