summaryrefslogtreecommitdiff
path: root/sql/event_scheduler.cc
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-03 11:20:08 +0200
committerunknown <andrey@lmy004.>2006-07-03 11:20:08 +0200
commit377446fa3497ffbc0f2a17614d848bfb79f52662 (patch)
tree99e7b4fe6e41946b7b36c74b0c7e898e13945244 /sql/event_scheduler.cc
parent8d961c45e2e83f04da92cbfc31d2975a6949743f (diff)
downloadmariadb-git-377446fa3497ffbc0f2a17614d848bfb79f52662.tar.gz
WL#3337 (Event scheduler new architecture)
This is the first cut of separating Event_scheduler in two classes which are more specialized. Inheritance was used to separate methods and member variables. Still Event_scheduler is a child of Event_queue. This dependency will be removed soon. sql/event_data_objects.cc: add comments sql/event_db_repository.cc: coding style sql/event_db_repository.h: add a call, will be implemented later sql/event_queue.cc: Event_queue, still as super-class of Event_scheduler sql/event_queue.h: Event_queue as super-class of Event_scheduler. Trying to separate the two classes sql/event_scheduler.cc: Event_scheduler as child class of Event_queue. Trying to separate both classes. sql/event_scheduler.h: Event_scheduler as child class of Event_queue. Trying to separate both classes. sql/events.cc: Don't allocate on the stack the scheduler but on the heap. The exact way it is done will be changed, that's ok for now.
Diffstat (limited to 'sql/event_scheduler.cc')
-rw-r--r--sql/event_scheduler.cc1006
1 files changed, 164 insertions, 842 deletions
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index 35c30e5fc5a..fb60ce8ae6d 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -264,11 +264,6 @@ LEX_STRING states_names[] =
};
#endif
-
-Event_scheduler
-Event_scheduler::singleton;
-
-
const char * const
Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] =
{
@@ -278,6 +273,13 @@ Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] =
};
+/*
+Event_scheduler*
+Event_scheduler::singleton= NULL;
+*/
+
+
+
class Worker_thread_param
{
public:
@@ -301,35 +303,6 @@ public:
/*
- Compares the execute_at members of 2 Event_timed instances.
- Used as callback for the prioritized queue when shifting
- elements inside.
-
- SYNOPSIS
- event_timed_compare_q()
-
- vptr - not used (set it to NULL)
- a - first Event_timed object
- b - second Event_timed object
-
- RETURN VALUE
- -1 - a->execute_at < b->execute_at
- 0 - a->execute_at == b->execute_at
- 1 - a->execute_at > b->execute_at
-
- NOTES
- execute_at.second_part is not considered during comparison
-*/
-
-static int
-event_timed_compare_q(void *vptr, byte* a, byte *b)
-{
- return my_time_compare(&((Event_timed *)a)->execute_at,
- &((Event_timed *)b)->execute_at);
-}
-
-
-/*
Prints the stack of infos, warnings, errors from thd to
the console so it can be fetched by the logs-into-tables and
checked later.
@@ -640,6 +613,8 @@ event_worker_thread(void *arg)
}
+
+
/*
Constructor of class Event_scheduler.
@@ -648,15 +623,35 @@ event_worker_thread(void *arg)
*/
Event_scheduler::Event_scheduler()
- :state(UNINITIALIZED), start_scheduler_suspended(FALSE),
- thread_id(0), mutex_last_locked_at_line(0),
- mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""),
- mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE),
- mutex_scheduler_data_locked(FALSE)
{
+ thread_id= 0;
+ mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0;
+ mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= "";
+ cond_waiting_on= COND_NONE;
+ mutex_scheduler_data_locked= FALSE;
+ state= UNINITIALIZED;
+ start_scheduler_suspended= FALSE;
+ LOCK_scheduler_data= &LOCK_event_queue;
}
+
+/*
+ Returns the singleton instance of the class.
+
+ SYNOPSIS
+ Event_scheduler::create_instance()
+
+ RETURN VALUE
+ address
+*/
+
+void
+Event_scheduler::create_instance()
+{
+ singleton= new Event_scheduler();
+}
+
/*
Returns the singleton instance of the class.
@@ -671,7 +666,7 @@ Event_scheduler*
Event_scheduler::get_instance()
{
DBUG_ENTER("Event_scheduler::get_instance");
- DBUG_RETURN(&singleton);
+ DBUG_RETURN(singleton);
}
@@ -693,9 +688,9 @@ Event_scheduler::init(Event_db_repository *db_repo)
bool ret= FALSE;
DBUG_ENTER("Event_scheduler::init");
DBUG_PRINT("enter", ("this=%p", this));
-
+
+ Event_queue::init(db_repo);
LOCK_SCHEDULER_DATA();
- db_repository= db_repo;
for (;i < COND_LAST; i++)
if (pthread_cond_init(&cond_vars[i], NULL))
{
@@ -703,27 +698,6 @@ Event_scheduler::init(Event_db_repository *db_repo)
ret= TRUE;
goto end;
}
-
- /* init memory root */
- init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
-
- if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
- event_timed_compare_q, NULL, 30 /*auto_extent*/))
- {
- sql_print_error("SCHEDULER: Can't initialize the execution queue");
- ret= TRUE;
- goto end;
- }
-
- if (sizeof(my_time_t) != sizeof(time_t))
- {
- sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
- "The scheduler may not work correctly. Stopping.");
- DBUG_ASSERT(0);
- ret= TRUE;
- goto end;
- }
-
state= INITIALIZED;
end:
UNLOCK_SCHEDULER_DATA();
@@ -746,14 +720,12 @@ void
Event_scheduler::destroy()
{
DBUG_ENTER("Event_scheduler");
-
+ Event_queue::deinit();
LOCK_SCHEDULER_DATA();
switch (state) {
case UNINITIALIZED:
break;
case INITIALIZED:
- delete_queue(&queue);
- free_root(&scheduler_root, MYF(0));
int i;
for (i= 0; i < COND_LAST; i++)
pthread_cond_destroy(&cond_vars[i]);
@@ -771,389 +743,6 @@ Event_scheduler::destroy()
}
-/*
- Creates an event in the scheduler queue
-
- SYNOPSIS
- Event_scheduler::create_event()
- et The event to add
- check_existence Whether to check if already loaded.
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
-*/
-
-int
-Event_scheduler::create_event(THD *thd, Event_parse_data *et, bool check_existence)
-{
- int res;
- Event_timed *et_new;
- DBUG_ENTER("Event_scheduler::create_event");
- DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
- if (check_existence && find_event(et->dbname, et->name, FALSE))
- {
- res= OP_ALREADY_EXISTS;
- goto end;
- }
-
- /* We need to load the event on scheduler_root */
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
- {
- queue_insert_safe(&queue, (byte *) et_new);
- DBUG_PRINT("info", ("Sending COND_new_work"));
- pthread_cond_signal(&cond_vars[COND_new_work]);
- }
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(res);
-}
-
-
-/*
- Drops an event from the scheduler queue
-
- SYNOPSIS
- Event_scheduler::drop_event()
- etn The event to drop
- state Wait the event or kill&drop
-
- RETURN VALUE
- FALSE OK (replaced or scheduler not working)
- TRUE Failure
-*/
-
-bool
-Event_scheduler::drop_event(THD *thd, sp_name *name)
-{
- int res;
- Event_timed *et_old;
- DBUG_ENTER("Event_scheduler::drop_event");
- DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name,
- &LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
-
- if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
- DBUG_PRINT("info", ("No such event found, probably DISABLED"));
-
- UNLOCK_SCHEDULER_DATA();
-
- /* See comments in ::replace_event() why this is split in two parts. */
- if (et_old)
- {
- switch ((res= et_old->kill_thread(thd))) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- sql_print_error("SCHEDULER: Got unexpected error %d", res);
- DBUG_ASSERT(0);
- }
- }
-
- DBUG_RETURN(FALSE);
-}
-
-
-/*
- Updates an event from the scheduler queue
-
- SYNOPSIS
- Event_scheduler::replace_event()
- et The event to replace(add) into the queue
- state Async or sync stopping
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
- OP_ALREADY_EXISTS Event already in the queue
-*/
-
-int
-Event_scheduler::update_event(THD *thd, Event_parse_data *et,
- LEX_STRING *new_schema,
- LEX_STRING *new_name)
-{
- int res= OP_OK;
- Event_timed *et_old, *et_new= NULL;
- LEX_STRING old_schema, old_name;
-
- LINT_INIT(old_schema.str);
- LINT_INIT(old_schema.length);
- LINT_INIT(old_name.str);
- LINT_INIT(old_name.length);
-
- DBUG_ENTER("Event_scheduler::update_event");
- DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
- thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
-
- if (!(et_old= find_event(et->dbname, et->name, TRUE)))
- DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
- et->dbname.str, et->name.str));
-
- if (new_schema && new_name)
- {
- old_schema= et->dbname;
- old_name= et->name;
- et->dbname= *new_schema;
- et->name= *new_name;
- }
- /*
- We need to load the event (it's strings but on the object itself)
- on scheduler_root. et_new could be NULL :
- 1. Error occured
- 2. If the replace is DISABLED, we don't load it into the queue.
- */
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
- {
- queue_insert_safe(&queue, (byte *) et_new);
- DBUG_PRINT("info", ("Sending COND_new_work"));
- pthread_cond_signal(&cond_vars[COND_new_work]);
- }
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
-
- if (new_schema && new_name)
- {
- et->dbname= old_schema;
- et->name= old_name;
- }
- DBUG_PRINT("info", ("res=%d", res));
- UNLOCK_SCHEDULER_DATA();
- /*
- Andrey: Is this comment still truthful ???
-
- We don't move this code above because a potential kill_thread will call
- THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
- which is LOCK_scheduler_data on which the COND_new_work in ::run() locks.
- Hence, we try to acquire a lock which we have already acquired and we run
- into an assert. Holding LOCK_scheduler_data however is not needed because
- we don't touch any invariant of the scheduler anymore. ::drop_event() does
- the same.
- */
- if (et_old)
- {
- switch (et_old->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
-
- DBUG_RETURN(res);
-}
-
-
-/*
- Searches for an event in the scheduler queue
-
- SYNOPSIS
- Event_scheduler::find_event()
- db The schema of the event to find
- name The event to find
- remove_from_q If found whether to remove from the Q
-
- RETURN VALUE
- NULL Not found
- otherwise Address
-
- NOTE
- The caller should do the locking also the caller is responsible for
- actual signalling in case an event is removed from the queue
- (signalling COND_new_work for instance).
-*/
-
-Event_timed *
-Event_scheduler::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
-{
- uint i;
- DBUG_ENTER("Event_scheduler::find_event");
-
- for (i= 0; i < queue.elements; ++i)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
- et->dbname.str, et->name.str));
- if (event_timed_identifier_equal(db, name, et))
- {
- if (remove_from_q)
- queue_remove(&queue, i);
- DBUG_RETURN(et);
- }
- }
-
- DBUG_RETURN(NULL);
-}
-
-
-/*
- Drops all events from the in-memory queue and disk that match
- certain pattern evaluated by a comparator function
-
- SYNOPSIS
- Event_scheduler::drop_matching_events()
- thd THD
- pattern A pattern string
- comparator The function to use for comparing
-
- RETURN VALUE
- -1 Scheduler not working
- >=0 Number of dropped events
-
- NOTE
- Expected is the caller to acquire lock on LOCK_scheduler_data
-*/
-
-void
-Event_scheduler::drop_matching_events(THD *thd, LEX_STRING pattern,
- bool (*comparator)(Event_timed *,LEX_STRING *))
-{
- DBUG_ENTER("Event_scheduler::drop_matching_events");
- DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str,
- state));
- if (is_running_or_suspended())
- {
- uint i= 0, dropped= 0;
- while (i < queue.elements)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
- if (comparator(et, &pattern))
- {
- /*
- The queue is ordered. If we remove an element, then all elements after
- it will shift one position to the left, if we imagine it as an array
- from left to the right. In this case we should not increment the
- counter and the (i < queue.elements) condition is ok.
- */
- queue_remove(&queue, i);
-
- /* See replace_event() */
- switch (et->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et->flags |= EVENT_FREE_WHEN_FINISHED;
- ++dropped;
- break;
- case 0:
- delete et;
- ++dropped;
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
- else
- i++;
- }
- DBUG_PRINT("info", ("Dropped %lu", dropped));
- }
- /*
- Don't send COND_new_work because no need to wake up the scheduler thread.
- When it wakes next time up it will recalculate how much more it should
- sleep if the top of the queue has been changed by this method.
- */
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Drops all events from the in-memory queue and disk that are from
- certain schema.
-
- SYNOPSIS
- Event_scheduler::drop_schema_events()
- thd THD
- db The schema name
-
- RETURN VALUE
- -1 Scheduler not working
- >=0 Number of dropped events
-*/
-
-int
-Event_scheduler::drop_schema_events(THD *thd, LEX_STRING schema)
-{
- int ret;
- DBUG_ENTER("Event_scheduler::drop_schema_events");
- LOCK_SCHEDULER_DATA();
- if (is_running_or_suspended())
- drop_matching_events(thd, schema, event_timed_db_equal);
-
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_RETURN(ret);
-}
-
-
extern pthread_attr_t connection_attrib;
@@ -1205,8 +794,8 @@ Event_scheduler::start()
}
/* Wait till the child thread has booted (w/ or wo success) */
- while (!is_running_or_suspended() && state != CANTSTART)
- cond_wait(COND_started_or_stopped, &LOCK_scheduler_data);
+ while (!(state == SUSPENDED || state == RUNNING) && state != CANTSTART)
+ cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
/*
If we cannot start for some reason then don't prohibit further attempts.
@@ -1314,7 +903,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Manager thread started with id %lu",
thd->thread_id);
abstime.tv_nsec= 0;
- while (is_running_or_suspended())
+ while ((state == SUSPENDED || state == RUNNING))
{
Event_timed *et;
@@ -1374,9 +963,9 @@ Event_scheduler::run(THD *thd)
pthread_cond_timedwait() will wait till `abstime`.
"Sleeping until next time"
*/
- thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping");
+ thd->enter_cond(&cond_vars[COND_new_work],LOCK_scheduler_data,"Sleeping");
- pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data,
+ pthread_cond_timedwait(&cond_vars[COND_new_work], LOCK_scheduler_data,
&abstime);
DBUG_PRINT("info", ("Manager woke up. state is %d", state));
@@ -1397,7 +986,7 @@ Event_scheduler::run(THD *thd)
In this case stop the manager.
We should enter ::execute_top() with locked LOCK_scheduler_data.
*/
- int ret= execute_top(thd);
+ int ret= execute_top(thd, et);
UNLOCK_SCHEDULER_DATA();
if (ret)
break;
@@ -1421,7 +1010,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Shutting down");
thd->proc_info= (char *)"Cleaning queue";
- clean_queue(thd);
+ clean_memory(thd);
THD_CHECK_SENTRY(thd);
/* free mamager_root memory but don't destroy the root */
@@ -1474,15 +1063,13 @@ Event_scheduler::run(THD *thd)
*/
bool
-Event_scheduler::execute_top(THD *thd)
+Event_scheduler::execute_top(THD *thd, Event_timed *et)
{
int spawn_ret_code;
bool ret= FALSE;
DBUG_ENTER("Event_scheduler::execute_top");
DBUG_PRINT("enter", ("thd=%p", thd));
- Event_timed *et= (Event_timed *)queue_top(&queue);
-
/* Is it good idea to pass a stack address ?*/
Worker_thread_param param(et);
@@ -1552,7 +1139,7 @@ Event_scheduler::execute_top(THD *thd)
*/
void
-Event_scheduler::clean_queue(THD *thd)
+Event_scheduler::clean_memory(THD *thd)
{
CHARSET_INFO *scs= system_charset_info;
uint i;
@@ -1565,14 +1152,7 @@ Event_scheduler::clean_queue(THD *thd)
sql_print_information("SCHEDULER: Emptying the queue");
- /* empty the queue */
- for (i= 0; i < queue.elements; ++i)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- et->free_sp();
- delete et;
- }
- resize_queue(&queue, 0);
+ empty_queue();
DBUG_VOID_RETURN;
}
@@ -1681,7 +1261,7 @@ Event_scheduler::stop()
DBUG_PRINT("enter", ("thd=%p", current_thd));
LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
+ if (!(state == SUSPENDED || state == RUNNING))
{
/*
One situation to be here is if there was a start that forked a new
@@ -1717,7 +1297,7 @@ Event_scheduler::stop()
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %d . "
"workers count=%d", state, workers_count()));
- cond_wait(COND_started_or_stopped, &LOCK_scheduler_data);
+ cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
}
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
UNLOCK_SCHEDULER_DATA();
@@ -1768,7 +1348,7 @@ Event_scheduler::suspend_or_resume(
pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
}
DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume"));
- cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data);
+ cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
DBUG_PRINT("info", ("Got response"));
}
UNLOCK_SCHEDULER_DATA();
@@ -1833,7 +1413,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
if (state == SUSPENDED)
{
- thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data,
+ thd->enter_cond(&cond_vars[COND_suspend_or_resume], LOCK_scheduler_data,
"Suspended");
/* Send back signal to the thread that asked us to suspend operations */
pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
@@ -1842,7 +1422,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
while (state == SUSPENDED)
{
- cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data);
+ cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume"));
if (state != SUSPENDED)
{
@@ -1852,18 +1432,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
if (was_suspended)
{
- if (queue.elements)
- {
- uint i;
- DBUG_PRINT("info", ("We have to recompute the execution times"));
-
- for (i= 0; i < queue.elements; i++)
- {
- ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
- ((Event_timed*)queue_element(&queue, i))->update_fields(thd);
- }
- queue_fix(&queue);
- }
+ recalculate_queue(thd);
/* This will implicitly unlock LOCK_scheduler_data */
thd->exit_cond("");
}
@@ -1892,18 +1461,18 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
bool slept= FALSE;
DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue");
DBUG_PRINT("enter", ("q.elements=%lu state=%s",
- queue.elements, states_names[state]));
+ events_count_no_lock(), states_names[state]));
- if (!queue.elements)
- thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data,
+ if (!events_count_no_lock())
+ thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data,
"Empty queue, sleeping");
/* Wait in a loop protecting against catching spurious signals */
- while (!queue.elements && state == RUNNING)
+ while (!events_count_no_lock() && state == RUNNING)
{
slept= TRUE;
DBUG_PRINT("info", ("Entering condition because of empty queue"));
- cond_wait(COND_new_work, &LOCK_scheduler_data);
+ cond_wait(COND_new_work, LOCK_scheduler_data);
DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d",
state));
/*
@@ -1916,105 +1485,13 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
thd->exit_cond("");
DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d",
- queue.elements, states_names[state], thd->killed));
+ events_count_no_lock(), states_names[state], thd->killed));
DBUG_RETURN(slept);
}
/*
- Wrapper for pthread_mutex_lock
-
- SYNOPSIS
- Event_scheduler::lock_data()
- mutex Mutex to lock
- line The line number on which the lock is done
-
- RETURN VALUE
- Error code of pthread_mutex_lock()
-*/
-
-inline void
-Event_scheduler::lock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_scheduler_data, func, line));
- pthread_mutex_lock(&LOCK_scheduler_data);
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_scheduler_data_locked= TRUE;
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_mutex_unlock
-
- SYNOPSIS
- Event_scheduler::unlock_data()
- mutex Mutex to unlock
- line The line number on which the unlock is done
-*/
-
-inline void
-Event_scheduler::unlock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- &LOCK_scheduler_data, func, line));
- mutex_last_unlocked_at_line= line;
- mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
- pthread_mutex_unlock(&LOCK_scheduler_data);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_cond_wait
-
- SYNOPSIS
- Event_scheduler::cond_wait()
- cond Conditional to wait for
- mutex Mutex of the conditional
-
- RETURN VALUE
- Error code of pthread_cond_wait()
-*/
-
-inline int
-Event_scheduler::cond_wait(enum Event_scheduler::enum_cond_vars cond,
- pthread_mutex_t *mutex)
-{
- int ret;
- DBUG_ENTER("Event_scheduler::cond_wait");
- DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex));
- ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex);
- cond_waiting_on= COND_NONE;
- DBUG_RETURN(ret);
-}
-
-
-/*
- Checks whether the scheduler is in a running or suspended state.
-
- SYNOPSIS
- Event_scheduler::is_running_or_suspended()
-
- RETURN VALUE
- TRUE Either running or suspended
- FALSE IN_SHUTDOWN, not started, etc.
-*/
-
-inline bool
-Event_scheduler::is_running_or_suspended()
-{
- return (state == SUSPENDED || state == RUNNING);
-}
-
-
-/*
Returns the current state of the scheduler
SYNOPSIS
@@ -2027,9 +1504,9 @@ Event_scheduler::get_state()
enum Event_scheduler::enum_state ret;
DBUG_ENTER("Event_scheduler::get_state");
/* lock_data & unlock_data are not static */
- pthread_mutex_lock(&singleton.LOCK_scheduler_data);
- ret= singleton.state;
- pthread_mutex_unlock(&singleton.LOCK_scheduler_data);
+ pthread_mutex_lock(singleton->LOCK_scheduler_data);
+ ret= singleton->state;
+ pthread_mutex_unlock(singleton->LOCK_scheduler_data);
DBUG_RETURN(ret);
}
@@ -2053,252 +1530,6 @@ Event_scheduler::initialized()
}
-/*
- Returns the number of elements in the queue
-
- SYNOPSIS
- Event_scheduler::events_count()
-
- RETURN VALUE
- 0 Number of Event_timed objects in the queue
-*/
-
-uint
-Event_scheduler::events_count()
-{
- uint n;
- DBUG_ENTER("Event_scheduler::events_count");
- LOCK_SCHEDULER_DATA();
- n= queue.elements;
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_RETURN(n);
-}
-
-
-
-
-/*
- Loads all ENABLED events from mysql.event into the prioritized
- queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_timed instances for every ENABLED event
- from mysql.event.
-
- SYNOPSIS
- Event_scheduler::load_events_from_db()
- thd - Thread context. Used for memory allocation in some cases.
-
- RETURN VALUE
- 0 OK
- !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
- EVEX_COMPILE_ERROR) - in all these cases mysql.event was
- tampered.
-
- NOTES
- Reports the error to the console
-*/
-
-int
-Event_scheduler::load_events_from_db(THD *thd)
-{
- TABLE *table;
- READ_RECORD read_record_info;
- int ret= -1;
- uint count= 0;
- bool clean_the_queue= FALSE;
- /* Compile the events on this root but only for syntax check, then discard */
- MEM_ROOT boot_root;
-
- DBUG_ENTER("Event_scheduler::load_events_from_db");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- if (state > COMMENCING)
- {
- DBUG_ASSERT(0);
- sql_print_error("SCHEDULER: Trying to load events while already running.");
- DBUG_RETURN(EVEX_GENERAL_ERROR);
- }
-
- if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
- {
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
- DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
- }
-
- init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
- init_read_record(&read_record_info, thd, table ,NULL,1,0);
- while (!(read_record_info.read_record(&read_record_info)))
- {
- Event_timed *et;
- if (!(et= new Event_timed))
- {
- DBUG_PRINT("info", ("Out of memory"));
- clean_the_queue= TRUE;
- break;
- }
- DBUG_PRINT("info", ("Loading event from row."));
-
- if ((ret= et->load_from_row(&scheduler_root, table)))
- {
- clean_the_queue= TRUE;
- sql_print_error("SCHEDULER: Error while loading from mysql.event. "
- "Table probably corrupted");
- break;
- }
- if (et->status != Event_timed::ENABLED)
- {
- DBUG_PRINT("info",("%s is disabled",et->name.str));
- delete et;
- continue;
- }
-
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- /* We load only on scheduler root just to check whether the body compiles */
- switch (ret= et->compile(thd, &boot_root)) {
- case EVEX_MICROSECOND_UNSUP:
- et->free_sp();
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- goto end;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
- et->dbname.str, et->name.str);
- goto end;
- default:
- /* Free it, it will be compiled again on the worker thread */
- et->free_sp();
- break;
- }
-
- /* let's find when to be executed */
- if (et->compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
- continue;
- }
-
- DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list."));
- queue_insert_safe(&queue, (byte *) et);
- count++;
- }
-end:
- end_read_record(&read_record_info);
- free_root(&boot_root, MYF(0));
-
- if (clean_the_queue)
- {
- for (count= 0; count < queue.elements; ++count)
- queue_remove(&queue, 0);
- ret= -1;
- }
- else
- {
- ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
- }
-
- /* Force close to free memory */
- thd->version--;
-
- close_thread_tables(thd);
-
- DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
- DBUG_RETURN(ret);
-}
-
-
-/*
- Opens mysql.db and mysql.user and checks whether:
- 1. mysql.db has column Event_priv at column 20 (0 based);
- 2. mysql.user has column Event_priv at column 29 (0 based);
-
- SYNOPSIS
- Event_scheduler::check_system_tables()
-*/
-
-bool
-Event_scheduler::check_system_tables(THD *thd)
-{
- TABLE_LIST tables;
- bool not_used;
- Open_tables_state backup;
- bool ret;
-
- DBUG_ENTER("Event_scheduler::check_system_tables");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- thd->reset_n_backup_open_tables_state(&backup);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "db";
- tables.lock_type= TL_READ;
-
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
- sql_print_error("Cannot open mysql.db");
- else
- {
- ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
- mysql_db_table_fields, &mysql_db_table_last_check,
- ER_CANNOT_LOAD_FROM_TABLE);
- close_thread_tables(thd);
- }
- if (ret)
- DBUG_RETURN(TRUE);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "user";
- tables.lock_type= TL_READ;
-
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
- sql_print_error("Cannot open mysql.db");
- else
- {
- if (tables.table->s->fields < 29 ||
- strncmp(tables.table->field[29]->field_name,
- STRING_WITH_LEN("Event_priv")))
- {
- sql_print_error("mysql.user has no `Event_priv` column at position 29");
- ret= TRUE;
- }
- close_thread_tables(thd);
- }
-
- thd->restore_backup_open_tables_state(&backup);
-
- DBUG_RETURN(ret);
-}
-
-
-/*
- Inits mutexes.
-
- SYNOPSIS
- Event_scheduler::init_mutexes()
-*/
-
-void
-Event_scheduler::init_mutexes()
-{
- pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST);
-}
-
-
-/*
- Destroys mutexes.
-
- SYNOPSIS
- Event_scheduler::destroy_mutexes()
-*/
-
-void
-Event_scheduler::destroy_mutexes()
-{
- pthread_mutex_destroy(&singleton.LOCK_scheduler_data);
-}
/*
@@ -2337,8 +1568,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("state"), scs);
- protocol->store(states_names[singleton.state].str,
- states_names[singleton.state].length,
+ protocol->store(states_names[singleton->state].str,
+ states_names[singleton->state].length,
scs);
ret= protocol->write();
@@ -2346,7 +1577,7 @@ Event_scheduler::dump_internal_status(THD *thd)
If not initialized - don't show anything else. get_instance()
will otherwise implicitly initialize it. We don't want that.
*/
- if (singleton.state >= INITIALIZED)
+ if (singleton->state >= INITIALIZED)
{
/* last locked at*/
/*
@@ -2357,8 +1588,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->store(STRING_WITH_LEN("last locked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
- singleton.mutex_last_locked_in_func,
- singleton.mutex_last_locked_at_line));
+ singleton->mutex_last_locked_in_func,
+ singleton->mutex_last_locked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2367,8 +1598,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->store(STRING_WITH_LEN("last unlocked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
- singleton.mutex_last_unlocked_in_func,
- singleton.mutex_last_unlocked_at_line));
+ singleton->mutex_last_unlocked_in_func,
+ singleton->mutex_last_unlocked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2378,8 +1609,8 @@ Event_scheduler::dump_internal_status(THD *thd)
tmp_string.length(scs->cset->
snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s",
- (singleton.cond_waiting_on != COND_NONE) ?
- cond_vars_names[singleton.cond_waiting_on]:
+ (singleton->cond_waiting_on != COND_NONE) ?
+ cond_vars_names[singleton->cond_waiting_on]:
"NONE"));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2396,7 +1627,7 @@ Event_scheduler::dump_internal_status(THD *thd)
/* queue.elements */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue.elements"), scs);
- int_string.set((longlong) scheduler->queue.elements, scs);
+ int_string.set((longlong) scheduler->events_count_no_lock(), scs);
protocol->store(&int_string);
ret= protocol->write();
@@ -2411,3 +1642,94 @@ Event_scheduler::dump_internal_status(THD *thd)
#endif
DBUG_RETURN(0);
}
+
+
+/*
+ Wrapper for pthread_mutex_lock
+
+ SYNOPSIS
+ Event_scheduler::lock_data()
+ mutex Mutex to lock
+ line The line number on which the lock is done
+
+ RETURN VALUE
+ Error code of pthread_mutex_lock()
+*/
+
+void
+Event_scheduler::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::lock_mutex");
+ DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
+ &LOCK_scheduler_data, func, line));
+ pthread_mutex_lock(LOCK_scheduler_data);
+ mutex_last_locked_in_func_name= func;
+ mutex_last_locked_at_line_nr= line;
+ mutex_scheduler_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_mutex_unlock
+
+ SYNOPSIS
+ Event_scheduler::unlock_data()
+ mutex Mutex to unlock
+ line The line number on which the unlock is done
+*/
+
+void
+Event_scheduler::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
+ DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
+ LOCK_scheduler_data, func, line));
+ mutex_last_unlocked_at_line_nr= line;
+ mutex_scheduler_data_locked= FALSE;
+ mutex_last_unlocked_in_func_name= func;
+ pthread_mutex_unlock(LOCK_scheduler_data);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_cond_wait
+
+ SYNOPSIS
+ Event_scheduler::cond_wait()
+ cond Conditional to wait for
+ mutex Mutex of the conditional
+
+ RETURN VALUE
+ Error code of pthread_cond_wait()
+*/
+
+int
+Event_scheduler::cond_wait(int cond, pthread_mutex_t *mutex)
+{
+ int ret;
+ DBUG_ENTER("Event_scheduler::cond_wait");
+ DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex));
+ ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex);
+ cond_waiting_on= COND_NONE;
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_scheduler::queue_changed()
+*/
+
+void
+Event_scheduler::queue_changed()
+{
+ DBUG_ENTER("Event_scheduler::queue_changed");
+ DBUG_PRINT("info", ("Sending COND_new_work"));
+ pthread_cond_signal(&cond_vars[COND_new_work]);
+ DBUG_VOID_RETURN;
+}