diff options
author | unknown <andrey@fifo.vaih.whnetz> | 2006-08-31 17:18:39 +0200 |
---|---|---|
committer | unknown <andrey@fifo.vaih.whnetz> | 2006-08-31 17:18:39 +0200 |
commit | 0410cc3a0c87582fe9d749f5e007dcdb1b9848e1 (patch) | |
tree | 7578f63a766b0ce331cbafad99ef748c185822f1 /sql/event_scheduler.cc | |
parent | 3e4b79d9cafea4d1e86ddba6c8f5658f4c4ef23c (diff) | |
download | mariadb-git-0410cc3a0c87582fe9d749f5e007dcdb1b9848e1.tar.gz |
WL#3337 (Event scheduler new architecture)
This patch makes the relationship between Event_scheduler and Event_queue
unidirectional from the former to the latter.
The change is that the conditional on which the scheduler sleeped has been
moved to the Event_queue and the latter does not call anymore
Event_scheduler::queue_changed(), which in turn has be removed.
sql/event_queue.cc:
Remove dependency of Event_queue on Event_scheduler but not vice versa.
Event_scheduler polls whether there is time to execute an event.
Removed notify_observers() as the way of calling has changed.
Added Event_queue::cond_wait() similar to Event_scheduler::cond_wait().
sql/event_queue.h:
init_queue() does not need anymore Event_scheduler object because
the relationship is now one-way. Event_scheduler knows about Event_queue
but not vice versa.
get_top_execution_if_time() does by itself the waiting instead of
returning abstime. This simplifies the code in Event_scheduler::run()
get_top_execution_if_time() returns only if job_data != NULL or if
the scheduler thread was killed.
notify_observers() is no more used and therefore removed.
Added Event_queue::cond_wait() because now there is waiting on a
conditional variable in Event_queue too (like in Event_scheduler for
::stop()).
sql/event_scheduler.cc:
Change the relationship between Event_scheduler & Event_queue.
Event_queue does not know anymore about Event_scheduler. When
the scheduler calls get_top_element_if_time() it may fall asleep
on a conditional of Event_queue, if either the queue is empty or
it's still not time for activation. When the method returns it
will return a non-null address, namely an object to be executed.
If the return value is NULL, the thread was killed by a call to
Event_scheduler::stop() (we assert this).
sql/event_scheduler.h:
Remove queue_changed() as it is obsoleted by making the relationship
between Event_scheduler and Event_queue one-way, from the former to the
latter. Event_queue now does not know about Event_scheduler.
get_state() is changed to is_running(). The state enum should be private,
as it is not needed to be seen from outside anymore.
sql/events.cc:
Event_queue does not need anymore a pointer to Event_scheduler.
Diffstat (limited to 'sql/event_scheduler.cc')
-rw-r--r-- | sql/event_scheduler.cc | 188 |
1 files changed, 79 insertions, 109 deletions
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index d4a5df85cda..6423f3676fc 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q) LOCK_DATA(); queue= q; started_events= 0; - thread_id= 0; + scheduler_thd= NULL; state= INITIALIZED; UNLOCK_DATA(); } @@ -397,22 +397,18 @@ Event_scheduler::start() scheduler_param_value->thd= new_thd; scheduler_param_value->scheduler= this; + scheduler_thd= new_thd; + DBUG_PRINT("info", ("Setting state go RUNNING")); + state= RUNNING; DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); if (pthread_create(&th, &connection_attrib, event_scheduler_thread, (void*)scheduler_param_value)) { DBUG_PRINT("error", ("cannot create a new thread")); state= INITIALIZED; + scheduler_thd= NULL; ret= TRUE; - } - DBUG_PRINT("info", ("Setting state go RUNNING")); - state= RUNNING; -end: - UNLOCK_DATA(); - if (ret && new_thd) - { - DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); new_thd->proc_info= "Clearing"; DBUG_ASSERT(new_thd->net.buff != 0); net_end(&new_thd->net); @@ -422,6 +418,9 @@ end: delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } +end: + UNLOCK_DATA(); + DBUG_RETURN(ret); } @@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd) Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); - LOCK_DATA(); - - thread_id= thd->thread_id; sql_print_information("SCHEDULER: Manager thread started with id %lu", - thread_id); + thd->thread_id); /* Recalculate the values in the queue because there could have been stops in executions of the scheduler and some times could have passed by. */ queue->recalculate_activation_times(thd); - while (state == RUNNING) + + while (is_running()) { - thd->end_time(); /* Gets a minimized version */ - if (queue->get_top_for_execution_if_time(thd, thd->query_start(), - &job_data, &abstime)) + if (queue->get_top_for_execution_if_time(thd, &job_data)) { - sql_print_information("SCHEDULER: Serious error during getting next" - " event to execute. Stopping"); + sql_print_information("SCHEDULER: Serious error during getting next " + "event to execute. Stopping"); break; } - DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " - "abs_time.tv_sec=%d", - job_data, thd->query_start(), abstime.tv_sec)); - if (!job_data && !abstime.tv_sec) - { - DBUG_PRINT("info", ("The queue is empty. Going to sleep")); - COND_STATE_WAIT(thd, NULL, "Waiting on empty queue"); - DBUG_PRINT("info", ("Woke up. Got COND_state")); - } - else if (abstime.tv_sec) + DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data)); + if (job_data) { - DBUG_PRINT("info", ("Have to sleep some time %u s. till %u", - abstime.tv_sec - thd->query_start(), abstime.tv_sec)); - - COND_STATE_WAIT(thd, &abstime, "Waiting for next activation"); - /* - If we get signal we should recalculate the whether it's the right time - because there could be : - 1. Spurious wake-up - 2. The top of the queue was changed (new one becase of create/update) - */ - DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); + if ((res= execute_top(thd, job_data))) + break; } else { - UNLOCK_DATA(); - res= execute_top(thd, job_data); - LOCK_DATA(); - if (res) - break; - ++started_events; + DBUG_ASSERT(thd->killed); + DBUG_PRINT("info", ("job_data is NULL, the thread was killed")); } DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); } + LOCK_DATA(); DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); - pthread_cond_signal(&COND_state); -error: state= INITIALIZED; + pthread_cond_signal(&COND_state); UNLOCK_DATA(); sql_print_information("SCHEDULER: Stopped"); @@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) job_data))) goto error; + ++started_events; + DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); DBUG_RETURN(FALSE); @@ -568,6 +544,27 @@ error: /* + Checkes whether the state of the scheduler is RUNNING + + SYNOPSIS + Event_scheduler::is_running() + + RETURN VALUE + TRUE RUNNING + FALSE Not RUNNING +*/ + +inline bool +Event_scheduler::is_running() +{ + LOCK_DATA(); + bool ret= (state == RUNNING); + UNLOCK_DATA(); + return ret; +} + + +/* Stops the scheduler (again). Waits for acknowledgement from the scheduler that it has stopped - synchronous stopping. @@ -591,26 +588,48 @@ Event_scheduler::stop() if (state != RUNNING) goto end; - state= STOPPING; - - DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); - sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); - - pthread_cond_signal(&COND_state); - /* Guarantee we don't catch spurious signals */ - sql_print_information("SCHEDULER: Waiting the manager thread to reply"); do { DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " "thread. Current value of state is %s . " "workers count=%d", scheduler_states_names[state].str, workers_count())); + /* + NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON + threads. In addition, kill_one_thread() requires THD but during shutdown + current_thd is NULL. Hence, if kill_one_thread should be used it has to + be modified to kill also daemons, by adding a flag, and also we have to + create artificial THD here. To save all this work, we just do what + kill_one_thread() does to kill a thread. See also sql_repl.cc for similar + usage. + */ + + state= STOPPING; + DBUG_PRINT("info", ("Manager thread has id %d", scheduler_thd->thread_id)); + /* Lock from delete */ + pthread_mutex_lock(&scheduler_thd->LOCK_delete); + /* This will wake up the thread if it waits on Queue's conditional */ + sql_print_information("SCHEDULER: Killing manager thread %lu", + scheduler_thd->thread_id); + scheduler_thd->awake(THD::KILL_CONNECTION); + pthread_mutex_unlock(&scheduler_thd->LOCK_delete); + /* thd could be 0x0, when shutting down */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop"); } while (state == STOPPING); DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); - - thread_id= 0; + /* + The rationale behind setting it to NULL here but not destructing it + beforehand is because the THD will be deinited in event_scheduler_thread(). + It's more clear when the post_init and the deinit is done in one function. + Here we just mark that the scheduler doesn't have a THD anymore. Though for + milliseconds the old thread could exist we can't use it anymore. When we + unlock the mutex in this function a little later the state will be + INITIALIZED. Therefore, a connection thread could enter the critical section + and will create a new THD object. + */ + scheduler_thd= NULL; end: UNLOCK_DATA(); DBUG_RETURN(FALSE); @@ -634,12 +653,8 @@ Event_scheduler::workers_count() pthread_mutex_lock(&LOCK_thread_count); // For unlink from list I_List_iterator<THD> it(threads); while ((tmp=it++)) - { - if (tmp->command == COM_DAEMON) - continue; if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) ++count; - } pthread_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("exit", ("%d", count)); DBUG_RETURN(count); @@ -647,25 +662,6 @@ Event_scheduler::workers_count() /* - Signals the main scheduler thread that the queue has changed - its state. - - SYNOPSIS - Event_scheduler::queue_changed() -*/ - -void -Event_scheduler::queue_changed() -{ - DBUG_ENTER("Event_scheduler::queue_changed"); - DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", - scheduler_states_names[state].str)); - pthread_cond_signal(&COND_state); - DBUG_VOID_RETURN; -} - - -/* Auxiliary function for locking LOCK_scheduler_state. Used by the LOCK_DATA macro. @@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line) Event_scheduler::cond_wait() thd Thread (Could be NULL during shutdown procedure) abstime If not null then call pthread_cond_timedwait() + msg Message for thd->proc_info func Which function is requesting cond_wait line On which line cond_wait is requested */ @@ -757,33 +754,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg, /* - Returns the current state of the scheduler - - SYNOPSIS - Event_scheduler::get_state() - - RETURN VALUE - The state of the scheduler (INITIALIZED | RUNNING | STOPPING) -*/ - -enum Event_scheduler::enum_state -Event_scheduler::get_state() -{ - enum Event_scheduler::enum_state ret; - DBUG_ENTER("Event_scheduler::get_state"); - LOCK_DATA(); - ret= state; - UNLOCK_DATA(); - DBUG_RETURN(ret); -} - - -/* - REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF - Returns whether the scheduler was initialized. -*/ - -/* Dumps the internal status of the scheduler SYNOPSIS @@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("thread_id"), scs); if (thread_id) { - int_string.set((longlong) thread_id, scs); + int_string.set((longlong) scheduler_thd->thread_id, scs); protocol->store(&int_string); } else |