summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-05 17:12:50 +0200
committerunknown <andrey@lmy004.>2006-07-05 17:12:50 +0200
commitb9a7fe2757d9040296311e96a9e2740416d181f2 (patch)
treeb703fe1427d5c31cc528ae515b4514eeb6c9dce8
parenta5dfeb02e991e6e5e9e332443522de1bb4592df8 (diff)
downloadmariadb-git-b9a7fe2757d9040296311e96a9e2740416d181f2.tar.gz
WL#3337 (Event scheduler new architecture)
Cleaned up the code a bit. Fixed few leaks. This code still does not load events on server startup from disk. The problem is that there is a need for a THD instance, which does not exist during server boot. This will be solved soon. Still Event_timed is used both for the memory queue and for exectution. This will be changed according to WL#3337 probably in the next commit. sql/event_data_objects.cc: Strip unneeded stuff from class Event_timed Event_timed is still used for the queue and execution. That will be changed in the next commit. sql/event_data_objects.h: Strip unneeded stuff from class Event_timed Event_timed is still used for the queue and execution. That will be changed in the next commit. sql/event_db_repository.cc: Cosmetics. Add a new method load_named_event_job, to be made complete in the next commit. It will load from disk an instance of Event_job_data to be used during execution. sql/event_db_repository.h: find_event does not need MEM_ROOT anymore because the memory is allocated on Event's own root. sql/event_queue.cc: Remove dead code. Move dumping of the queue to separate method. Make critical sections in create_event & update_event as small as possible - load the new event outside of the section and free the object also outside of it. sql/event_queue.h: init -> init_queue -> easier for ctags deinit -> deinit_queue -> easier for ctags sql/event_scheduler.cc: empty this file sql/event_scheduler.h: empty this file sql/event_scheduler_ng.cc: add back DBUG_RETURN(0) in thread handlers. We don't stop running events when stopping the scheduler. Therefore remove this method now. If it is needed later it can be added back. sql/event_scheduler_ng.h: Remove stop_all_running_threads() init -> init_scheduler deinit -> deinit_scheduler easier for ctags sql/events.cc: Cosmetics sql/events.h: Cosmetics sql/set_var.cc: Remove references to dead code sql/sql_parse.cc: Reorganize a bit.
-rw-r--r--sql/event_data_objects.cc195
-rw-r--r--sql/event_data_objects.h25
-rw-r--r--sql/event_db_repository.cc84
-rw-r--r--sql/event_db_repository.h12
-rw-r--r--sql/event_queue.cc249
-rw-r--r--sql/event_queue.h28
-rw-r--r--sql/event_scheduler.cc1537
-rw-r--r--sql/event_scheduler.h187
-rw-r--r--sql/event_scheduler_ng.cc104
-rw-r--r--sql/event_scheduler_ng.h8
-rw-r--r--sql/events.cc58
-rw-r--r--sql/events.h8
-rw-r--r--sql/set_var.cc4
-rw-r--r--sql/sql_parse.cc31
14 files changed, 243 insertions, 2287 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index 97db443e08d..0e984bd4c7a 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -530,18 +530,14 @@ Event_parse_data::init_ends(THD *thd, Item *new_ends)
Event_timed::Event_timed()
*/
-Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
- running(0), thread_id(0), status_changed(false),
+Event_timed::Event_timed():status_changed(false),
last_executed_changed(false), expression(0),
created(0), modified(0),
on_completion(Event_timed::ON_COMPLETION_DROP),
status(Event_timed::ENABLED), sphead(0),
- sql_mode(0), dropped(false),
- free_sphead_on_delete(true), flags(0)
+ sql_mode(0), dropped(false), flags(0)
{
- pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&this->COND_finished, NULL);
init();
}
@@ -555,46 +551,8 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
Event_timed::~Event_timed()
{
- deinit_mutexes();
free_root(&mem_root, MYF(0));
-
- if (free_sphead_on_delete)
- free_sp();
-}
-
-
-/*
- Destructor
-
- SYNOPSIS
- Event_timed::deinit_mutexes()
-*/
-
-void
-Event_timed::deinit_mutexes()
-{
- pthread_mutex_destroy(&this->LOCK_running);
- pthread_cond_destroy(&this->COND_finished);
-}
-
-
-/*
- Checks whether the event is running
-
- SYNOPSIS
- Event_timed::is_running()
-*/
-
-bool
-Event_timed::is_running()
-{
- bool ret;
-
- VOID(pthread_mutex_lock(&this->LOCK_running));
- ret= running;
- VOID(pthread_mutex_unlock(&this->LOCK_running));
-
- return ret;
+ free_sp();
}
@@ -1253,7 +1211,7 @@ Event_timed::update_fields(THD *thd)
Open_tables_state backup;
int ret;
- DBUG_ENTER("Event_timed::update_time_fields");
+ DBUG_ENTER("Event_timed::update_fields");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
@@ -1382,7 +1340,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
Executes the event (the underlying sp_head object);
SYNOPSIS
- evex_fill_row()
+ Event_timed::execute()
thd THD
mem_root If != NULL use it to compile the event on it
@@ -1607,149 +1565,6 @@ done:
}
-extern pthread_attr_t connection_attrib;
-
-/*
- Checks whether is possible and forks a thread. Passes self as argument.
-
- RETURN VALUE
- EVENT_EXEC_STARTED OK
- EVENT_EXEC_ALREADY_EXEC Thread not forked, already working
- EVENT_EXEC_CANT_FORK Unable to spawn thread (error)
-*/
-
-int
-Event_timed::spawn_now(void * (*thread_func)(void*), void *arg)
-{
- THD *thd= current_thd;
- int ret= EVENT_EXEC_STARTED;
- DBUG_ENTER("Event_timed::spawn_now");
- DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str));
-
- VOID(pthread_mutex_lock(&this->LOCK_running));
-
- DBUG_PRINT("info", ("SCHEDULER: execute_at of %s is %lld", name.str,
- TIME_to_ulonglong_datetime(&execute_at)));
- mark_last_executed(thd);
- if (compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
- "Disabling after execution.", dbname.str, name.str);
- status= DISABLED;
- }
- DBUG_PRINT("evex manager", ("[%10s] next exec at [%llu]", name.str,
- TIME_to_ulonglong_datetime(&execute_at)));
- /*
- 1. For one-time event : year is > 0 and expression is 0
- 2. For recurring, expression is != -=> check execute_at_null in this case
- */
- if ((execute_at.year && !expression) || execute_at_null)
- {
- sql_print_information("SCHEDULER: [%s.%s of %s] no more executions "
- "after this one", dbname.str, name.str,
- definer.str);
- flags |= EVENT_EXEC_NO_MORE | EVENT_FREE_WHEN_FINISHED;
- }
-
- update_fields(thd);
-
- if (!in_spawned_thread)
- {
- pthread_t th;
- in_spawned_thread= true;
-
- if (pthread_create(&th, &connection_attrib, thread_func, arg))
- {
- DBUG_PRINT("info", ("problem while spawning thread"));
- ret= EVENT_EXEC_CANT_FORK;
- in_spawned_thread= false;
- }
- }
- else
- {
- DBUG_PRINT("info", ("already in spawned thread. skipping"));
- ret= EVENT_EXEC_ALREADY_EXEC;
- }
- VOID(pthread_mutex_unlock(&this->LOCK_running));
-
- DBUG_RETURN(ret);
-}
-
-
-bool
-Event_timed::spawn_thread_finish(THD *thd)
-{
- bool should_free;
- DBUG_ENTER("Event_timed::spawn_thread_finish");
- VOID(pthread_mutex_lock(&LOCK_running));
- in_spawned_thread= false;
- DBUG_PRINT("info", ("Sending COND_finished for thread %d", thread_id));
- thread_id= 0;
- if (dropped)
- drop(thd);
- pthread_cond_broadcast(&COND_finished);
- should_free= flags & EVENT_FREE_WHEN_FINISHED;
- VOID(pthread_mutex_unlock(&LOCK_running));
- DBUG_RETURN(should_free);
-}
-
-
-/*
- Kills a running event
- SYNOPSIS
- Event_timed::kill_thread()
-
- RETURN VALUE
- 0 OK
- -1 EVEX_CANT_KILL
- !0 Error
-*/
-
-int
-Event_timed::kill_thread(THD *thd)
-{
- int ret= 0;
- DBUG_ENTER("Event_timed::kill_thread");
- pthread_mutex_lock(&LOCK_running);
- DBUG_PRINT("info", ("thread_id=%lu", thread_id));
-
- if (thread_id == thd->thread_id)
- {
- /*
- We don't kill ourselves in cases like :
- alter event e_43 do alter event e_43 do set @a = 4 because
- we will never receive COND_finished.
- */
- DBUG_PRINT("info", ("It's not safe to kill ourselves in self altering queries"));
- ret= EVEX_CANT_KILL;
- }
- else if (thread_id && !(ret= kill_one_thread(thd, thread_id, false)))
- {
- thd->enter_cond(&COND_finished, &LOCK_running, "Waiting for finished");
- DBUG_PRINT("info", ("Waiting for COND_finished from thread %d", thread_id));
- while (thread_id)
- pthread_cond_wait(&COND_finished, &LOCK_running);
-
- DBUG_PRINT("info", ("Got COND_finished"));
- /* This will implicitly unlock LOCK_running. Hence we return before that */
- thd->exit_cond("");
-
- DBUG_RETURN(0);
- }
- else if (!thread_id && in_spawned_thread)
- {
- /*
- Because the manager thread waits for the forked thread to update thread_id
- this situation is impossible.
- */
- DBUG_ASSERT(0);
- }
- pthread_mutex_unlock(&LOCK_running);
- DBUG_PRINT("exit", ("%d", ret));
- DBUG_RETURN(ret);
-}
-
-
/*
Checks whether two events have the same name
diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h
index 5ae5c7e81ab..a5aaf0e66fa 100644
--- a/sql/event_data_objects.h
+++ b/sql/event_data_objects.h
@@ -63,12 +63,6 @@ class Event_timed
{
Event_timed(const Event_timed &); /* Prevent use of these */
void operator=(Event_timed &);
- my_bool in_spawned_thread;
- ulong locked_by_thread_id;
- my_bool running;
- ulong thread_id;
- pthread_mutex_t LOCK_running;
- pthread_cond_t COND_finished;
bool status_changed;
bool last_executed_changed;
@@ -118,7 +112,6 @@ public:
ulong sql_mode;
bool dropped;
- bool free_sphead_on_delete;
uint flags;//all kind of purposes
static void *operator new(size_t size)
@@ -146,9 +139,6 @@ public:
void
init();
- void
- deinit_mutexes();
-
int
load_from_row(TABLE *table);
@@ -172,24 +162,9 @@ public:
int
compile(THD *thd, MEM_ROOT *mem_root);
-
- bool
- is_running();
-
- int
- spawn_now(void * (*thread_func)(void*), void *arg);
-
- bool
- spawn_thread_finish(THD *thd);
void
free_sp();
-
- int
- kill_thread(THD *thd);
-
- void
- set_thread_id(ulong tid) { thread_id= tid; }
};
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc
index 074e05e5d8f..a7daed113bb 100644
--- a/sql/event_db_repository.cc
+++ b/sql/event_db_repository.cc
@@ -207,7 +207,7 @@ evex_fill_row(THD *thd, TABLE *table, Event_parse_data *et, my_bool is_update)
table->field[ET_FIELD_STARTS]->set_notnull();
table->field[ET_FIELD_STARTS]->
store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME);
- }
+ }
if (!et->ends_null)
{
@@ -374,8 +374,7 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
ret= read_record_info.read_record(&read_record_info);
if (ret == 0)
ret= copy_event_to_schema_table(thd, schema_table, event_table);
- }
- while (ret == 0);
+ } while (ret == 0);
DBUG_PRINT("info", ("Scan finished. ret=%d", ret));
end_read_record(&read_record_info);
@@ -464,8 +463,7 @@ Event_db_repository::fill_schema_events(THD *thd, TABLE_LIST *tables, char *db)
int
Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- Event_timed **ett,
- TABLE *tbl, MEM_ROOT *root)
+ Event_timed **ett, TABLE *tbl)
{
TABLE *table;
int ret;
@@ -505,7 +503,7 @@ done:
if (ret)
{
delete et;
- et= 0;
+ et= NULL;
}
/* don't close the table if we haven't opened it ourselves */
if (!tbl && table)
@@ -518,7 +516,6 @@ done:
int
Event_db_repository::init_repository()
{
- init_alloc_root(&repo_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
return 0;
}
@@ -526,7 +523,6 @@ Event_db_repository::init_repository()
void
Event_db_repository::deinit_repository()
{
- free_root(&repo_root, MYF(0));
}
@@ -731,7 +727,8 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
parse_data->name.str));
DBUG_PRINT("info", ("check existance of an event with the same name"));
- if (!evex_db_find_event_by_name(thd, parse_data->dbname, parse_data->name, table))
+ if (!evex_db_find_event_by_name(thd, parse_data->dbname,
+ parse_data->name, table))
{
if (create_if_not)
{
@@ -1026,14 +1023,12 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
*/
if (db.length > table->field[ET_FIELD_DB]->field_length ||
name.length > table->field[ET_FIELD_NAME]->field_length)
-
DBUG_RETURN(EVEX_KEY_NOT_FOUND);
table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin);
table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin);
- key_copy(key, table->record[0], table->key_info,
- table->key_info->key_length);
+ key_copy(key, table->record[0], table->key_info, table->key_info->key_length);
if (table->file->index_read_idx(table->record[0], 0, key,
table->key_info->key_length,
@@ -1125,7 +1120,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
the table, compiles and inserts it into the cache.
SYNOPSIS
- Event_scheduler::load_named_event()
+ Event_db_repository::load_named_event_timed()
thd THD
etn The name of the event to load and compile on scheduler's root
etn_new The loaded event
@@ -1136,20 +1131,21 @@ Event_db_repository::drop_events_by_field(THD *thd,
*/
int
-Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- Event_timed **etn_new)
+Event_db_repository::load_named_event_timed(THD *thd, LEX_STRING dbname,
+ LEX_STRING name,
+ Event_timed **etn_new)
{
int ret= 0;
MEM_ROOT *tmp_mem_root;
Event_timed *et_loaded= NULL;
Open_tables_state backup;
- DBUG_ENTER("Event_db_repository::load_named_event");
+ DBUG_ENTER("Event_db_repository::load_named_event_timed");
DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup);
/* No need to use my_error() here because db_find_event() has done it */
- ret= find_event(thd, dbname, name, &et_loaded, NULL, &repo_root);
+ ret= find_event(thd, dbname, name, &et_loaded, NULL);
thd->restore_backup_open_tables_state(&backup);
/* In this case no memory was allocated so we don't need to clean */
if (ret)
@@ -1171,3 +1167,57 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK);
}
+
+
+/*
+ Looks for a named event in mysql.event and then loads it from
+ the table, compiles and inserts it into the cache.
+
+ SYNOPSIS
+ Event_db_repository::load_named_event_job()
+ thd THD
+ etn The name of the event to load and compile on scheduler's root
+ etn_new The loaded event
+
+ RETURN VALUE
+ NULL Error during compile or the event is non-enabled.
+ otherwise Address
+*/
+
+int
+Event_db_repository::load_named_event_job(THD *thd, LEX_STRING dbname,
+ LEX_STRING name,
+ Event_job_data **etn_new)
+{
+ int ret= 0;
+ MEM_ROOT *tmp_mem_root;
+ Event_timed *et_loaded= NULL;
+ Open_tables_state backup;
+
+ DBUG_ENTER("Event_db_repository::load_named_event_job");
+ DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
+#if 0
+ thd->reset_n_backup_open_tables_state(&backup);
+ /* No need to use my_error() here because db_find_event() has done it */
+ ret= find_event(thd, dbname, name, &et_loaded, NULL);
+ thd->restore_backup_open_tables_state(&backup);
+ /* In this case no memory was allocated so we don't need to clean */
+ if (ret)
+ DBUG_RETURN(OP_LOAD_ERROR);
+
+ if (et_loaded->status != Event_timed::ENABLED)
+ {
+ /*
+ We don't load non-enabled events.
+ In db_find_event() `et_new` was allocated on the heap and not on
+ scheduler_root therefore we delete it here.
+ */
+ delete et_loaded;
+ DBUG_RETURN(OP_DISABLED_EVENT);
+ }
+
+ et_loaded->compute_next_execution_time();
+ *etn_new= et_loaded;
+#endif
+ DBUG_RETURN(OP_OK);
+}
diff --git a/sql/event_db_repository.h b/sql/event_db_repository.h
index e1c64c8aded..c000247cff2 100644
--- a/sql/event_db_repository.h
+++ b/sql/event_db_repository.h
@@ -56,6 +56,7 @@ fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */);
class Event_timed;
class Event_parse_data;
class Event_queue_element;
+class Event_job_data;
class Event_db_repository
{
@@ -88,10 +89,15 @@ public:
int
find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **ett,
- TABLE *tbl, MEM_ROOT *root);
+ TABLE *tbl);
int
- load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **etn_new);
+ load_named_event_timed(THD *thd, LEX_STRING dbname, LEX_STRING name,
+ Event_timed **etn_new);
+
+ int
+ load_named_event_job(THD *thd, LEX_STRING dbname, LEX_STRING name,
+ Event_job_data **etn_new);
int
find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
@@ -116,8 +122,6 @@ private:
static bool
check_system_tables(THD *thd);
- MEM_ROOT repo_root;
-
/* Prevent use of these */
Event_db_repository(const Event_db_repository &);
void operator=(Event_db_repository &);
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 44920b29c16..b42372ba6bd 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -90,34 +90,29 @@ Event_queue::Event_queue()
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
+ OP_ALREADY_EXISTS Event already in the queue
*/
int
-Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
+Event_queue::create_event(THD *thd, Event_parse_data *et)
{
int res;
Event_timed *et_new;
DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue));
+ res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new);
LOCK_QUEUE_DATA();
- if (check_existence && find_event(et->dbname, et->name, FALSE))
- {
- res= OP_ALREADY_EXISTS;
- goto end;
- }
-
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
+ if (!res)
{
DBUG_PRINT("info", ("new event in the queue %p", et_new));
queue_insert_safe(&queue, (byte *) et_new);
- on_queue_change();
+ notify_observers();
}
else if (res == OP_DISABLED_EVENT)
res= OP_OK;
-end:
UNLOCK_QUEUE_DATA();
+
DBUG_RETURN(res);
}
@@ -129,104 +124,54 @@ end:
Event_queue::update_event()
thd Thread
et The event to replace(add) into the queue
- new_schema New schema
- new_name New name
+ new_schema New schema, in case of RENAME TO
+ new_name New name, in case of RENAME TO
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
- OP_ALREADY_EXISTS Event already in the queue
*/
int
Event_queue::update_event(THD *thd, Event_parse_data *et,
- LEX_STRING *new_schema,
- LEX_STRING *new_name)
+ LEX_STRING *new_schema, LEX_STRING *new_name)
{
- int res= OP_OK;
- Event_timed *et_old, *et_new= NULL;
- LEX_STRING old_schema, old_name;
-
- LINT_INIT(old_schema.str);
- LINT_INIT(old_schema.length);
- LINT_INIT(old_name.str);
- LINT_INIT(old_name.length);
+ int res;
+ Event_timed *et_old= NULL, *et_new= NULL;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
thd, et, et->dbname.str, et->name.str, &LOCK_event_queue));
+ res= db_repository->
+ load_named_event_timed(thd, new_schema?*new_schema:et->dbname,
+ new_name? *new_name:et->name, &et_new);
+
+ if (res && res != OP_DISABLED_EVENT)
+ goto end;
+
LOCK_QUEUE_DATA();
if (!(et_old= find_event(et->dbname, et->name, TRUE)))
+ {
DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
et->dbname.str, et->name.str));
-
- if (new_schema && new_name)
- {
- old_schema= et->dbname;
- old_name= et->name;
- et->dbname= *new_schema;
- et->name= *new_name;
}
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
+ if (!res)
{
DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
queue_insert_safe(&queue, (byte *) et_new);
- on_queue_change();
}
else if (res == OP_DISABLED_EVENT)
res= OP_OK;
-
- if (new_schema && new_name)
- {
- et->dbname= old_schema;
- et->name= old_name;
- }
- DBUG_PRINT("info", ("res=%d", res));
UNLOCK_QUEUE_DATA();
- /*
- Andrey: Is this comment still truthful ???
-
- We don't move this code above because a potential kill_thread will call
- THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
- which is LOCK_event_queue on which the COND_new_work in ::run() locks.
- Hence, we try to acquire a lock which we have already acquired and we run
- into an assert. Holding LOCK_event_queue however is not needed because
- we don't touch any invariant of the scheduler anymore. ::drop_event() does
- the same.
- */
- if (et_old)
- {
- switch (et_old->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
+ notify_observers();
+
+ if (et_old)
+ delete et_old;
+end:
+ DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
}
@@ -256,40 +201,13 @@ Event_queue::drop_event(THD *thd, sp_name *name)
LOCK_QUEUE_DATA();
if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
DBUG_PRINT("info", ("No such event found, probably DISABLED"));
-
UNLOCK_QUEUE_DATA();
-
- /* See comments in ::replace_event() why this is split in two parts. */
if (et_old)
- {
- switch ((res= et_old->kill_thread(thd))) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- sql_print_error("SCHEDULER: Got unexpected error %d", res);
- DBUG_ASSERT(0);
- }
- }
+ delete et_old;
+ /*
+ We don't signal here because the scheduler will catch the change
+ next time it wakes up.
+ */
DBUG_RETURN(FALSE);
}
@@ -361,7 +279,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
- uint i= 0, dropped= 0;
+ uint i= 0;
while (i < queue.elements)
{
Event_timed *et= (Event_timed *) queue_element(&queue, i);
@@ -375,32 +293,22 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
counter and the (i < queue.elements) condition is ok.
*/
queue_remove(&queue, i);
-
- /* See replace_event() */
- switch (et->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et->flags |= EVENT_FREE_WHEN_FINISHED;
- ++dropped;
- break;
- case 0:
- delete et;
- ++dropped;
- break;
- default:
- DBUG_ASSERT(0);
- }
+ delete et;
}
else
i++;
}
- DBUG_PRINT("info", ("Dropped %lu", dropped));
/*
- Don't send COND_new_work because no need to wake up the scheduler thread.
- When it wakes next time up it will recalculate how much more it should
- sleep if the top of the queue has been changed by this method.
+ We don't call notify_observers() . If we remove the top event:
+ 1. The queue is empty. The scheduler will wake up at some time and realize
+ that the queue is empty. If create_event() comes inbetween it will
+ signal the scheduler
+ 2. The queue is not empty, but the next event after the previous top, won't
+ be executed any time sooner than the element we removed. Hence, we may
+ not notify the scheduler and it will realize the change when it
+ wakes up from timedwait.
*/
-
+
DBUG_VOID_RETURN;
}
@@ -418,16 +326,14 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
>=0 Number of dropped events
*/
-int
+void
Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
{
- int ret;
DBUG_ENTER("Event_queue::drop_schema_events");
LOCK_QUEUE_DATA();
drop_matching_events(thd, schema, event_timed_db_equal);
UNLOCK_QUEUE_DATA();
-
- DBUG_RETURN(ret);
+ DBUG_VOID_RETURN;
}
@@ -744,13 +650,13 @@ Event_queue::deinit_mutexes()
its state.
SYNOPSIS
- Event_queue::on_queue_change()
+ Event_queue::notify_observers()
*/
void
-Event_queue::on_queue_change()
+Event_queue::notify_observers()
{
- DBUG_ENTER("Event_queue::on_queue_change");
+ DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
@@ -761,7 +667,7 @@ Event_queue::on_queue_change()
The implementation of full-fledged initialization.
SYNOPSIS
- Event_scheduler::init()
+ Event_queue::init()
RETURN VALUE
FALSE OK
@@ -769,15 +675,16 @@ Event_queue::on_queue_change()
*/
bool
-Event_queue::init(Event_db_repository *db_repo)
+Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
{
int i= 0;
bool ret= FALSE;
- DBUG_ENTER("Event_queue::init");
+ DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this=%p", this));
LOCK_QUEUE_DATA();
db_repository= db_repo;
+ scheduler= sched;
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_timed_compare_q, NULL, 30 /*auto_extent*/))
@@ -803,9 +710,9 @@ end:
void
-Event_queue::deinit()
+Event_queue::deinit_queue()
{
- DBUG_ENTER("Event_queue::deinit");
+ DBUG_ENTER("Event_queue::deinit_queue");
LOCK_QUEUE_DATA();
empty_queue();
@@ -833,6 +740,8 @@ void
Event_queue::empty_queue()
{
uint i;
+ DBUG_ENTER("Event_queue::empty_queue");
+ DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
/* empty the queue */
for (i= 0; i < events_count_no_lock(); ++i)
{
@@ -840,6 +749,7 @@ Event_queue::empty_queue()
delete et;
}
resize_queue(&queue, 0);
+ DBUG_VOID_RETURN;
}
@@ -864,6 +774,29 @@ Event_queue::top_changed()
}
+inline void
+Event_queue::dbug_dump_queue(time_t now)
+{
+#ifndef DBUG_OFF
+ Event_timed *et;
+ uint i;
+ DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
+ for (i = 0; i < queue.elements; i++)
+ {
+ et= ((Event_timed*)queue_element(&queue, i));
+ DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
+ DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
+ " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
+ TIME_to_ulonglong_datetime(&et->execute_at),
+ TIME_to_ulonglong_datetime(&et->starts),
+ TIME_to_ulonglong_datetime(&et->ends),
+ et->expression, sec_since_epoch_TIME(&et->execute_at), now,
+ (int)(sec_since_epoch_TIME(&et->execute_at) - now),
+ sec_since_epoch_TIME(&et->execute_at) <= now));
+ }
+#endif
+}
+
Event_timed *
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
struct timespec *abstime)
@@ -876,36 +809,22 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
LOCK_QUEUE_DATA();
do {
int res;
- Event_timed *et= NULL;
if (!queue.elements)
{
abstime->tv_sec= 0;
break;
}
- int i;
- DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
- for (i = 0; i < queue.elements; i++)
- {
- et= ((Event_timed*)queue_element(&queue, i));
- DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
- DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
- " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
- TIME_to_ulonglong_datetime(&et->execute_at),
- TIME_to_ulonglong_datetime(&et->starts),
- TIME_to_ulonglong_datetime(&et->ends),
- et->expression, sec_since_epoch_TIME(&et->execute_at), now,
- (int)(sec_since_epoch_TIME(&et->execute_at) - now),
- sec_since_epoch_TIME(&et->execute_at) <= now));
- }
- et= ((Event_timed*)queue_element(&queue, 0));
+ dbug_dump_queue(now);
+
+ Event_timed *et= ((Event_timed*)queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
if (top_time.tv_sec <= now)
{
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
- if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
- &et_new)))
+ if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name,
+ &et_new)))
{
DBUG_ASSERT(0);
break;
diff --git a/sql/event_queue.h b/sql/event_queue.h
index 1335100be21..d253e3c7597 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -38,15 +38,15 @@ public:
deinit_mutexes();
bool
- init(Event_db_repository *db_repo);
+ init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched);
void
- deinit();
+ deinit_queue();
/* Methods for queue management follow */
int
- create_event(THD *thd, Event_parse_data *et, bool check_existence);
+ create_event(THD *thd, Event_parse_data *et);
int
update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
@@ -55,13 +55,9 @@ public:
bool
drop_event(THD *thd, sp_name *name);
- int
+ void
drop_schema_events(THD *thd, LEX_STRING schema);
- int
- drop_user_events(THD *thd, LEX_STRING *definer)
- { DBUG_ASSERT(0); return 0;}
-
uint
events_count();
@@ -89,7 +85,7 @@ public:
void
top_changed();
-///////////////protected
+protected:
Event_timed *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
@@ -105,8 +101,6 @@ public:
Event_db_repository *db_repository;
- /* The sorted queue with the Event_timed objects */
- QUEUE queue;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
@@ -122,10 +116,16 @@ public:
unlock_data(const char *func, uint line);
void
- on_queue_change();
-
+ notify_observers();
+
+ void
+ dbug_dump_queue(time_t now);
+
Event_scheduler_ng *scheduler;
-protected:
+
+//public:
+ /* The sorted queue with the Event_timed objects */
+ QUEUE queue;
};
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index 3a9b988f92e..f1c7d8394e3 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -14,1540 +14,3 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "mysql_priv.h"
-#include "events.h"
-#include "event_data_objects.h"
-#include "event_scheduler.h"
-#include "event_db_repository.h"
-#include "sp_head.h"
-#include "event_queue.h"
-
-
-#ifdef __GNUC__
-#if __GNUC__ >= 2
-#define SCHED_FUNC __FUNCTION__
-#endif
-#else
-#define SCHED_FUNC "<unknown>"
-#endif
-
-#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
-#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
-
-
-Event_scheduler*
-Event_scheduler::singleton= NULL;
-
-
-#ifndef DBUG_OFF
-static
-LEX_STRING states_names[] =
-{
- {(char*) STRING_WITH_LEN("UNINITIALIZED")},
- {(char*) STRING_WITH_LEN("INITIALIZED")},
- {(char*) STRING_WITH_LEN("COMMENCING")},
- {(char*) STRING_WITH_LEN("CANTSTART")},
- {(char*) STRING_WITH_LEN("RUNNING")},
- {(char*) STRING_WITH_LEN("SUSPENDED")},
- {(char*) STRING_WITH_LEN("IN_SHUTDOWN")}
-};
-#endif
-
-const char * const
-Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] =
-{
- "new work",
- "started or stopped",
- "suspend or resume"
-};
-
-
-/*
-Event_scheduler*
-Event_scheduler::singleton= NULL;
-*/
-
-
-
-class Worker_thread_param
-{
-public:
- Event_timed *et;
- pthread_mutex_t LOCK_started;
- pthread_cond_t COND_started;
- bool started;
-
- Worker_thread_param(Event_timed *etn):et(etn), started(FALSE)
- {
- pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&COND_started, NULL);
- }
-
- ~Worker_thread_param()
- {
- pthread_mutex_destroy(&LOCK_started);
- pthread_cond_destroy(&COND_started);
- }
-};
-
-
-/*
- Prints the stack of infos, warnings, errors from thd to
- the console so it can be fetched by the logs-into-tables and
- checked later.
-
- SYNOPSIS
- evex_print_warnings
- thd - thread used during the execution of the event
- et - the event itself
-*/
-
-static void
-evex_print_warnings(THD *thd, Event_timed *et)
-{
- MYSQL_ERROR *err;
- DBUG_ENTER("evex_print_warnings");
- if (!thd->warn_list.elements)
- DBUG_VOID_RETURN;
-
- char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
- char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
- String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
- prefix.length(0);
- prefix.append("SCHEDULER: [");
-
- append_identifier(thd, &prefix, et->definer_user.str, et->definer_user.length);
- prefix.append('@');
- append_identifier(thd, &prefix, et->definer_host.str, et->definer_host.length);
- prefix.append("][", 2);
- append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
- prefix.append('.');
- append_identifier(thd,&prefix, et->name.str, et->name.length);
- prefix.append("] ", 2);
-
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
- while ((err= it++))
- {
- String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
- /* set it to 0 or we start adding at the end. That's the trick ;) */
- err_msg.length(0);
- err_msg.append(prefix);
- err_msg.append(err->msg, strlen(err->msg), system_charset_info);
- err_msg.append("]");
- DBUG_ASSERT(err->level < 3);
- (sql_print_message_handlers[err->level])("%*s", err_msg.length(),
- err_msg.c_ptr());
- }
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Inits an scheduler thread handler, both the main and a worker
-
- SYNOPSIS
- init_event_thread()
- thd - the THD of the thread. Has to be allocated by the caller.
-
- NOTES
- 1. The host of the thead is my_localhost
- 2. thd->net is initted with NULL - no communication.
-
- RETURN VALUE
- 0 OK
- -1 Error
-*/
-
-static int
-init_event_thread(THD** t, enum enum_thread_type thread_type)
-{
- THD *thd= *t;
- thd->thread_stack= (char*)t; // remember where our stack is
- DBUG_ENTER("init_event_thread");
- thd->client_capabilities= 0;
- thd->security_ctx->master_access= 0;
- thd->security_ctx->db_access= 0;
- thd->security_ctx->host_or_ip= (char*)my_localhost;
- my_net_init(&thd->net, 0);
- thd->net.read_timeout= slave_net_timeout;
- thd->slave_thread= 0;
- thd->options|= OPTION_AUTO_IS_NULL;
- thd->client_capabilities|= CLIENT_MULTI_RESULTS;
- thd->real_id=pthread_self();
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->thread_id= thread_id++;
- threads.append(thd);
- thread_count++;
- thread_running++;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
-
- if (init_thr_lock() || thd->store_globals())
- {
- thd->cleanup();
- DBUG_RETURN(-1);
- }
-
-#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
- sigset_t set;
- VOID(sigemptyset(&set)); // Get mask in use
- VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
-#endif
-
- /*
- Guarantees that we will see the thread in SHOW PROCESSLIST though its
- vio is NULL.
- */
- thd->system_thread= thread_type;
-
- thd->proc_info= "Initialized";
- thd->version= refresh_version;
- thd->set_time();
-
- DBUG_RETURN(0);
-}
-
-
-/*
- Inits the main scheduler thread and then calls Event_scheduler::run()
- of arg.
-
- SYNOPSIS
- event_scheduler_thread()
- arg void* ptr to Event_scheduler
-
- NOTES
- 1. The host of the thead is my_localhost
- 2. thd->net is initted with NULL - no communication.
- 3. The reason to have a proxy function is that it's not possible to
- use a method as function to be executed in a spawned thread:
- - our pthread_hander_t macro uses extern "C"
- - separating thread setup from the real execution loop is also to be
- considered good.
-
- RETURN VALUE
- 0 OK
-*/
-
-pthread_handler_t
-event_scheduler_thread(void *arg)
-{
- /* needs to be first for thread_stack */
- THD *thd= NULL;
- Event_scheduler *scheduler= (Event_scheduler *) arg;
-
- DBUG_ENTER("event_scheduler_thread");
-
- my_thread_init();
- pthread_detach_this_thread();
-
- /* note that constructor of THD uses DBUG_ ! */
- if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_SCHEDULER))
- {
- sql_print_error("SCHEDULER: Cannot init manager event thread.");
- scheduler->report_error_during_start();
- }
- else
- {
- thd->security_ctx->set_user((char*)"event_scheduler");
-
- sql_print_information("SCHEDULER: Manager thread booting");
- if (Event_scheduler::get_instance()->event_queue->check_system_tables(thd))
- scheduler->report_error_during_start();
- else
- scheduler->run(thd);
-
- /*
- NOTE: Don't touch `scheduler` after this point because we have notified
- the
- thread which shuts us down that we have finished cleaning. In this
- very moment a new scheduler thread could be started and a crash is
- not welcome.
- */
- }
-
- /*
- If we cannot create THD then don't decrease because we haven't touched
- thread_count and thread_running in init_event_thread() which was never
- called. In init_event_thread() thread_count and thread_running are
- always increased even in the case the method returns an error.
- */
- if (thd)
- {
- thd->proc_info= "Clearing";
- DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net);
- pthread_mutex_lock(&LOCK_thread_count);
- thread_count--;
- thread_running--;
- delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
- }
- my_thread_end();
- DBUG_RETURN(0); // Can't return anything here
-}
-
-
-/*
- Function that executes an event in a child thread. Setups the
- environment for the event execution and cleans after that.
-
- SYNOPSIS
- event_worker_thread()
- arg The Event_timed object to be processed
-
- RETURN VALUE
- 0 OK
-*/
-
-pthread_handler_t
-event_worker_thread(void *arg)
-{
- THD *thd; /* needs to be first for thread_stack */
- Worker_thread_param *param= (Worker_thread_param *) arg;
- Event_timed *event= param->et;
- int ret;
- bool startup_error= FALSE;
- Security_context *save_ctx;
- /* this one is local and not needed after exec */
- Security_context security_ctx;
-
- DBUG_ENTER("event_worker_thread");
- DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str));
-
- my_thread_init();
- pthread_detach_this_thread();
-
- if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_WORKER))
- {
- sql_print_error("SCHEDULER: Startup failure.");
- startup_error= TRUE;
- event->spawn_thread_finish(thd);
- }
- else
- event->set_thread_id(thd->thread_id);
-
- DBUG_PRINT("info", ("master_access=%d db_access=%d",
- thd->security_ctx->master_access, thd->security_ctx->db_access));
- /*
- If we don't change it before we send the signal back, then an intermittent
- DROP EVENT will take LOCK_scheduler_data and try to kill this thread, because
- event->thread_id is already real. However, because thd->security_ctx->user
- is not initialized then a crash occurs in kill_one_thread(). Thus, we have
- to change the context before sending the signal. We are under
- LOCK_scheduler_data being held by Event_scheduler::run() -> ::execute_top().
- */
- thd->change_security_context(event->definer_user, event->definer_host,
- event->dbname, &security_ctx, &save_ctx);
- DBUG_PRINT("info", ("master_access=%d db_access=%d",
- thd->security_ctx->master_access, thd->security_ctx->db_access));
-
- /* Signal the scheduler thread that we have started successfully */
- pthread_mutex_lock(&param->LOCK_started);
- param->started= TRUE;
- pthread_cond_signal(&param->COND_started);
- pthread_mutex_unlock(&param->LOCK_started);
-
- if (!startup_error)
- {
- thd->init_for_queries();
- thd->enable_slow_log= TRUE;
-
- event->set_thread_id(thd->thread_id);
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id);
-
- ret= event->execute(thd, thd->mem_root);
- evex_print_warnings(thd, event);
- sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d",
- event->dbname.str, event->name.str,
- event->definer.str, ret);
- if (ret == EVEX_COMPILE_ERROR)
- sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
- event->dbname.str, event->name.str,
- event->definer.str);
- else if (ret == EVEX_MICROSECOND_UNSUP)
- sql_print_information("SCHEDULER: MICROSECOND is not supported");
-
- DBUG_PRINT("info", ("master_access=%d db_access=%d",
- thd->security_ctx->master_access, thd->security_ctx->db_access));
-
- /* If true is returned, we are expected to free it */
- if (event->spawn_thread_finish(thd))
- {
- DBUG_PRINT("info", ("Freeing object pointer"));
- delete event;
- }
- }
-
- if (thd)
- {
- thd->proc_info= "Clearing";
- DBUG_ASSERT(thd->net.buff != 0);
- /*
- Free it here because net.vio is NULL for us => THD::~THD will check it
- and won't call net_end(&net); See also replication code.
- */
- net_end(&thd->net);
- DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id));
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thread_count--;
- thread_running--;
- delete thd;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- }
-
- my_thread_end();
- DBUG_RETURN(0); // Can't return anything here
-}
-
-
-
-
-/*
- Constructor of class Event_scheduler.
-
- SYNOPSIS
- Event_scheduler::Event_scheduler()
-*/
-
-Event_scheduler::Event_scheduler()
-{
- thread_id= 0;
- mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
- mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
- cond_waiting_on= COND_NONE;
- mutex_scheduler_data_locked= FALSE;
- state= UNINITIALIZED;
- start_scheduler_suspended= FALSE;
- LOCK_scheduler_data= &LOCK_data;
-}
-
-
-
-/*
- Returns the singleton instance of the class.
-
- SYNOPSIS
- Event_scheduler::create_instance()
-
- RETURN VALUE
- address
-*/
-
-void
-Event_scheduler::create_instance(Event_queue *queue)
-{
- singleton= new Event_scheduler();
- singleton->event_queue= queue;
-}
-
-/*
- Returns the singleton instance of the class.
-
- SYNOPSIS
- Event_scheduler::get_instance()
-
- RETURN VALUE
- address
-*/
-
-Event_scheduler*
-Event_scheduler::get_instance()
-{
- DBUG_ENTER("Event_scheduler::get_instance");
- DBUG_RETURN(singleton);
-}
-
-
-/*
- The implementation of full-fledged initialization.
-
- SYNOPSIS
- Event_scheduler::init()
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-bool
-Event_scheduler::init(Event_db_repository *db_repo)
-{
- int i= 0;
- bool ret= FALSE;
- DBUG_ENTER("Event_scheduler::init");
- DBUG_PRINT("enter", ("this=%p", this));
-
- LOCK_SCHEDULER_DATA();
- init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
- for (;i < COND_LAST; i++)
- if (pthread_cond_init(&cond_vars[i], NULL))
- {
- sql_print_error("SCHEDULER: Unable to initalize conditions");
- ret= TRUE;
- goto end;
- }
- state= INITIALIZED;
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(ret);
-}
-
-
-/*
- Frees all memory allocated by the scheduler object.
-
- SYNOPSIS
- Event_scheduler::destroy()
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-void
-Event_scheduler::destroy()
-{
- DBUG_ENTER("Event_scheduler");
- LOCK_SCHEDULER_DATA();
- switch (state) {
- case UNINITIALIZED:
- break;
- case INITIALIZED:
- int i;
- for (i= 0; i < COND_LAST; i++)
- pthread_cond_destroy(&cond_vars[i]);
- state= UNINITIALIZED;
- break;
- default:
- sql_print_error("SCHEDULER: Destroying while state is %d", state);
- /* I trust my code but ::safe() > ::sorry() */
- DBUG_ASSERT(0);
- break;
- }
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_VOID_RETURN;
-}
-
-
-extern pthread_attr_t connection_attrib;
-
-
-/*
- Starts the event scheduler
-
- SYNOPSIS
- Event_scheduler::start()
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-bool
-Event_scheduler::start()
-{
- bool ret= FALSE;
- pthread_t th;
- DBUG_ENTER("Event_scheduler::start");
-
- LOCK_SCHEDULER_DATA();
- /* If already working or starting don't make another attempt */
- DBUG_ASSERT(state == INITIALIZED);
- if (state > INITIALIZED)
- {
- DBUG_PRINT("info", ("scheduler is already running or starting"));
- ret= TRUE;
- goto end;
- }
-
- /*
- Now if another thread calls start it will bail-out because the branch
- above will be executed. Thus no two or more child threads will be forked.
- If the child thread cannot start for some reason then `state` is set
- to CANTSTART and COND_started is also signaled. In this case we
- set `state` back to INITIALIZED so another attempt to start the scheduler
- can be made.
- */
- state= COMMENCING;
- /* Fork */
- if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
- (void*)this))
- {
- DBUG_PRINT("error", ("cannot create a new thread"));
- state= INITIALIZED;
- ret= TRUE;
- goto end;
- }
-
- /* Wait till the child thread has booted (w/ or wo success) */
- while (!(state == SUSPENDED || state == RUNNING) && state != CANTSTART)
- cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
-
- /*
- If we cannot start for some reason then don't prohibit further attempts.
- Set back to INITIALIZED.
- */
- if (state == CANTSTART)
- {
- state= INITIALIZED;
- ret= TRUE;
- goto end;
- }
-
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(ret);
-}
-
-
-/*
- Starts the event scheduler in suspended mode.
-
- SYNOPSIS
- Event_scheduler::start_suspended()
-
- RETURN VALUE
- TRUE OK
- FALSE Error
-*/
-
-bool
-Event_scheduler::start_suspended()
-{
- DBUG_ENTER("Event_scheduler::start_suspended");
- start_scheduler_suspended= TRUE;
- DBUG_RETURN(start());
-}
-
-
-
-/*
- Report back that we cannot start. Used for ocasions where
- we can't go into ::run() and have to report externally.
-
- SYNOPSIS
- Event_scheduler::report_error_during_start()
-*/
-
-inline void
-Event_scheduler::report_error_during_start()
-{
- DBUG_ENTER("Event_scheduler::report_error_during_start");
-
- LOCK_SCHEDULER_DATA();
- state= CANTSTART;
- DBUG_PRINT("info", ("Sending back COND_started_or_stopped"));
- pthread_cond_signal(&cond_vars[COND_started_or_stopped]);
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- The internal loop of the event scheduler
-
- SYNOPSIS
- Event_scheduler::run()
- thd Thread
-
- RETURN VALUE
- FALSE OK
- TRUE Failure
-*/
-
-bool
-Event_scheduler::run(THD *thd)
-{
- int ret;
- struct timespec abstime;
- DBUG_ENTER("Event_scheduler::run");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- LOCK_SCHEDULER_DATA();
- ret= event_queue->load_events_from_db(thd);
-
- if (!ret)
- {
- thread_id= thd->thread_id;
- state= start_scheduler_suspended? SUSPENDED:RUNNING;
- start_scheduler_suspended= FALSE;
- }
- else
- state= CANTSTART;
-
- DBUG_PRINT("info", ("Sending back COND_started_or_stopped"));
- pthread_cond_signal(&cond_vars[COND_started_or_stopped]);
- if (ret)
- {
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(TRUE);
- }
- if (!check_n_suspend_if_needed(thd))
- UNLOCK_SCHEDULER_DATA();
-
- sql_print_information("SCHEDULER: Manager thread started with id %lu",
- thd->thread_id);
- abstime.tv_nsec= 0;
- while ((state == SUSPENDED || state == RUNNING))
- {
- Event_timed *et;
-
- LOCK_SCHEDULER_DATA();
- if (check_n_wait_for_non_empty_queue(thd))
- continue;
-
- /* On TRUE data is unlocked, go back to the beginning */
- if (check_n_suspend_if_needed(thd))
- continue;
-
- /* Guaranteed locked here */
- if (state == IN_SHUTDOWN || shutdown_in_progress)
- {
- UNLOCK_SCHEDULER_DATA();
- break;
- }
- DBUG_ASSERT(state == RUNNING);
-
-// et= (Event_timed *)queue_top(&event_queue->queue);
- et= event_queue->get_top();
-
- /* Skip disabled events */
- if (et->status != Event_timed::ENABLED)
- {
- /*
- It could be a one-timer scheduled for a time, already in the past when the
- scheduler was suspended.
- */
- sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue",
- et->dbname.length, et->dbname.str, et->name.length,
- et->name.str);
- queue_remove(&event_queue->queue, 0);
- /* ToDo: check this again */
- if (et->dropped)
- et->drop(thd);
- delete et;
- UNLOCK_SCHEDULER_DATA();
- continue;
- }
- thd->proc_info= (char *)"Computing";
- DBUG_PRINT("evex manager",("computing time to sleep till next exec"));
- /* Timestamp is in UTC */
- abstime.tv_sec= sec_since_epoch_TIME(&et->execute_at);
-
- thd->end_time();
- if (abstime.tv_sec > thd->query_start())
- {
- /* Event trigger time is in the future */
- thd->proc_info= (char *)"Sleep";
- DBUG_PRINT("info", ("Going to sleep. Should wakeup after approx %d secs",
- abstime.tv_sec - thd->query_start()));
- DBUG_PRINT("info", ("Entering condition because waiting for activation"));
- /*
- Use THD::enter_cond()/exit_cond() or we won't be able to kill a
- sleeping thread. Though ::stop() can do it by sending COND_new_work
- an user can't by just issuing 'KILL x'; . In the latter case
- pthread_cond_timedwait() will wait till `abstime`.
- "Sleeping until next time"
- */
- thd->enter_cond(&cond_vars[COND_new_work],LOCK_scheduler_data,"Sleeping");
-
- pthread_cond_timedwait(&cond_vars[COND_new_work], LOCK_scheduler_data,
- &abstime);
-
- DBUG_PRINT("info", ("Manager woke up. state is %d", state));
- /*
- If we get signal we should recalculate the whether it's the right time
- because there could be :
- 1. Spurious wake-up
- 2. The top of the queue was changed (new one becase of add/drop/replace)
- */
- /* This will do implicit UNLOCK_SCHEDULER_DATA() */
- thd->exit_cond("");
- }
- else
- {
- thd->proc_info= (char *)"Executing";
- /*
- Execute the event. An error may occur if a thread cannot be forked.
- In this case stop the manager.
- We should enter ::execute_top() with locked LOCK_scheduler_data.
- */
- int ret= execute_top(thd, et);
- UNLOCK_SCHEDULER_DATA();
- if (ret)
- break;
- }
- }
-
- thd->proc_info= (char *)"Cleaning";
-
- LOCK_SCHEDULER_DATA();
- /*
- It's possible that a user has used (SQL)COM_KILL. Hence set the appropriate
- state because it is only set by ::stop().
- */
- if (state != IN_SHUTDOWN)
- {
- DBUG_PRINT("info", ("We got KILL but the but not from ::stop()"));
- state= IN_SHUTDOWN;
- }
- UNLOCK_SCHEDULER_DATA();
-
- sql_print_information("SCHEDULER: Shutting down");
-
- thd->proc_info= (char *)"Cleaning queue";
- clean_memory(thd);
- THD_CHECK_SENTRY(thd);
-
- /* free mamager_root memory but don't destroy the root */
- thd->proc_info= (char *)"Cleaning memory root";
- free_root(&scheduler_root, MYF(0));
- THD_CHECK_SENTRY(thd);
-
- /*
- We notify the waiting thread which shutdowns us that we have cleaned.
- There are few more instructions to be executed in this pthread but
- they don't affect manager structures thus it's safe to signal already
- at this point.
- */
- LOCK_SCHEDULER_DATA();
- thd->proc_info= (char *)"Sending shutdown signal";
- DBUG_PRINT("info", ("Sending COND_started_or_stopped"));
- if (state == IN_SHUTDOWN)
- pthread_cond_signal(&cond_vars[COND_started_or_stopped]);
-
- state= INITIALIZED;
- /*
- We set it here because ::run() can stop not only because of ::stop()
- call but also because of `KILL x`
- */
- thread_id= 0;
- sql_print_information("SCHEDULER: Stopped");
- UNLOCK_SCHEDULER_DATA();
-
- /* We have modified, we set back */
- thd->query= NULL;
- thd->query_length= 0;
-
- DBUG_RETURN(FALSE);
-}
-
-
-/*
- Executes the top element of the queue. Auxiliary method for ::run().
-
- SYNOPSIS
- Event_scheduler::execute_top()
-
- RETURN VALUE
- FALSE OK
- TRUE Failure
-
- NOTE
- NO locking is done. EXPECTED is that the caller should have locked
- the queue (w/ LOCK_scheduler_data).
-*/
-
-bool
-Event_scheduler::execute_top(THD *thd, Event_timed *et)
-{
- int spawn_ret_code;
- bool ret= FALSE;
- DBUG_ENTER("Event_scheduler::execute_top");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- /* Is it good idea to pass a stack address ?*/
- Worker_thread_param param(et);
-
- pthread_mutex_lock(&param.LOCK_started);
- /*
- We don't lock LOCK_scheduler_data fpr workers_increment() because it's a
- pre-requisite for calling the current_method.
- */
- switch ((spawn_ret_code= et->spawn_now(event_worker_thread, &param))) {
- case EVENT_EXEC_CANT_FORK:
- /*
- We don't lock LOCK_scheduler_data here because it's a pre-requisite
- for calling the current_method.
- */
- sql_print_error("SCHEDULER: Problem while trying to create a thread");
- ret= TRUE;
- break;
- case EVENT_EXEC_ALREADY_EXEC:
- /*
- We don't lock LOCK_scheduler_data here because it's a pre-requisite
- for calling the current_method.
- */
- sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
- et->dbname.str, et->name.str);
- if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
- event_queue->remove_top();
- else
- event_queue->top_changed();
- break;
- default:
- DBUG_ASSERT(!spawn_ret_code);
- if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
- event_queue->remove_top();
- else
- event_queue->top_changed();
- /*
- We don't lock LOCK_scheduler_data here because it's a pre-requisite
- for calling the current_method.
- */
- if (likely(!spawn_ret_code))
- {
- /* Wait the forked thread to start */
- do {
- pthread_cond_wait(&param.COND_started, &param.LOCK_started);
- } while (!param.started);
- }
- /*
- param was allocated on the stack so no explicit delete as well as
- in this moment it's no more used in the spawned thread so it's safe
- to be deleted.
- */
- break;
- }
- pthread_mutex_unlock(&param.LOCK_started);
- /* `param` is on the stack and will be destructed by the compiler */
-
- DBUG_RETURN(ret);
-}
-
-
-/*
- Cleans the scheduler's queue. Auxiliary method for ::run().
-
- SYNOPSIS
- Event_scheduler::clean_queue()
- thd Thread
-*/
-
-void
-Event_scheduler::clean_memory(THD *thd)
-{
- CHARSET_INFO *scs= system_charset_info;
- uint i;
- DBUG_ENTER("Event_scheduler::clean_queue");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- LOCK_SCHEDULER_DATA();
- stop_all_running_events(thd);
- UNLOCK_SCHEDULER_DATA();
-
- sql_print_information("SCHEDULER: Emptying the queue");
-
- event_queue->empty_queue();
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Stops all running events
-
- SYNOPSIS
- Event_scheduler::stop_all_running_events()
- thd Thread
-
- NOTE
- LOCK_scheduler data must be acquired prior to call to this method
-*/
-
-void
-Event_scheduler::stop_all_running_events(THD *thd)
-{
- CHARSET_INFO *scs= system_charset_info;
- uint i;
- DYNAMIC_ARRAY running_threads;
- THD *tmp;
- DBUG_ENTER("Event_scheduler::stop_all_running_events");
- DBUG_PRINT("enter", ("workers_count=%d", workers_count()));
-
- my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10);
-
- bool had_super= FALSE;
- VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (tmp->command == COM_DAEMON)
- continue;
- if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
- push_dynamic(&running_threads, (gptr) &tmp->thread_id);
- }
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
-
- /* We need temporarily SUPER_ACL to be able to kill our offsprings */
- if (!(thd->security_ctx->master_access & SUPER_ACL))
- thd->security_ctx->master_access|= SUPER_ACL;
- else
- had_super= TRUE;
-
- char tmp_buff[10*STRING_BUFFER_USUAL_SIZE];
- char int_buff[STRING_BUFFER_USUAL_SIZE];
- String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
- String int_string(int_buff, sizeof(int_buff), scs);
- tmp_string.length(0);
-
- for (i= 0; i < running_threads.elements; ++i)
- {
- int ret;
- ulong thd_id= *dynamic_element(&running_threads, i, ulong*);
-
- int_string.set((longlong) thd_id,scs);
- tmp_string.append(int_string);
- if (i < running_threads.elements - 1)
- tmp_string.append(' ');
-
- if ((ret= kill_one_thread(thd, thd_id, FALSE)))
- {
- sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret);
- break;
- }
- }
- if (running_threads.elements)
- sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr());
-
- if (!had_super)
- thd->security_ctx->master_access &= ~SUPER_ACL;
-
- delete_dynamic(&running_threads);
-
- sql_print_information("SCHEDULER: Waiting for worker threads to finish");
-
- while (workers_count())
- my_sleep(100000);
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Stops the event scheduler
-
- SYNOPSIS
- Event_scheduler::stop()
-
- RETURN VALUE
- OP_OK OK
- OP_CANT_KILL Error during stopping of manager thread
- OP_NOT_RUNNING Manager not working
-
- NOTE
- The caller must have acquited LOCK_scheduler_data.
-*/
-
-int
-Event_scheduler::stop()
-{
- THD *thd= current_thd;
- DBUG_ENTER("Event_scheduler::stop");
- DBUG_PRINT("enter", ("thd=%p", current_thd));
-
- LOCK_SCHEDULER_DATA();
- if (!(state == SUSPENDED || state == RUNNING))
- {
- /*
- One situation to be here is if there was a start that forked a new
- thread but the new thread did not acquire yet LOCK_scheduler_data.
- Hence, in this case return an error.
- */
- DBUG_PRINT("info", ("manager not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_NOT_RUNNING);
- }
- state= IN_SHUTDOWN;
-
- DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
- sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
-
- /*
- Sending the COND_new_work to ::run() is a way to get this working without
- race conditions. If we use kill_one_thread() it will call THD::awake() and
- because in ::run() both THD::enter_cond()/::exit_cond() are used,
- THD::awake() will try to lock LOCK_scheduler_data. If we UNLOCK it before,
- then the pthread_cond_signal(COND_started_or_stopped) could be signaled in
- ::run() and we can miss the signal before we relock. A way is to use
- another mutex for this shutdown procedure but better not.
- */
- pthread_cond_signal(&cond_vars[COND_new_work]);
- /* Or we are suspended - then we should wake up */
- pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
-
- /* Guarantee we don't catch spurious signals */
- sql_print_information("SCHEDULER: Waiting the manager thread to reply");
- while (state != INITIALIZED)
- {
- DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
- "thread. Current value of state is %d . "
- "workers count=%d", state, workers_count()));
- cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
- }
- DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_RETURN(OP_OK);
-}
-
-
-/*
- Suspends or resumes the scheduler.
- SUSPEND - it won't execute any event till resumed.
- RESUME - it will resume if suspended.
-
- SYNOPSIS
- Event_scheduler::suspend_or_resume()
-
- RETURN VALUE
- OP_OK OK
-*/
-
-int
-Event_scheduler::suspend_or_resume(
- enum Event_scheduler::enum_suspend_or_resume action)
-{
- DBUG_ENTER("Event_scheduler::suspend_or_resume");
- DBUG_PRINT("enter", ("action=%d", action));
-
- LOCK_SCHEDULER_DATA();
-
- if ((action == SUSPEND && state == SUSPENDED) ||
- (action == RESUME && state == RUNNING))
- {
- DBUG_PRINT("info", ("Either trying to suspend suspended or resume "
- "running scheduler. Doing nothing."));
- }
- else
- {
- /* Wake the main thread up if he is asleep */
- DBUG_PRINT("info", ("Sending signal"));
- if (action==SUSPEND)
- {
- state= SUSPENDED;
- pthread_cond_signal(&cond_vars[COND_new_work]);
- }
- else
- {
- state= RUNNING;
- pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
- }
- DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume"));
- cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
- DBUG_PRINT("info", ("Got response"));
- }
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
-}
-
-
-/*
- Returns the number of executing events.
-
- SYNOPSIS
- Event_scheduler::workers_count()
-*/
-
-uint
-Event_scheduler::workers_count()
-{
- THD *tmp;
- uint count= 0;
-
- DBUG_ENTER("Event_scheduler::workers_count");
- VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (tmp->command == COM_DAEMON)
- continue;
- if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
- ++count;
- }
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- DBUG_PRINT("exit", ("%d", count));
- DBUG_RETURN(count);
-}
-
-
-/*
- Checks and suspends if needed
-
- SYNOPSIS
- Event_scheduler::check_n_suspend_if_needed()
- thd Thread
-
- RETURN VALUE
- FALSE Not suspended, we haven't slept
- TRUE We were suspended. LOCK_scheduler_data is unlocked.
-
- NOTE
- The caller should have locked LOCK_scheduler_data!
- The mutex will be unlocked in case this function returns TRUE
-*/
-
-bool
-Event_scheduler::check_n_suspend_if_needed(THD *thd)
-{
- bool was_suspended= FALSE;
- DBUG_ENTER("Event_scheduler::check_n_suspend_if_needed");
- if (thd->killed && !shutdown_in_progress)
- {
- state= SUSPENDED;
- thd->killed= THD::NOT_KILLED;
- }
- if (state == SUSPENDED)
- {
- thd->enter_cond(&cond_vars[COND_suspend_or_resume], LOCK_scheduler_data,
- "Suspended");
- /* Send back signal to the thread that asked us to suspend operations */
- pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
- sql_print_information("SCHEDULER: Suspending operations");
- was_suspended= TRUE;
- }
- while (state == SUSPENDED)
- {
- cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
- DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume"));
- if (state != SUSPENDED)
- {
- pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
- sql_print_information("SCHEDULER: Resuming operations");
- }
- }
- if (was_suspended)
- {
- event_queue->recalculate_queue(thd);
- /* This will implicitly unlock LOCK_scheduler_data */
- thd->exit_cond("");
- }
- DBUG_RETURN(was_suspended);
-}
-
-
-/*
- Checks for empty queue and waits till new element gets in
-
- SYNOPSIS
- Event_scheduler::check_n_wait_for_non_empty_queue()
- thd Thread
-
- RETURN VALUE
- FALSE Did not wait - LOCK_scheduler_data still locked.
- TRUE Waited - LOCK_scheduler_data unlocked.
-
- NOTE
- The caller should have locked LOCK_scheduler_data!
-*/
-
-bool
-Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
-{
- bool slept= FALSE;
- DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue");
- DBUG_PRINT("enter", ("q.elements=%lu state=%s",
- event_queue->events_count_no_lock(), states_names[state]));
-
- if (!event_queue->events_count_no_lock())
- thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data,
- "Empty queue, sleeping");
-
- /* Wait in a loop protecting against catching spurious signals */
- while (!event_queue->events_count_no_lock() && state == RUNNING)
- {
- slept= TRUE;
- DBUG_PRINT("info", ("Entering condition because of empty queue"));
- cond_wait(COND_new_work, LOCK_scheduler_data);
- DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d",
- state));
- /*
- exit_cond does implicit mutex_UNLOCK, we needed it locked if
- 1. we loop again
- 2. end the current loop and start doing calculations
- */
- }
- if (slept)
- thd->exit_cond("");
-
- DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d",
- event_queue->events_count_no_lock(), states_names[state], thd->killed));
-
- DBUG_RETURN(slept);
-}
-
-
-/*
- Returns the current state of the scheduler
-
- SYNOPSIS
- Event_scheduler::get_state()
-*/
-
-enum Event_scheduler::enum_state
-Event_scheduler::get_state()
-{
- enum Event_scheduler::enum_state ret;
- DBUG_ENTER("Event_scheduler::get_state");
- /* lock_data & unlock_data are not static */
- pthread_mutex_lock(singleton->LOCK_scheduler_data);
- ret= singleton->state;
- pthread_mutex_unlock(singleton->LOCK_scheduler_data);
- DBUG_RETURN(ret);
-}
-
-
-/*
- Returns whether the scheduler was initialized.
-
- SYNOPSIS
- Event_scheduler::initialized()
-
- RETURN VALUE
- FALSE Was not initialized so far
- TRUE Was initialized
-*/
-
-bool
-Event_scheduler::initialized()
-{
- DBUG_ENTER("Event_scheduler::initialized");
- DBUG_RETURN(Event_scheduler::get_state() != UNINITIALIZED);
-}
-
-
-
-
-/*
- Dumps some data about the internal status of the scheduler.
-
- SYNOPSIS
- Event_scheduler::dump_internal_status()
- thd THD
-
- RETURN VALUE
- 0 OK
- 1 Error
-*/
-
-int
-Event_scheduler::dump_internal_status(THD *thd)
-{
- DBUG_ENTER("dump_internal_status");
-#ifndef DBUG_OFF
- CHARSET_INFO *scs= system_charset_info;
- Protocol *protocol= thd->protocol;
- List<Item> field_list;
- int ret;
- char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
- char int_buff[STRING_BUFFER_USUAL_SIZE];
- String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
- String int_string(int_buff, sizeof(int_buff), scs);
- tmp_string.length(0);
- int_string.length(0);
-
- field_list.push_back(new Item_empty_string("Name", 20));
- field_list.push_back(new Item_empty_string("Value",20));
- if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
- Protocol::SEND_EOF))
- DBUG_RETURN(1);
-
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("state"), scs);
- protocol->store(states_names[singleton->state].str,
- states_names[singleton->state].length,
- scs);
-
- ret= protocol->write();
- /*
- If not initialized - don't show anything else. get_instance()
- will otherwise implicitly initialize it. We don't want that.
- */
- if (singleton->state >= INITIALIZED)
- {
- /* last locked at*/
- /*
- The first thing to do, or get_instance() will overwrite the values.
- mutex_last_locked_at_line / mutex_last_unlocked_at_line
- */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("last locked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- singleton->mutex_last_locked_in_func,
- singleton->mutex_last_locked_at_line));
- protocol->store(&tmp_string);
- ret= protocol->write();
-
- /* last unlocked at*/
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("last unlocked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- singleton->mutex_last_unlocked_in_func,
- singleton->mutex_last_unlocked_at_line));
- protocol->store(&tmp_string);
- ret= protocol->write();
-
- /* waiting on */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("waiting on condition"), scs);
- tmp_string.length(scs->cset->
- snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s",
- (singleton->cond_waiting_on != COND_NONE) ?
- cond_vars_names[singleton->cond_waiting_on]:
- "NONE"));
- protocol->store(&tmp_string);
- ret= protocol->write();
-
- Event_scheduler *scheduler= get_instance();
-
- /* workers_count */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("workers_count"), scs);
- int_string.set((longlong) scheduler->workers_count(), scs);
- protocol->store(&int_string);
- ret= protocol->write();
-
- /* queue.elements */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("queue.elements"), scs);
- int_string.set((longlong) scheduler->event_queue->events_count_no_lock(), scs);
- protocol->store(&int_string);
- ret= protocol->write();
-
- /* scheduler_data_locked */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
- int_string.set((longlong) scheduler->mutex_scheduler_data_locked, scs);
- protocol->store(&int_string);
- ret= protocol->write();
- }
- send_eof(thd);
-#endif
- DBUG_RETURN(0);
-}
-
-
-/*
- Wrapper for pthread_mutex_lock
-
- SYNOPSIS
- Event_scheduler::lock_data()
- mutex Mutex to lock
- line The line number on which the lock is done
-
- RETURN VALUE
- Error code of pthread_mutex_lock()
-*/
-
-void
-Event_scheduler::lock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_scheduler_data, func, line));
- pthread_mutex_lock(LOCK_scheduler_data);
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_scheduler_data_locked= TRUE;
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_mutex_unlock
-
- SYNOPSIS
- Event_scheduler::unlock_data()
- mutex Mutex to unlock
- line The line number on which the unlock is done
-*/
-
-void
-Event_scheduler::unlock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- LOCK_scheduler_data, func, line));
- mutex_last_unlocked_at_line= line;
- mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
- pthread_mutex_unlock(LOCK_scheduler_data);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_cond_wait
-
- SYNOPSIS
- Event_scheduler::cond_wait()
- cond Conditional to wait for
- mutex Mutex of the conditional
-
- RETURN VALUE
- Error code of pthread_cond_wait()
-*/
-
-int
-Event_scheduler::cond_wait(int cond, pthread_mutex_t *mutex)
-{
- int ret;
- DBUG_ENTER("Event_scheduler::cond_wait");
- DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex));
- ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex);
- cond_waiting_on= COND_NONE;
- DBUG_RETURN(ret);
-}
-
-
-/*
- Signals the main scheduler thread that the queue has changed
- its state.
-
- SYNOPSIS
- Event_scheduler::queue_changed()
-*/
-
-void
-Event_scheduler::queue_changed()
-{
- DBUG_ENTER("Event_scheduler::queue_changed");
- DBUG_PRINT("info", ("Sending COND_new_work"));
- pthread_cond_signal(&cond_vars[COND_new_work]);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Inits mutexes.
-
- SYNOPSIS
- Event_scheduler::init_mutexes()
-*/
-
-void
-Event_scheduler::init_mutexes()
-{
- pthread_mutex_init(singleton->LOCK_scheduler_data, MY_MUTEX_INIT_FAST);
-}
-
-
-/*
- Destroys mutexes.
-
- SYNOPSIS
- Event_queue::destroy_mutexes()
-*/
-
-void
-Event_scheduler::destroy_mutexes()
-{
- pthread_mutex_destroy(singleton->LOCK_scheduler_data);
-}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index 7d02e98d4fe..acd0debe391 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -16,191 +16,4 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-class sp_name;
-class Event_timed;
-class Event_db_repository;
-class Event_queue;
-
-class THD;
-
-int
-events_init();
-
-void
-events_shutdown();
-
-#include "event_queue.h"
-#include "event_scheduler.h"
-
-class Event_scheduler
-{
-public:
- enum enum_state
- {
- UNINITIALIZED= 0,
- INITIALIZED,
- COMMENCING,
- CANTSTART,
- RUNNING,
- SUSPENDED,
- IN_SHUTDOWN
- };
-
- enum enum_suspend_or_resume
- {
- SUSPEND= 1,
- RESUME= 2
- };
-
- /* This is the current status of the life-cycle of the scheduler. */
- enum enum_state state;
-
-
- static void
- create_instance(Event_queue *queue);
-
- static void
- init_mutexes();
-
- static void
- destroy_mutexes();
-
- /* Singleton access */
- static Event_scheduler*
- get_instance();
-
- bool
- init(Event_db_repository *db_repo);
-
- void
- destroy();
-
- /* State changing methods follow */
-
- bool
- start();
-
- int
- stop();
-
- bool
- start_suspended();
-
- /*
- Need to be public because has to be called from the function
- passed to pthread_create.
- */
- bool
- run(THD *thd);
-
- int
- suspend_or_resume(enum enum_suspend_or_resume action);
-/*
- static void
- init_mutexes();
-
- static void
- destroy_mutexes();
-*/
- void
- report_error_during_start();
-
- /* Information retrieving methods follow */
-
- enum enum_state
- get_state();
-
- bool
- initialized();
-
- static int
- dump_internal_status(THD *thd);
-
- /* helper functions for working with mutexes & conditionals */
- void
- lock_data(const char *func, uint line);
-
- void
- unlock_data(const char *func, uint line);
-
- int
- cond_wait(int cond, pthread_mutex_t *mutex);
-
- void
- queue_changed();
-
- Event_queue *event_queue;
-
-protected:
-
- uint
- workers_count();
-
- /* helper functions */
- bool
- execute_top(THD *thd, Event_timed *et);
-
- void
- clean_memory(THD *thd);
-
- void
- stop_all_running_events(THD *thd);
-
-
- bool
- check_n_suspend_if_needed(THD *thd);
-
- bool
- check_n_wait_for_non_empty_queue(THD *thd);
-
- /* Singleton DP is used */
- Event_scheduler();
-
- pthread_mutex_t LOCK_data;
- pthread_mutex_t *LOCK_scheduler_data;
-
- /* The MEM_ROOT of the object */
- MEM_ROOT scheduler_root;
-
- /* Set to start the scheduler in suspended state */
- bool start_scheduler_suspended;
-
- /*
- Holds the thread id of the executor thread or 0 if the executor is not
- running. It is used by ::shutdown() to know which thread to kill with
- kill_one_thread(). The latter wake ups a thread if it is waiting on a
- conditional variable and sets thd->killed to non-zero.
- */
- ulong thread_id;
-
- enum enum_cond_vars
- {
- COND_NONE= -1,
- COND_new_work= 0,
- COND_started_or_stopped,
- COND_suspend_or_resume,
- /* Must be always last */
- COND_LAST
- };
-
- uint mutex_last_locked_at_line;
- uint mutex_last_unlocked_at_line;
- const char* mutex_last_locked_in_func;
- const char* mutex_last_unlocked_in_func;
- int cond_waiting_on;
- bool mutex_scheduler_data_locked;
-
- static const char * const cond_vars_names[COND_LAST];
-
- pthread_cond_t cond_vars[COND_LAST];
-
- /* Singleton instance */
- static Event_scheduler *singleton;
-
-private:
- /* Prevent use of these */
- Event_scheduler(const Event_scheduler &);
- void operator=(Event_scheduler &);
-};
-
#endif /* _EVENT_SCHEDULER_H_ */
diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc
index 9dc3bb26bc7..b256ef8411f 100644
--- a/sql/event_scheduler_ng.cc
+++ b/sql/event_scheduler_ng.cc
@@ -212,6 +212,7 @@ end:
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
+ DBUG_RETURN(0); // Against gcc warnings
}
@@ -296,26 +297,22 @@ end:
delete event;
my_thread_end();
+ DBUG_RETURN(0); // Against gcc warnings
}
bool
-Event_scheduler_ng::init(Event_queue *q)
+Event_scheduler_ng::init_scheduler(Event_queue *q)
{
thread_id= 0;
state= INITIALIZED;
- /* init memory root */
-
queue= q;
-
return FALSE;
}
void
-Event_scheduler_ng::deinit()
-{
-}
+Event_scheduler_ng::deinit_scheduler() {}
void
@@ -477,7 +474,6 @@ Event_scheduler_ng::run(THD *thd)
pthread_cond_signal(&COND_state);
error:
state= INITIALIZED;
- stop_all_running_events(thd);
UNLOCK_SCHEDULER_DATA();
sql_print_information("SCHEDULER: Stopped");
@@ -561,97 +557,17 @@ Event_scheduler_ng::workers_count()
/*
- Stops all running events
-
- SYNOPSIS
- Event_scheduler::stop_all_running_events()
- thd Thread
-
- NOTE
- LOCK_scheduler data must be acquired prior to call to this method
-*/
-
-void
-Event_scheduler_ng::stop_all_running_events(THD *thd)
-{
- CHARSET_INFO *scs= system_charset_info;
- uint i;
- DYNAMIC_ARRAY running_threads;
- THD *tmp;
- DBUG_ENTER("Event_scheduler::stop_all_running_events");
- DBUG_PRINT("enter", ("workers_count=%d", workers_count()));
-
- my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10);
-
- bool had_super= FALSE;
- VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (tmp->command == COM_DAEMON)
- continue;
- if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
- push_dynamic(&running_threads, (gptr) &tmp->thread_id);
- }
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
-
- /* We need temporarily SUPER_ACL to be able to kill our offsprings */
- if (!(thd->security_ctx->master_access & SUPER_ACL))
- thd->security_ctx->master_access|= SUPER_ACL;
- else
- had_super= TRUE;
-
- char tmp_buff[10*STRING_BUFFER_USUAL_SIZE];
- char int_buff[STRING_BUFFER_USUAL_SIZE];
- String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
- String int_string(int_buff, sizeof(int_buff), scs);
- tmp_string.length(0);
-
- for (i= 0; i < running_threads.elements; ++i)
- {
- int ret;
- ulong thd_id= *dynamic_element(&running_threads, i, ulong*);
-
- int_string.set((longlong) thd_id,scs);
- tmp_string.append(int_string);
- if (i < running_threads.elements - 1)
- tmp_string.append(' ');
-
- if ((ret= kill_one_thread(thd, thd_id, FALSE)))
- {
- sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret);
- break;
- }
- }
- if (running_threads.elements)
- sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr());
-
- if (!had_super)
- thd->security_ctx->master_access &= ~SUPER_ACL;
-
- delete_dynamic(&running_threads);
-
- sql_print_information("SCHEDULER: Waiting for worker threads to finish");
-
- while (workers_count())
- my_sleep(100000);
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
- Event_scheduler::queue_changed()
+ Event_scheduler_ng::queue_changed()
*/
void
Event_scheduler_ng::queue_changed()
{
- DBUG_ENTER("Event_scheduler::queue_changed");
+ DBUG_ENTER("Event_scheduler_ng::queue_changed");
DBUG_PRINT("info", ("Sending COND_state"));
pthread_cond_signal(&COND_state);
DBUG_VOID_RETURN;
@@ -662,8 +578,7 @@ void
Event_scheduler_ng::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_scheduler_ng::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_scheduler_state, func, line));
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
pthread_mutex_lock(&LOCK_scheduler_state);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
@@ -675,9 +590,8 @@ Event_scheduler_ng::lock_data(const char *func, uint line)
void
Event_scheduler_ng::unlock_data(const char *func, uint line)
{
- DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- &LOCK_scheduler_state, func, line));
+ DBUG_ENTER("Event_scheduler_ng::unlock_mutex");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h
index b250923d23e..41642161fc6 100644
--- a/sql/event_scheduler_ng.h
+++ b/sql/event_scheduler_ng.h
@@ -48,10 +48,10 @@ public:
run(THD *thd);
bool
- init(Event_queue *queue);
+ init_scheduler(Event_queue *queue);
void
- deinit();
+ deinit_scheduler();
void
init_mutexes();
@@ -78,9 +78,6 @@ private:
bool
execute_top(THD *thd, Event_timed *job_data);
- void
- stop_all_running_events(THD *thd);
-
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
@@ -104,7 +101,6 @@ private:
pthread_cond_t COND_state;
Event_queue *queue;
- Event_db_repository *db_repository;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
diff --git a/sql/events.cc b/sql/events.cc
index e4b6de965f7..d1bbb6be884 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -17,10 +17,10 @@
#include "mysql_priv.h"
#include "events.h"
#include "event_data_objects.h"
-#include "event_scheduler.h"
#include "event_db_repository.h"
-#include "sp_head.h"
+#include "event_queue.h"
#include "event_scheduler_ng.h"
+#include "sp_head.h"
/*
TODO list :
@@ -48,6 +48,21 @@ Warning:
*/
+/*
+ If the user (un)intentionally removes an event directly from mysql.event
+ the following sequence has to be used to be able to remove the in-memory
+ counterpart.
+ 1. CREATE EVENT the_name ON SCHEDULE EVERY 1 SECOND DISABLE DO SELECT 1;
+ 2. DROP EVENT the_name
+
+ In other words, the first one will create a row in mysql.event . In the
+ second step because there will be a line, disk based drop will pass and
+ the scheduler will remove the memory counterpart. The reason is that
+ in-memory queue does not check whether the event we try to drop from memory
+ is disabled. Disabled events are not kept in-memory because they are not
+ eligible for execution.
+*/
+
const char *event_scheduler_state_names[]=
{ "OFF", "0", "ON", "1", "SUSPEND", "2", NullS };
@@ -284,17 +299,15 @@ Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
*/
int
-Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options,
+Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists,
uint *rows_affected)
{
int ret;
DBUG_ENTER("Events::create_event");
- if (!(ret= db_repository->
- create_event(thd, parse_data,
- create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
- rows_affected)))
+ if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists,
+ rows_affected)))
{
- if ((ret= event_queue->create_event(thd, parse_data, true)))
+ if ((ret= event_queue->create_event(thd, parse_data)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
}
/* No need to close the table, it will be closed in sql_parse::do_command */
@@ -350,9 +363,10 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
SYNOPSIS
Events::drop_event()
thd THD
- name event's name
- drop_if_exists if set and the event not existing => warning onto the stack
- rows_affected affected number of rows is returned heres
+ name Event's name
+ if_exists When set and the event does not exist => warning onto
+ the stack
+ rows_affected Affected number of rows is returned heres
RETURN VALUE
0 OK
@@ -360,15 +374,13 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
*/
int
-Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists,
- uint *rows_affected)
+Events::drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected)
{
int ret;
-
DBUG_ENTER("Events::drop_event");
- if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name,
- drop_if_exists, rows_affected)))
+ if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, if_exists,
+ rows_affected)))
{
if ((ret= event_queue->drop_event(thd, name)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
@@ -401,7 +413,7 @@ Events::show_create_event(THD *thd, sp_name *spn)
DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str));
thd->reset_n_backup_open_tables_state(&backup);
- ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL, thd->mem_root);
+ ret= db_repository->find_event(thd, spn->m_db, spn->m_name, &et, NULL);
thd->restore_backup_open_tables_state(&backup);
if (!ret)
@@ -472,7 +484,7 @@ Events::drop_schema_events(THD *thd, char *db)
DBUG_ENTER("evex_drop_db_events");
DBUG_PRINT("enter", ("dropping events from %s", db));
- ret= event_queue->drop_schema_events(thd, db_lex);
+ event_queue->drop_schema_events(thd, db_lex);
ret= db_repository->drop_schema_events(thd, db_lex);
DBUG_RETURN(ret);
@@ -500,9 +512,8 @@ Events::init()
Event_db_repository *db_repo;
DBUG_ENTER("Events::init");
db_repository->init_repository();
- event_queue->init(db_repository);
- event_queue->scheduler= scheduler_ng;
- scheduler_ng->init(event_queue);
+ event_queue->init_queue(db_repository, scheduler_ng);
+ scheduler_ng->init_scheduler(event_queue);
/* it should be an assignment! */
if (opt_event_scheduler)
@@ -532,8 +543,9 @@ Events::deinit()
DBUG_ENTER("Events::deinit");
scheduler_ng->stop();
- scheduler_ng->deinit();
- event_queue->deinit();
+ scheduler_ng->deinit_scheduler();
+
+ event_queue->deinit_queue();
db_repository->deinit_repository();
DBUG_VOID_RETURN;
diff --git a/sql/events.h b/sql/events.h
index 357312b44d1..bb9fe7a8fc5 100644
--- a/sql/events.h
+++ b/sql/events.h
@@ -75,7 +75,7 @@ public:
get_instance();
int
- create_event(THD *thd, Event_parse_data *parse_data, uint create_options,
+ create_event(THD *thd, Event_parse_data *parse_data, bool if_exists,
uint *rows_affected);
int
@@ -83,7 +83,7 @@ public:
uint *rows_affected);
int
- drop_event(THD *thd, sp_name *name, bool drop_if_exists, uint *rows_affected);
+ drop_event(THD *thd, sp_name *name, bool if_exists, uint *rows_affected);
int
drop_schema_events(THD *thd, char *db);
@@ -105,9 +105,9 @@ public:
int
dump_internal_status(THD *thd);
+ Event_queue *event_queue;
+ Event_scheduler_ng *scheduler_ng;
Event_db_repository *db_repository;
- Event_queue *event_queue;
- Event_scheduler_ng *scheduler_ng;
private:
/* Singleton DP is used */
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 1f55d9ea3cf..15f6ccc2089 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -57,7 +57,6 @@
#include <myisam.h>
#include <my_dir.h>
-#include "event_scheduler.h"
#include "events.h"
/* WITH_BERKELEY_STORAGE_ENGINE */
@@ -3894,7 +3893,6 @@ bool
sys_var_event_scheduler::update(THD *thd, set_var *var)
{
int res;
- Event_scheduler *scheduler= Event_scheduler::get_instance();
/* here start the thread if not running. */
DBUG_ENTER("sys_var_event_scheduler::update");
if (Events::opt_event_scheduler == 0)
@@ -3927,8 +3925,6 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
LEX_STRING *base)
{
- Event_scheduler *scheduler= Event_scheduler::get_instance();
-
if (Events::opt_event_scheduler == 0)
thd->sys_var_tmp.long_value= 0;
else if (Events::get_instance()->is_started())
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 80e5fff5a58..a4987080b02 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -3832,25 +3832,26 @@ end_with_restore_list:
case SQLCOM_CREATE_EVENT:
case SQLCOM_ALTER_EVENT:
{
- uint rows_affected= 1;
+ uint affected= 1;
DBUG_ASSERT(lex->event_parse_data);
switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT:
- res= Events::get_instance()->create_event(thd, lex->event_parse_data,
- (uint) lex->create_info.options,
- &rows_affected);
+ res= Events::get_instance()->
+ create_event(thd, lex->event_parse_data,
+ lex->create_info.options & HA_LEX_CREATE_IF_NOT_EXISTS,
+ &affected);
break;
case SQLCOM_ALTER_EVENT:
- res= Events::get_instance()->update_event(thd, lex->event_parse_data,
- lex->spname, &rows_affected);
+ res= Events::get_instance()->
+ update_event(thd, lex->event_parse_data, lex->spname, &affected);
break;
default:;
}
- DBUG_PRINT("info", ("CREATE/ALTER/DROP returned error code=%d af_rows=%d",
- res, rows_affected));
+ DBUG_PRINT("info",("DDL error code=%d affected=%d", res, affected));
if (!res)
- send_ok(thd, rows_affected);
+ send_ok(thd, affected);
+ /* Don't do it, if we are inside a SP */
if (!thd->spcont)
{
delete lex->sphead;
@@ -3867,8 +3868,7 @@ end_with_restore_list:
if (! lex->spname->m_db.str)
{
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
- res= true;
- break;
+ goto error;
}
if (check_access(thd, EVENT_ACL, lex->spname->m_db.str, 0, 0, 0,
is_schema_db(lex->spname->m_db.str)))
@@ -3885,11 +3885,10 @@ end_with_restore_list:
res= Events::get_instance()->show_create_event(thd, lex->spname);
else
{
- uint rows_affected= 1;
- if (!(res= Events::get_instance()->drop_event(thd, lex->spname,
- lex->drop_if_exists,
- &rows_affected)))
- send_ok(thd, rows_affected);
+ uint affected= 1;
+ if (!(res= Events::get_instance()->
+ drop_event(thd, lex->spname, lex->drop_if_exists, &affected)))
+ send_ok(thd, affected);
}
break;
}