diff options
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r-- | sql/event_queue.cc | 234 |
1 files changed, 49 insertions, 185 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 39b237987e9..ba12b732726 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -73,35 +73,6 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b) } -pthread_handler_t -event_queue_loader_thread(void *arg) -{ - /* needs to be first for thread_stack */ - THD *thd= (THD *)((struct event_queue_param *) arg)->thd; - struct event_queue_param *param= (struct event_queue_param *) arg; - thd->thread_stack= (char *) &thd; - - if (post_init_event_thread(thd)) - goto end; - - DBUG_ENTER("event_queue_loader_thread"); - - - pthread_mutex_lock(¶m->LOCK_loaded); - param->queue->check_system_tables(thd); - param->queue->load_events_from_db(thd); - - param->loading_finished= TRUE; - pthread_cond_signal(¶m->COND_loaded); - - pthread_mutex_unlock(¶m->LOCK_loaded); - -end: - deinit_event_thread(thd); - DBUG_RETURN(0); // Against gcc warnings -} - - /* Constructor of class Event_queue. @@ -110,14 +81,12 @@ end: */ Event_queue::Event_queue() + :mutex_last_unlocked_at_line(0), mutex_last_locked_at_line(0), + mutex_last_attempted_lock_at_line(0), + mutex_queue_data_locked(FALSE), mutex_queue_data_attempting_lock(FALSE) { - mutex_last_unlocked_at_line= mutex_last_locked_at_line= - mutex_last_attempted_lock_at_line= 0; - mutex_last_unlocked_in_func= mutex_last_locked_in_func= mutex_last_attempted_lock_in_func= ""; - - mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE; } @@ -150,7 +119,12 @@ Event_queue::deinit_mutexes() /* - Inits the queue + This is a queue's constructor. Until this method is called, the + queue is unusable. We don't use a C++ constructor instead in + order to be able to check the return value. The queue is + initialized once at server startup. Initialization can fail in + case of a failure reading events from the database or out of + memory. SYNOPSIS Event_queue::init() @@ -161,9 +135,9 @@ Event_queue::deinit_mutexes() */ bool -Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) +Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, + Event_scheduler *sched) { - THD *new_thd; pthread_t th; bool res; struct event_queue_param *event_queue_param_value= NULL; @@ -186,43 +160,16 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) if (sizeof(my_time_t) != sizeof(time_t)) { sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." - "The scheduler may not work correctly. Stopping."); + "The scheduler may not work correctly. Stopping"); DBUG_ASSERT(0); goto err; } - if (!(new_thd= new THD)) - goto err; - - pre_init_event_thread(new_thd); - new_thd->security_ctx->set_user((char*)"event_scheduler_loader"); - - event_queue_param_value= (struct event_queue_param *) - my_malloc(sizeof(struct event_queue_param), MYF(0)); - - event_queue_param_value->thd= new_thd; - event_queue_param_value->queue= this; - event_queue_param_value->loading_finished= FALSE; - pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST); - pthread_cond_init(&event_queue_param_value->COND_loaded, NULL); - - pthread_mutex_lock(&event_queue_param_value->LOCK_loaded); - DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); - if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread, - (void*)event_queue_param_value))) - { - do { - pthread_cond_wait(&event_queue_param_value->COND_loaded, - &event_queue_param_value->LOCK_loaded); - } while (event_queue_param_value->loading_finished == FALSE); - } - - pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded); - pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded); - pthread_cond_destroy(&event_queue_param_value->COND_loaded); - my_free((char *)event_queue_param_value, MYF(0)); - + res= load_events_from_db(thd); UNLOCK_QUEUE_DATA(); + if (res) + deinit_queue(); + DBUG_RETURN(res); err: @@ -316,16 +263,15 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name) { int res; - Event_queue_element *old_element= NULL, - *new_element; + Event_queue_element *new_element; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str)); new_element= new Event_queue_element(); - res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname, - new_name? *new_name:name, new_element); + res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname, + new_name ? *new_name:name, new_element); if (res) { delete new_element; @@ -345,16 +291,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, new_element->compute_next_execution_time(); LOCK_QUEUE_DATA(); - if (!(old_element= find_n_remove_event(dbname, name))) - { - DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED", - dbname.str, name.str)); - } + find_n_remove_event(dbname, name); + /* If not disabled event */ if (new_element) { - DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx", - new_element, old_element)); + DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); } dbug_dump_queue(thd->query_start()); @@ -363,8 +305,6 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, if (new_element) notify_observers(); - if (old_element) - delete old_element; end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); @@ -385,19 +325,13 @@ void Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) { int res; - Event_queue_element *element; DBUG_ENTER("Event_queue::drop_event"); DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name)); LOCK_QUEUE_DATA(); - element= find_n_remove_event(dbname, name); + find_n_remove_event(dbname, name); dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); - - if (element) - delete element; - else - DBUG_PRINT("info", ("No such event found, probably DISABLED")); /* We don't signal here because the scheduler will catch the change @@ -429,10 +363,10 @@ void Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, bool (*comparator)(LEX_STRING, Event_basic *)) { + uint i= 0; DBUG_ENTER("Event_queue::drop_matching_events"); DBUG_PRINT("enter", ("pattern=%s", pattern.str)); - uint i= 0; while (i < queue.elements) { Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i); @@ -440,10 +374,10 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, if (comparator(pattern, et)) { /* - The queue is ordered. If we remove an element, then all elements after - it will shift one position to the left, if we imagine it as an array - from left to the right. In this case we should not increment the - counter and the (i < queue.elements) condition is ok. + The queue is ordered. If we remove an element, then all elements + after it will shift one position to the left, if we imagine it as + an array from left to the right. In this case we should not + increment the counter and the (i < queue.elements) condition is ok. */ queue_remove(&queue, i); delete et; @@ -453,12 +387,12 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, } /* We don't call notify_observers() . If we remove the top event: - 1. The queue is empty. The scheduler will wake up at some time and realize - that the queue is empty. If create_event() comes inbetween it will - signal the scheduler - 2. The queue is not empty, but the next event after the previous top, won't - be executed any time sooner than the element we removed. Hence, we may - not notify the scheduler and it will realize the change when it + 1. The queue is empty. The scheduler will wake up at some time and + realize that the queue is empty. If create_event() comes inbetween + it will signal the scheduler + 2. The queue is not empty, but the next event after the previous top, + won't be executed any time sooner than the element we removed. Hence, + we may not notify the scheduler and it will realize the change when it wakes up from timedwait. */ @@ -472,11 +406,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, SYNOPSIS Event_queue::drop_schema_events() - thd THD - db The schema name - - RETURN VALUE - >=0 Number of dropped events + thd HD + schema The schema name */ void @@ -516,16 +447,12 @@ Event_queue::notify_observers() db The schema of the event to find name The event to find - RETURN VALUE - NULL Not found - otherwise Address - NOTE The caller should do the locking also the caller is responsible for actual signalling in case an event is removed from the queue. */ -Event_queue_element * +void Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) { uint i; @@ -539,11 +466,12 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) if (event_basic_identifier_equal(db, name, et)) { queue_remove(&queue, i); - DBUG_RETURN(et); + delete et; + break; } } - DBUG_RETURN(NULL); + DBUG_VOID_RETURN; } @@ -583,7 +511,7 @@ Event_queue::load_events_from_db(THD *thd) if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) { - sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open"); DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); } @@ -625,14 +553,17 @@ Event_queue::load_events_from_db(THD *thd) temp_job_data.load_from_row(table); - /* We load only on scheduler root just to check whether the body compiles */ + /* + We load only on scheduler root just to check whether the body + compiles. + */ switch (ret= temp_job_data.compile(thd, thd->mem_root)) { case EVEX_MICROSECOND_UNSUP: sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " "supported but found in mysql.event"); break; case EVEX_COMPILE_ERROR: - sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load", et->dbname.str, et->name.str); break; default: @@ -663,12 +594,10 @@ end: else { ret= 0; - sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); + sql_print_information("SCHEDULER: Loaded %d event%s", count, + (count == 1)?"":"s"); } - /* Force close to free memory */ - thd->version--; - close_thread_tables(thd); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); @@ -677,71 +606,6 @@ end: /* - Opens mysql.db and mysql.user and checks whether: - 1. mysql.db has column Event_priv at column 20 (0 based); - 2. mysql.user has column Event_priv at column 29 (0 based); - - SYNOPSIS - Event_queue::check_system_tables() - thd Thread - - RETURN VALUE - FALSE OK - TRUE Error -*/ - -void -Event_queue::check_system_tables(THD *thd) -{ - TABLE_LIST tables; - bool not_used; - Open_tables_state backup; - bool ret; - - DBUG_ENTER("Event_queue::check_system_tables"); - DBUG_PRINT("enter", ("thd=0x%lx", thd)); - - thd->reset_n_backup_open_tables_state(&backup); - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "db"; - tables.lock_type= TL_READ; - - if ((ret= simple_open_n_lock_tables(thd, &tables))) - { - sql_print_error("Cannot open mysql.db"); - goto end; - } - ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, - mysql_db_table_fields, &mysql_db_table_last_check, - ER_CANNOT_LOAD_FROM_TABLE); - close_thread_tables(thd); - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "user"; - tables.lock_type= TL_READ; - - if (simple_open_n_lock_tables(thd, &tables)) - sql_print_error("Cannot open mysql.db"); - else - { - if (tables.table->s->fields < 29 || - strncmp(tables.table->field[29]->field_name, - STRING_WITH_LEN("Event_priv"))) - sql_print_error("mysql.user has no `Event_priv` column at position 29"); - close_thread_tables(thd); - } - -end: - thd->restore_backup_open_tables_state(&backup); - - DBUG_VOID_RETURN; -} - - -/* Recalculates activation times in the queue. There is one reason for that. Because the values (execute_at) by which the queue is ordered are changed by calls to compute_next_execution_time() on a request from the @@ -782,7 +646,7 @@ Event_queue::recalculate_activation_times(THD *thd) Event_queue::empty_queue() NOTE - Should be called with LOCK_event_queue locked + Should be called with LOCK_event_queue locked */ void |