summaryrefslogtreecommitdiff
path: root/sql/event_scheduler.cc
diff options
context:
space:
mode:
authorunknown <andrey@fifo.vaih.whnetz>2006-08-31 17:18:39 +0200
committerunknown <andrey@fifo.vaih.whnetz>2006-08-31 17:18:39 +0200
commit0410cc3a0c87582fe9d749f5e007dcdb1b9848e1 (patch)
tree7578f63a766b0ce331cbafad99ef748c185822f1 /sql/event_scheduler.cc
parent3e4b79d9cafea4d1e86ddba6c8f5658f4c4ef23c (diff)
downloadmariadb-git-0410cc3a0c87582fe9d749f5e007dcdb1b9848e1.tar.gz
WL#3337 (Event scheduler new architecture)
This patch makes the relationship between Event_scheduler and Event_queue unidirectional from the former to the latter. The change is that the conditional on which the scheduler sleeped has been moved to the Event_queue and the latter does not call anymore Event_scheduler::queue_changed(), which in turn has be removed. sql/event_queue.cc: Remove dependency of Event_queue on Event_scheduler but not vice versa. Event_scheduler polls whether there is time to execute an event. Removed notify_observers() as the way of calling has changed. Added Event_queue::cond_wait() similar to Event_scheduler::cond_wait(). sql/event_queue.h: init_queue() does not need anymore Event_scheduler object because the relationship is now one-way. Event_scheduler knows about Event_queue but not vice versa. get_top_execution_if_time() does by itself the waiting instead of returning abstime. This simplifies the code in Event_scheduler::run() get_top_execution_if_time() returns only if job_data != NULL or if the scheduler thread was killed. notify_observers() is no more used and therefore removed. Added Event_queue::cond_wait() because now there is waiting on a conditional variable in Event_queue too (like in Event_scheduler for ::stop()). sql/event_scheduler.cc: Change the relationship between Event_scheduler & Event_queue. Event_queue does not know anymore about Event_scheduler. When the scheduler calls get_top_element_if_time() it may fall asleep on a conditional of Event_queue, if either the queue is empty or it's still not time for activation. When the method returns it will return a non-null address, namely an object to be executed. If the return value is NULL, the thread was killed by a call to Event_scheduler::stop() (we assert this). sql/event_scheduler.h: Remove queue_changed() as it is obsoleted by making the relationship between Event_scheduler and Event_queue one-way, from the former to the latter. Event_queue now does not know about Event_scheduler. get_state() is changed to is_running(). The state enum should be private, as it is not needed to be seen from outside anymore. sql/events.cc: Event_queue does not need anymore a pointer to Event_scheduler.
Diffstat (limited to 'sql/event_scheduler.cc')
-rw-r--r--sql/event_scheduler.cc188
1 files changed, 79 insertions, 109 deletions
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index d4a5df85cda..6423f3676fc 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q)
LOCK_DATA();
queue= q;
started_events= 0;
- thread_id= 0;
+ scheduler_thd= NULL;
state= INITIALIZED;
UNLOCK_DATA();
}
@@ -397,22 +397,18 @@ Event_scheduler::start()
scheduler_param_value->thd= new_thd;
scheduler_param_value->scheduler= this;
+ scheduler_thd= new_thd;
+ DBUG_PRINT("info", ("Setting state go RUNNING"));
+ state= RUNNING;
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
(void*)scheduler_param_value))
{
DBUG_PRINT("error", ("cannot create a new thread"));
state= INITIALIZED;
+ scheduler_thd= NULL;
ret= TRUE;
- }
- DBUG_PRINT("info", ("Setting state go RUNNING"));
- state= RUNNING;
-end:
- UNLOCK_DATA();
- if (ret && new_thd)
- {
- DBUG_PRINT("info", ("There was an error during THD creation. Clean up"));
new_thd->proc_info= "Clearing";
DBUG_ASSERT(new_thd->net.buff != 0);
net_end(&new_thd->net);
@@ -422,6 +418,9 @@ end:
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
+end:
+ UNLOCK_DATA();
+
DBUG_RETURN(ret);
}
@@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd)
Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
- LOCK_DATA();
-
- thread_id= thd->thread_id;
sql_print_information("SCHEDULER: Manager thread started with id %lu",
- thread_id);
+ thd->thread_id);
/*
Recalculate the values in the queue because there could have been stops
in executions of the scheduler and some times could have passed by.
*/
queue->recalculate_activation_times(thd);
- while (state == RUNNING)
+
+ while (is_running())
{
- thd->end_time();
/* Gets a minimized version */
- if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
- &job_data, &abstime))
+ if (queue->get_top_for_execution_if_time(thd, &job_data))
{
- sql_print_information("SCHEDULER: Serious error during getting next"
- " event to execute. Stopping");
+ sql_print_information("SCHEDULER: Serious error during getting next "
+ "event to execute. Stopping");
break;
}
- DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
- "abs_time.tv_sec=%d",
- job_data, thd->query_start(), abstime.tv_sec));
- if (!job_data && !abstime.tv_sec)
- {
- DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
- COND_STATE_WAIT(thd, NULL, "Waiting on empty queue");
- DBUG_PRINT("info", ("Woke up. Got COND_state"));
- }
- else if (abstime.tv_sec)
+ DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data));
+ if (job_data)
{
- DBUG_PRINT("info", ("Have to sleep some time %u s. till %u",
- abstime.tv_sec - thd->query_start(), abstime.tv_sec));
-
- COND_STATE_WAIT(thd, &abstime, "Waiting for next activation");
- /*
- If we get signal we should recalculate the whether it's the right time
- because there could be :
- 1. Spurious wake-up
- 2. The top of the queue was changed (new one becase of create/update)
- */
- DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
+ if ((res= execute_top(thd, job_data)))
+ break;
}
else
{
- UNLOCK_DATA();
- res= execute_top(thd, job_data);
- LOCK_DATA();
- if (res)
- break;
- ++started_events;
+ DBUG_ASSERT(thd->killed);
+ DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
}
DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
}
+ LOCK_DATA();
DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
- pthread_cond_signal(&COND_state);
-error:
state= INITIALIZED;
+ pthread_cond_signal(&COND_state);
UNLOCK_DATA();
sql_print_information("SCHEDULER: Stopped");
@@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
job_data)))
goto error;
+ ++started_events;
+
DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd));
DBUG_RETURN(FALSE);
@@ -568,6 +544,27 @@ error:
/*
+ Checkes whether the state of the scheduler is RUNNING
+
+ SYNOPSIS
+ Event_scheduler::is_running()
+
+ RETURN VALUE
+ TRUE RUNNING
+ FALSE Not RUNNING
+*/
+
+inline bool
+Event_scheduler::is_running()
+{
+ LOCK_DATA();
+ bool ret= (state == RUNNING);
+ UNLOCK_DATA();
+ return ret;
+}
+
+
+/*
Stops the scheduler (again). Waits for acknowledgement from the
scheduler that it has stopped - synchronous stopping.
@@ -591,26 +588,48 @@ Event_scheduler::stop()
if (state != RUNNING)
goto end;
- state= STOPPING;
-
- DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
- sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
-
- pthread_cond_signal(&COND_state);
-
/* Guarantee we don't catch spurious signals */
- sql_print_information("SCHEDULER: Waiting the manager thread to reply");
do {
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %s . "
"workers count=%d", scheduler_states_names[state].str,
workers_count()));
+ /*
+ NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
+ threads. In addition, kill_one_thread() requires THD but during shutdown
+ current_thd is NULL. Hence, if kill_one_thread should be used it has to
+ be modified to kill also daemons, by adding a flag, and also we have to
+ create artificial THD here. To save all this work, we just do what
+ kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
+ usage.
+ */
+
+ state= STOPPING;
+ DBUG_PRINT("info", ("Manager thread has id %d", scheduler_thd->thread_id));
+ /* Lock from delete */
+ pthread_mutex_lock(&scheduler_thd->LOCK_delete);
+ /* This will wake up the thread if it waits on Queue's conditional */
+ sql_print_information("SCHEDULER: Killing manager thread %lu",
+ scheduler_thd->thread_id);
+ scheduler_thd->awake(THD::KILL_CONNECTION);
+ pthread_mutex_unlock(&scheduler_thd->LOCK_delete);
+
/* thd could be 0x0, when shutting down */
+ sql_print_information("SCHEDULER: Waiting the manager thread to reply");
COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop");
} while (state == STOPPING);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
-
- thread_id= 0;
+ /*
+ The rationale behind setting it to NULL here but not destructing it
+ beforehand is because the THD will be deinited in event_scheduler_thread().
+ It's more clear when the post_init and the deinit is done in one function.
+ Here we just mark that the scheduler doesn't have a THD anymore. Though for
+ milliseconds the old thread could exist we can't use it anymore. When we
+ unlock the mutex in this function a little later the state will be
+ INITIALIZED. Therefore, a connection thread could enter the critical section
+ and will create a new THD object.
+ */
+ scheduler_thd= NULL;
end:
UNLOCK_DATA();
DBUG_RETURN(FALSE);
@@ -634,12 +653,8 @@ Event_scheduler::workers_count()
pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
- {
- if (tmp->command == COM_DAEMON)
- continue;
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
++count;
- }
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_PRINT("exit", ("%d", count));
DBUG_RETURN(count);
@@ -647,25 +662,6 @@ Event_scheduler::workers_count()
/*
- Signals the main scheduler thread that the queue has changed
- its state.
-
- SYNOPSIS
- Event_scheduler::queue_changed()
-*/
-
-void
-Event_scheduler::queue_changed()
-{
- DBUG_ENTER("Event_scheduler::queue_changed");
- DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ",
- scheduler_states_names[state].str));
- pthread_cond_signal(&COND_state);
- DBUG_VOID_RETURN;
-}
-
-
-/*
Auxiliary function for locking LOCK_scheduler_state. Used
by the LOCK_DATA macro.
@@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line)
Event_scheduler::cond_wait()
thd Thread (Could be NULL during shutdown procedure)
abstime If not null then call pthread_cond_timedwait()
+ msg Message for thd->proc_info
func Which function is requesting cond_wait
line On which line cond_wait is requested
*/
@@ -757,33 +754,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
/*
- Returns the current state of the scheduler
-
- SYNOPSIS
- Event_scheduler::get_state()
-
- RETURN VALUE
- The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
-*/
-
-enum Event_scheduler::enum_state
-Event_scheduler::get_state()
-{
- enum Event_scheduler::enum_state ret;
- DBUG_ENTER("Event_scheduler::get_state");
- LOCK_DATA();
- ret= state;
- UNLOCK_DATA();
- DBUG_RETURN(ret);
-}
-
-
-/*
- REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF
- Returns whether the scheduler was initialized.
-*/
-
-/*
Dumps the internal status of the scheduler
SYNOPSIS
@@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->store(STRING_WITH_LEN("thread_id"), scs);
if (thread_id)
{
- int_string.set((longlong) thread_id, scs);
+ int_string.set((longlong) scheduler_thd->thread_id, scs);
protocol->store(&int_string);
}
else