diff options
author | unknown <andrey@lmy004.> | 2006-07-13 10:59:58 +0200 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-07-13 10:59:58 +0200 |
commit | 31caa8c433ace692ea5f31c2c2ae0d872533e8de (patch) | |
tree | fb7407480fd7ae1b764e1bf59ec33047559bf6dd /sql/event_queue.cc | |
parent | 628be8a71611bc86f7f0cf809b27d63bdd9b12c8 (diff) | |
download | mariadb-git-31caa8c433ace692ea5f31c2c2ae0d872533e8de.tar.gz |
WL #3337 (Events new architecture)
Final stroke, events should be loaded from disk on server startup.
Also check the validity of their bodies if possible during loading.
sql/event_data_objects.cc:
Remove Event_job_data::free_sp(), move the code to the destructor
Change the way we change the security context
Steal some code from sql_parse.cc
sql/event_data_objects.h:
Remove free_sp()
Make compile() public, to be used when booting for verifying the integrity of mysql.event
sql/event_queue.cc:
Make the queue load events from disk on server boot.
Compile and thus check for integrity the events.
sql/event_queue.h:
shift methods around. add queue_loaded boolean.
sql/event_scheduler.cc:
Rename init_event_thread() to pre_init_event_thread()
and make it more generic.
Add post_init_event_thread()
Export these two as well as deinit_event_thread().
Now it is quite easy to write code to spawn a new event thread
whenever needed.
sql/event_scheduler.h:
export pre_init_event_thread(), post_init_event_thread() and deinit_event_thread()
to simplify writing of thread functions.
sql/events.cc:
Events::init() returns only one error code, then make it bool
sql/events.h:
Events::init() returns only one error code, then make it bool
sql/mysqld.cc:
Check the return code of Events::init()
sql/sp_head.cc:
Add trace info
sql/sql_class.cc:
Reorganize thd::change_security_context() to load main_security_ctx
sql/sql_class.h:
Reorganize thd::change_security_context() to load main_security_ctx
sql/sql_lex.cc:
Initialize lex->spname
sql/sql_yacc.yy:
Add a comment
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 144 |
1 files changed, 110 insertions, 34 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 12eceee8cfb..69114c53118 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -35,6 +35,14 @@ #define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__) #define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__) +struct event_queue_param +{ + THD *thd; + Event_queue *queue; + pthread_mutex_t LOCK_loaded; + pthread_cond_t COND_loaded; +}; + /* Compares the execute_at members of two Event_queue_element instances. @@ -64,6 +72,31 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b) } +pthread_handler_t +event_queue_loader_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd= (THD *)((struct event_queue_param *) arg)->thd; + struct event_queue_param *param= (struct event_queue_param *) arg; + thd->thread_stack= (char *) &thd; + + if (post_init_event_thread(thd)) + goto end; + + DBUG_ENTER("event_queue_loader_thread"); + + pthread_mutex_lock(¶m->LOCK_loaded); + param->queue->load_events_from_db(thd); + pthread_cond_signal(¶m->COND_loaded); + pthread_mutex_unlock(¶m->LOCK_loaded); + +end: + deinit_event_thread(thd); + + DBUG_RETURN(0); // Against gcc warnings +} + + /* Constructor of class Event_queue. @@ -80,6 +113,8 @@ Event_queue::Event_queue() mutex_last_attempted_lock_in_func= ""; mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE; + + queue_loaded= FALSE; } @@ -125,8 +160,11 @@ Event_queue::deinit_mutexes() bool Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) { - int i= 0; - bool ret= FALSE; + THD *new_thd; + pthread_t th; + bool res; + struct event_queue_param *event_queue_param_value= NULL; + DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this=0x%lx", this)); @@ -139,8 +177,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) NULL, EVENT_QUEUE_EXTENT)) { sql_print_error("SCHEDULER: Can't initialize the execution queue"); - ret= TRUE; - goto end; + goto err; } if (sizeof(my_time_t) != sizeof(time_t)) @@ -148,13 +185,43 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." "The scheduler may not work correctly. Stopping."); DBUG_ASSERT(0); - ret= TRUE; - goto end; + goto err; } -end: + if (!(new_thd= new THD)) + goto err; + + pre_init_event_thread(new_thd); + + event_queue_param_value= (struct event_queue_param *) + my_malloc(sizeof(struct event_queue_param), MYF(0)); + event_queue_param_value->thd= new_thd; + event_queue_param_value->queue= this; + pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST); + pthread_cond_init(&event_queue_param_value->COND_loaded, NULL); + + pthread_mutex_lock(&event_queue_param_value->LOCK_loaded); + DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); + if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread, + (void*)event_queue_param_value))) + { + do { + pthread_cond_wait(&event_queue_param_value->COND_loaded, + &event_queue_param_value->LOCK_loaded); + } while (queue_loaded == FALSE); + } + + pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded); + pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded); + pthread_cond_destroy(&event_queue_param_value->COND_loaded); + my_free((char *)event_queue_param_value, MYF(0)); + UNLOCK_QUEUE_DATA(); - DBUG_RETURN(ret); + DBUG_RETURN(res); + +err: + UNLOCK_QUEUE_DATA(); + DBUG_RETURN(TRUE); } @@ -498,7 +565,7 @@ Event_queue::load_events_from_db(THD *thd) READ_RECORD read_record_info; int ret= -1; uint count= 0; - bool clean_the_queue= FALSE; + bool clean_the_queue= TRUE; /* Compile the events on this root but only for syntax check, then discard */ MEM_ROOT boot_root; @@ -518,14 +585,12 @@ Event_queue::load_events_from_db(THD *thd) if (!(et= new Event_queue_element)) { DBUG_PRINT("info", ("Out of memory")); - clean_the_queue= TRUE; break; } DBUG_PRINT("info", ("Loading event from row.")); if ((ret= et->load_from_row(table))) { - clean_the_queue= TRUE; sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); break; @@ -536,27 +601,6 @@ Event_queue::load_events_from_db(THD *thd) delete et; continue; } -#if 0 - init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); - - /* We load only on scheduler root just to check whether the body compiles */ - switch (ret= et->compile(thd, &boot_root)) { - case EVEX_MICROSECOND_UNSUP: - et->free_sp(); - sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " - "supported but found in mysql.event"); - goto end; - case EVEX_COMPILE_ERROR: - sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", - et->dbname.str, et->name.str); - goto end; - default: - /* Free it, it will be compiled again on the worker thread */ - et->free_sp(); - break; - } - free_root(&boot_root, MYF(0)); /* let's find when to be executed */ if (et->compute_next_execution_time()) @@ -565,11 +609,40 @@ Event_queue::load_events_from_db(THD *thd) " Skipping", et->dbname.str, et->name.str); continue; } -#endif + + { + Event_job_data temp_job_data; + DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); + + temp_job_data.load_from_row(table); + + /* We load only on scheduler root just to check whether the body compiles */ + switch (ret= temp_job_data.compile(thd, thd->mem_root)) { + case EVEX_MICROSECOND_UNSUP: + sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " + "supported but found in mysql.event"); + break; + case EVEX_COMPILE_ERROR: + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", + et->dbname.str, et->name.str); + break; + default: + break; + } + thd->end_statement(); + thd->cleanup_after_query(); + } + if (ret) + { + delete et; + goto end; + } + DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list.")); queue_insert_safe(&queue, (byte *) et); count++; } + clean_the_queue= FALSE; end: end_read_record(&read_record_info); @@ -585,10 +658,12 @@ end: } /* Force close to free memory */ - thd->version--; + thd->version--; close_thread_tables(thd); + queue_loaded= TRUE; + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); } @@ -713,6 +788,7 @@ Event_queue::empty_queue() uint i; DBUG_ENTER("Event_queue::empty_queue"); DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); + sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements); /* empty the queue */ for (i= 0; i < queue.elements; ++i) { |