summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-04 18:44:35 +0200
committerunknown <andrey@lmy004.>2006-07-04 18:44:35 +0200
commita5dfeb02e991e6e5e9e332443522de1bb4592df8 (patch)
tree4459403077605a188dd88b131d998654761070d2 /sql
parent377446fa3497ffbc0f2a17614d848bfb79f52662 (diff)
downloadmariadb-git-a5dfeb02e991e6e5e9e332443522de1bb4592df8.tar.gz
WL #3337 (Event scheduler new architecture)
Cut Nr. 8. All tests pass. Separated Event_scheduler into Event_queue and Event_scheduler. Added new Event_scheduler_ng which is the new scheduler and is used system-wide. Will be moved to the event_scheduler.cc in the future. Using Event_timed in Event_queue as well as cloned during execution. Next step is to have Event_worker_data which will be used during execution and will take ::compile()/::execute() out of Event_timed. mysql-test/r/events.result: update result mysql-test/r/events_bugs.result: update result mysql-test/r/ps_1general.result: update result mysql-test/r/skip_name_resolve.result: update result mysql-test/r/sp-threads.result: update result mysql-test/r/sp_notembedded.result: update result mysql-test/r/status.result: update result mysql-test/t/events_stress.test: Make event_stress a bit longer sql/Makefile.am: Add new event_scheduler_ng.h/cc . These are only to be in the experimental clone. Later their content will be moved to event_scheduler.h/cc sql/event_data_objects.cc: Allocate strings memory on own memory root, instead on the schedulers. Thus don't "leak" memory. This should fix bug#18683 memory leak in event scheduler sql/event_data_objects.h: add mem_root add THD - this is only temporal, will be moved to class Event_job_data once Event_job_data is responsible for the execution. sql/event_db_repository.cc: Remove unused code. Cosmetic changes sql/event_queue.cc: Now use the Event_scheduler_ng (NextGen) sql/event_queue.h: Now use the Event_scheduler_ng (NextGen) sql/event_scheduler.cc: This file is no more used, but will be soon. sql/event_scheduler.h: This file is no more used but will be soon sql/events.cc: Now use the Event_scheduler_ng (NextGen) sql/events.h: Now use the Event_scheduler_ng (NextGen) sql/mysqld.cc: Make it again possible to kill the scheduler thread sql/set_var.cc: Now use the Event_scheduler_ng (NextGen) sql/share/errmsg.txt: Shorten the message. sql/sql_show.cc: Loading is on a own root, then don't use thd->mem_root
Diffstat (limited to 'sql')
-rw-r--r--sql/Makefile.am6
-rw-r--r--sql/event_data_objects.cc47
-rw-r--r--sql/event_data_objects.h33
-rw-r--r--sql/event_db_repository.cc145
-rw-r--r--sql/event_queue.cc169
-rw-r--r--sql/event_queue.h30
-rw-r--r--sql/event_scheduler.cc302
-rw-r--r--sql/event_scheduler.h31
-rw-r--r--sql/event_scheduler_ng.cc686
-rw-r--r--sql/event_scheduler_ng.h121
-rw-r--r--sql/events.cc78
-rw-r--r--sql/events.h13
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/set_var.cc26
-rw-r--r--sql/share/errmsg.txt2
-rw-r--r--sql/sql_show.cc2
16 files changed, 1183 insertions, 510 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am
index 49f85b3921d..4d76cdb5080 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_array.h sql_cursor.h events.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \
event_queue.h event_db_repository.h \
- partition_info.h partition_element.h event_scheduler.h \
+ partition_info.h partition_element.h event_scheduler_ng.h \
contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
gstream.cc spatial.cc sql_help.cc sql_cursor.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
- sp_cache.cc parse_file.cc sql_trigger.cc \
- event_scheduler.cc events.cc event_data_objects.cc \
+ sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\
+ event_scheduler_ng.cc events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index f4147d72c3d..97db443e08d 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -556,6 +556,7 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
Event_timed::~Event_timed()
{
deinit_mutexes();
+ free_root(&mem_root, MYF(0));
if (free_sphead_on_delete)
free_sp();
@@ -622,6 +623,8 @@ Event_timed::init()
definer_user.length= definer_host.length= 0;
sql_mode= 0;
+ /* init memory root */
+ init_alloc_root(&mem_root, 256, 512);
DBUG_VOID_RETURN;
}
@@ -644,7 +647,7 @@ Event_timed::init()
*/
int
-Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
+Event_timed::load_from_row(TABLE *table)
{
char *ptr;
Event_timed *et;
@@ -661,22 +664,22 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
if (table->s->fields != ET_FIELD_COUNT)
goto error;
- if ((et->dbname.str= get_field(mem_root, table->field[ET_FIELD_DB])) == NULL)
+ if ((et->dbname.str= get_field(&mem_root, table->field[ET_FIELD_DB])) == NULL)
goto error;
et->dbname.length= strlen(et->dbname.str);
- if ((et->name.str= get_field(mem_root, table->field[ET_FIELD_NAME])) == NULL)
+ if ((et->name.str= get_field(&mem_root, table->field[ET_FIELD_NAME])) == NULL)
goto error;
et->name.length= strlen(et->name.str);
- if ((et->body.str= get_field(mem_root, table->field[ET_FIELD_BODY])) == NULL)
+ if ((et->body.str= get_field(&mem_root, table->field[ET_FIELD_BODY])) == NULL)
goto error;
et->body.length= strlen(et->body.str);
- if ((et->definer.str= get_field(mem_root,
+ if ((et->definer.str= get_field(&mem_root,
table->field[ET_FIELD_DEFINER])) == NullS)
goto error;
et->definer.length= strlen(et->definer.str);
@@ -688,10 +691,10 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
len= ptr - et->definer.str;
- et->definer_user.str= strmake_root(mem_root, et->definer.str, len);
+ et->definer_user.str= strmake_root(&mem_root, et->definer.str, len);
et->definer_user.length= len;
len= et->definer.length - len - 1; //1 is because of @
- et->definer_host.str= strmake_root(mem_root, ptr + 1, len);/* 1:because of @*/
+ et->definer_host.str= strmake_root(&mem_root, ptr + 1, len);/* 1:because of @*/
et->definer_host.length= len;
et->starts_null= table->field[ET_FIELD_STARTS]->is_null();
@@ -737,21 +740,21 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
last_executed_changed= false;
/* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */
- if ((ptr= get_field(mem_root, table->field[ET_FIELD_STATUS])) == NullS)
+ if ((ptr= get_field(&mem_root, table->field[ET_FIELD_STATUS])) == NullS)
goto error;
DBUG_PRINT("load_from_row", ("Event [%s] is [%s]", et->name.str, ptr));
et->status= (ptr[0]=='E'? Event_timed::ENABLED:Event_timed::DISABLED);
/* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */
- if ((ptr= get_field(mem_root,
+ if ((ptr= get_field(&mem_root,
table->field[ET_FIELD_ON_COMPLETION])) == NullS)
goto error;
et->on_completion= (ptr[0]=='D'? Event_timed::ON_COMPLETION_DROP:
Event_timed::ON_COMPLETION_PRESERVE);
- et->comment.str= get_field(mem_root, table->field[ET_FIELD_COMMENT]);
+ et->comment.str= get_field(&mem_root, table->field[ET_FIELD_COMMENT]);
if (et->comment.str != NullS)
et->comment.length= strlen(et->comment.str);
else
@@ -953,10 +956,10 @@ Event_timed::compute_next_execution_time()
int tmp;
DBUG_ENTER("Event_timed::compute_next_execution_time");
- DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu",
+ DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu this=%p",
TIME_to_ulonglong_datetime(&starts),
TIME_to_ulonglong_datetime(&ends),
- TIME_to_ulonglong_datetime(&last_executed)));
+ TIME_to_ulonglong_datetime(&last_executed), this));
if (status == Event_timed::DISABLED)
{
@@ -1168,7 +1171,8 @@ Event_timed::compute_next_execution_time()
goto ret;
}
ret:
- DBUG_PRINT("info", ("ret=0"));
+ DBUG_PRINT("info", ("ret=0 execute_at=%llu",
+ TIME_to_ulonglong_datetime(&execute_at)));
DBUG_RETURN(false);
err:
DBUG_PRINT("info", ("ret=1"));
@@ -1392,6 +1396,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
int
Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
{
+ Security_context *save_ctx;
/* this one is local and not needed after exec */
Security_context security_ctx;
int ret= 0;
@@ -1400,14 +1405,8 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]",
dbname.str, name.str, (int) expression));
- VOID(pthread_mutex_lock(&this->LOCK_running));
- if (running)
- {
- VOID(pthread_mutex_unlock(&this->LOCK_running));
- DBUG_RETURN(-100);
- }
- running= true;
- VOID(pthread_mutex_unlock(&this->LOCK_running));
+ thd->change_security_context(definer_user, definer_host, dbname,
+ &security_ctx, &save_ctx);
if (!sphead && (ret= compile(thd, mem_root)))
goto done;
@@ -1434,14 +1433,11 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
definer_host.str, dbname.str));
ret= -99;
}
-
- VOID(pthread_mutex_lock(&this->LOCK_running));
- running= false;
/* Will compile every time a new sp_head on different root */
free_sp();
- VOID(pthread_mutex_unlock(&this->LOCK_running));
done:
+ thd->restore_security_context(save_ctx);
/*
1. Don't cache sphead if allocated on another mem_root
2. Don't call security_ctx.destroy() because this will free our dbname.str
@@ -1807,3 +1803,4 @@ event_timed_identifier_equal(LEX_STRING db, LEX_STRING name, Event_timed *b)
return !sortcmp_lex_string(name, b->name, system_charset_info) &&
!sortcmp_lex_string(db, b->dbname, system_charset_info);
}
+
diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h
index d8df8dd1e6c..5ae5c7e81ab 100644
--- a/sql/event_data_objects.h
+++ b/sql/event_data_objects.h
@@ -72,8 +72,11 @@ class Event_timed
bool status_changed;
bool last_executed_changed;
+
+ MEM_ROOT mem_root;
public:
+ THD *thd;
enum enum_status
{
ENABLED = 1,
@@ -147,7 +150,7 @@ public:
deinit_mutexes();
int
- load_from_row(MEM_ROOT *mem_root, TABLE *table);
+ load_from_row(TABLE *table);
bool
compute_next_execution_time();
@@ -264,9 +267,33 @@ public:
};
-class Event_queue_element : public Event_timed
+class Event_job_data
{
+public:
+ LEX_STRING dbname;
+ LEX_STRING name;
+ sp_head *sphead;
+ LEX_STRING definer;
+ LEX_STRING body;
+ ulong sql_mode;
-};
+ THD *thd;
+
+ Event_job_data(){}
+ ~Event_job_data(){}
+ int
+ execute();
+
+private:
+ int
+ load_from_disk();
+
+ int
+ compile();
+
+
+ Event_job_data(const Event_job_data &); /* Prevent use of these */
+ void operator=(Event_job_data &);
+};
#endif /* _EVENT_DATA_OBJECTS_H_ */
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc
index 8886992c839..074e05e5d8f 100644
--- a/sql/event_db_repository.cc
+++ b/sql/event_db_repository.cc
@@ -129,136 +129,10 @@ TABLE_FIELD_W_TYPE event_table_fields[ET_FIELD_COUNT] = {
SYNOPSIS
evex_fill_row()
- thd THD
- table the row to fill out
- et Event's data
-
- RETURN VALUE
- 0 - OK
- EVEX_GENERAL_ERROR - bad data
- EVEX_GET_FIELD_FAILED - field count does not match. table corrupted?
-
- DESCRIPTION
- Used both when an event is created and when it is altered.
-*/
-
-static int
-evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update)
-{
- CHARSET_INFO *scs= system_charset_info;
- enum enum_events_table_field field_num;
-
- DBUG_ENTER("evex_fill_row");
-
- DBUG_PRINT("info", ("dbname=[%s]", et->dbname.str));
- DBUG_PRINT("info", ("name =[%s]", et->name.str));
- DBUG_PRINT("info", ("body =[%s]", et->body.str));
-
- if (table->field[field_num= ET_FIELD_DEFINER]->
- store(et->definer.str, et->definer.length, scs))
- goto err_truncate;
-
- if (table->field[field_num= ET_FIELD_DB]->
- store(et->dbname.str, et->dbname.length, scs))
- goto err_truncate;
-
- if (table->field[field_num= ET_FIELD_NAME]->
- store(et->name.str, et->name.length, scs))
- goto err_truncate;
-
- /* both ON_COMPLETION and STATUS are NOT NULL thus not calling set_notnull()*/
- table->field[ET_FIELD_ON_COMPLETION]->
- store((longlong)et->on_completion, true);
-
- table->field[ET_FIELD_STATUS]->store((longlong)et->status, true);
-
- /*
- Change the SQL_MODE only if body was present in an ALTER EVENT and of course
- always during CREATE EVENT.
- */
- if (et->body.str)
- {
- table->field[ET_FIELD_SQL_MODE]->
- store((longlong)thd->variables.sql_mode, true);
-
- if (table->field[field_num= ET_FIELD_BODY]->
- store(et->body.str, et->body.length, scs))
- goto err_truncate;
- }
-
- if (et->expression)
- {
- table->field[ET_FIELD_INTERVAL_EXPR]->set_notnull();
- table->field[ET_FIELD_INTERVAL_EXPR]->store((longlong)et->expression, true);
-
- table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_notnull();
- /*
- In the enum (C) intervals start from 0 but in mysql enum valid values start
- from 1. Thus +1 offset is needed!
- */
- table->field[ET_FIELD_TRANSIENT_INTERVAL]->
- store((longlong)et->interval+1, true);
-
- table->field[ET_FIELD_EXECUTE_AT]->set_null();
-
- if (!et->starts_null)
- {
- table->field[ET_FIELD_STARTS]->set_notnull();
- table->field[ET_FIELD_STARTS]->
- store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME);
- }
-
- if (!et->ends_null)
- {
- table->field[ET_FIELD_ENDS]->set_notnull();
- table->field[ET_FIELD_ENDS]->
- store_time(&et->ends, MYSQL_TIMESTAMP_DATETIME);
- }
- }
- else if (et->execute_at.year)
- {
- table->field[ET_FIELD_INTERVAL_EXPR]->set_null();
- table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_null();
- table->field[ET_FIELD_STARTS]->set_null();
- table->field[ET_FIELD_ENDS]->set_null();
-
- table->field[ET_FIELD_EXECUTE_AT]->set_notnull();
- table->field[ET_FIELD_EXECUTE_AT]->
- store_time(&et->execute_at, MYSQL_TIMESTAMP_DATETIME);
- }
- else
- {
- DBUG_ASSERT(is_update);
- /*
- it is normal to be here when the action is update
- this is an error if the action is create. something is borked
- */
- }
-
- ((Field_timestamp *)table->field[ET_FIELD_MODIFIED])->set_time();
-
- if (et->comment.str)
- {
- if (table->field[field_num= ET_FIELD_COMMENT]->
- store(et->comment.str, et->comment.length, scs))
- goto err_truncate;
- }
-
- DBUG_RETURN(0);
-err_truncate:
- my_error(ER_EVENT_DATA_TOO_LONG, MYF(0), table->field[field_num]->field_name);
- DBUG_RETURN(EVEX_GENERAL_ERROR);
-}
-
-
-/*
- Puts some data common to CREATE and ALTER EVENT into a row.
-
- SYNOPSIS
- evex_fill_row()
- thd THD
- table the row to fill out
- et Event's data
+ thd THD
+ table The row to fill out
+ et Event's data
+ is_update CREATE EVENT or ALTER EVENT
RETURN VALUE
0 - OK
@@ -596,7 +470,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
TABLE *table;
int ret;
Event_timed *et= NULL;
- DBUG_ENTER("db_find_event");
+ DBUG_ENTER("Event_db_repository::find_event");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
if (tbl)
@@ -621,7 +495,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
2)::load_from_row() is silent on error therefore we emit error msg here
*/
- if ((ret= et->load_from_row(root, table)))
+ if ((ret= et->load_from_row(table)))
{
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
goto done;
@@ -722,7 +596,7 @@ evex_check_params(THD *thd, Event_parse_data *parse_data)
const char *pos= NULL;
Item *bad_item;
- DBUG_ENTER("evex_check_timing_params");
+ DBUG_ENTER("evex_check_params");
DBUG_PRINT("info", ("execute_at=0x%d expr=0x%d starts=0x%d ends=0x%d",
parse_data->item_execute_at,
parse_data->item_expression,
@@ -1212,7 +1086,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
TABLE *table;
Open_tables_state backup;
READ_RECORD read_record_info;
- DBUG_ENTER("drop_events_from_table_by_field");
+ DBUG_ENTER("Event_db_repository::drop_events_by_field");
DBUG_PRINT("enter", ("field=%d field_value=%s", field, field_value.str));
if (open_event_table(thd, TL_WRITE, &table))
@@ -1270,7 +1144,7 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
Event_timed *et_loaded= NULL;
Open_tables_state backup;
- DBUG_ENTER("Event_scheduler::load_and_compile_event");
+ DBUG_ENTER("Event_db_repository::load_named_event");
DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup);
@@ -1297,4 +1171,3 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK);
}
-
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 32c5a076a62..44920b29c16 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -16,7 +16,7 @@
#include "mysql_priv.h"
#include "events.h"
-#include "event_scheduler.h"
+#include "event_scheduler_ng.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
@@ -35,10 +35,6 @@
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
-Event_scheduler*
-Event_queue::singleton= NULL;
-
-
/*
Compares the execute_at members of 2 Event_timed instances.
Used as callback for the prioritized queue when shifting
@@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
goto end;
}
- /* We need to load the event on scheduler_root */
if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new)))
{
+ DBUG_PRINT("info", ("new event in the queue %p", et_new));
queue_insert_safe(&queue, (byte *) et_new);
on_queue_change();
}
@@ -130,7 +126,7 @@ end:
Updates an event from the scheduler queue
SYNOPSIS
- Event_scheduler::update_event()
+ Event_queue::update_event()
thd Thread
et The event to replace(add) into the queue
new_schema New schema
@@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
et->dbname= *new_schema;
et->name= *new_name;
}
- /*
- We need to load the event (it's strings but on the object itself)
- on scheduler_root. et_new could be NULL :
- 1. Error occured
- 2. If the replace is DISABLED, we don't load it into the queue.
- */
+
if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new)))
{
+ DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
queue_insert_safe(&queue, (byte *) et_new);
on_queue_change();
}
@@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
/*
- Drops an event from the scheduler queue
+ Drops an event from the queue
SYNOPSIS
Event_queue::drop_event()
@@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name)
}
-
-
/*
- Searches for an event in the scheduler queue
+ Searches for an event in the queue
SYNOPSIS
Event_queue::find_event()
@@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
comparator The function to use for comparing
RETURN VALUE
- -1 Scheduler not working
>=0 Number of dropped events
NOTE
@@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
db The schema name
RETURN VALUE
- -1 Scheduler not working
>=0 Number of dropped events
*/
@@ -459,8 +447,7 @@ void
Event_queue::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_queue::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_event_queue, func, line));
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
pthread_mutex_lock(&LOCK_event_queue);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
@@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line)
void
Event_queue::unlock_data(const char *func, uint line)
{
- DBUG_ENTER("Event_queue::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- &LOCK_event_queue, func, line));
+ DBUG_ENTER("Event_queue::unlock_mutex");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_unlocked_at_line= line;
mutex_queue_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
@@ -510,7 +496,7 @@ Event_queue::events_count()
LOCK_QUEUE_DATA();
n= queue.elements;
UNLOCK_QUEUE_DATA();
-
+ DBUG_PRINT("info", ("n=%u", n));
DBUG_RETURN(n);
}
@@ -529,7 +515,7 @@ uint
Event_queue::events_count_no_lock()
{
uint n;
- DBUG_ENTER("Event_scheduler::events_count_no_lock");
+ DBUG_ENTER("Event_queue::events_count_no_lock");
n= queue.elements;
@@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd)
}
DBUG_PRINT("info", ("Loading event from row."));
- if ((ret= et->load_from_row(&scheduler_root, table)))
+ if ((ret= et->load_from_row(table)))
{
clean_the_queue= TRUE;
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
@@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd)
void
Event_queue::init_mutexes()
{
- pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
}
@@ -743,13 +729,13 @@ Event_queue::init_mutexes()
Destroys mutexes.
SYNOPSIS
- Event_queue::destroy_mutexes()
+ Event_queue::deinit_mutexes()
*/
void
-Event_queue::destroy_mutexes()
+Event_queue::deinit_mutexes()
{
- pthread_mutex_destroy(&singleton->LOCK_event_queue);
+ pthread_mutex_destroy(&LOCK_event_queue);
}
@@ -765,8 +751,8 @@ void
Event_queue::on_queue_change()
{
DBUG_ENTER("Event_queue::on_queue_change");
- DBUG_PRINT("info", ("Sending COND_new_work"));
- singleton->queue_changed();
+ DBUG_PRINT("info", ("Signalling change of the queue"));
+ scheduler->queue_changed();
DBUG_VOID_RETURN;
}
@@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo)
{
int i= 0;
bool ret= FALSE;
- DBUG_ENTER("Event_scheduler::init");
+ DBUG_ENTER("Event_queue::init");
DBUG_PRINT("enter", ("this=%p", this));
LOCK_QUEUE_DATA();
db_repository= db_repo;
- /* init memory root */
- init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_timed_compare_q, NULL, 30 /*auto_extent*/))
@@ -824,8 +808,8 @@ Event_queue::deinit()
DBUG_ENTER("Event_queue::deinit");
LOCK_QUEUE_DATA();
+ empty_queue();
delete_queue(&queue);
- free_root(&scheduler_root, MYF(0));
UNLOCK_QUEUE_DATA();
DBUG_VOID_RETURN;
@@ -835,7 +819,7 @@ Event_queue::deinit()
void
Event_queue::recalculate_queue(THD *thd)
{
- int i;
+ uint i;
for (i= 0; i < queue.elements; i++)
{
((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
@@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd)
void
Event_queue::empty_queue()
{
- int i;
+ uint i;
/* empty the queue */
for (i= 0; i < events_count_no_lock(); ++i)
{
Event_timed *et= (Event_timed *) queue_element(&queue, i);
- et->free_sp();
delete et;
}
resize_queue(&queue, 0);
}
+
+
+Event_timed*
+Event_queue::get_top()
+{
+ return (Event_timed *)queue_top(&queue);
+}
+
+
+void
+Event_queue::remove_top()
+{
+ queue_remove(&queue, 0);// 0 is top, internally 1
+}
+
+
+void
+Event_queue::top_changed()
+{
+ queue_replaced(&queue);
+}
+
+
+Event_timed *
+Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
+ struct timespec *abstime)
+{
+ struct timespec top_time;
+ Event_timed *et_new= NULL;
+ DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
+ DBUG_PRINT("enter", ("thd=%p now=%d", thd, now));
+ abstime->tv_nsec= 0;
+ LOCK_QUEUE_DATA();
+ do {
+ int res;
+ Event_timed *et= NULL;
+ if (!queue.elements)
+ {
+ abstime->tv_sec= 0;
+ break;
+ }
+ int i;
+ DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
+ for (i = 0; i < queue.elements; i++)
+ {
+ et= ((Event_timed*)queue_element(&queue, i));
+ DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
+ DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
+ " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
+ TIME_to_ulonglong_datetime(&et->execute_at),
+ TIME_to_ulonglong_datetime(&et->starts),
+ TIME_to_ulonglong_datetime(&et->ends),
+ et->expression, sec_since_epoch_TIME(&et->execute_at), now,
+ (int)(sec_since_epoch_TIME(&et->execute_at) - now),
+ sec_since_epoch_TIME(&et->execute_at) <= now));
+ }
+ et= ((Event_timed*)queue_element(&queue, 0));
+ top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
+
+ if (top_time.tv_sec <= now)
+ {
+ DBUG_PRINT("info", ("Ready for execution"));
+ abstime->tv_sec= 0;
+ if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
+ &et_new)))
+ {
+ DBUG_ASSERT(0);
+ break;
+ }
+
+ et->mark_last_executed(thd);
+ if (et->compute_next_execution_time())
+ et->status= Event_timed::DISABLED;
+ DBUG_PRINT("info", ("event's status is %d", et->status));
+
+ et->update_fields(thd);
+ if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
+ (et->status == Event_timed::DISABLED))
+ {
+ DBUG_PRINT("info", ("removing from the queue"));
+ if (et->dropped)
+ et->drop(thd);
+ delete et;
+ queue_remove(&queue, 0);
+ }
+ else
+ queue_replaced(&queue);
+ }
+ else
+ {
+ abstime->tv_sec= top_time.tv_sec;
+ DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
+ abstime->tv_sec));
+ }
+ } while (0);
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new,
+ abstime->tv_sec));
+ if (et_new)
+ DBUG_PRINT("info", ("db=%s name=%s definer=%s "
+ "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str,
+ et_new->definer.str,
+ TIME_to_ulonglong_datetime(&et_new->execute_at)));
+ DBUG_RETURN(et_new);
+}
diff --git a/sql/event_queue.h b/sql/event_queue.h
index 8c11d7a2042..1335100be21 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -19,22 +19,23 @@
class sp_name;
class Event_timed;
class Event_db_repository;
+class Event_job_data;
class THD;
typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
-class Event_scheduler;
+class Event_scheduler_ng;
class Event_queue
{
public:
Event_queue();
- static void
+ void
init_mutexes();
- static void
- destroy_mutexes();
+ void
+ deinit_mutexes();
bool
init(Event_db_repository *db_repo);
@@ -76,6 +77,18 @@ public:
void
empty_queue();
+ Event_timed *
+ get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime);
+
+ Event_timed*
+ get_top();
+
+ void
+ remove_top();
+
+ void
+ top_changed();
+
///////////////protected
Event_timed *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
@@ -92,9 +105,6 @@ public:
Event_db_repository *db_repository;
- /* The MEM_ROOT of the object */
- MEM_ROOT scheduler_root;
-
/* The sorted queue with the Event_timed objects */
QUEUE queue;
@@ -111,11 +121,11 @@ public:
void
unlock_data(const char *func, uint line);
- static void
+ void
on_queue_change();
+
+ Event_scheduler_ng *scheduler;
protected:
- /* Singleton instance */
- static Event_scheduler *singleton;
};
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index fb60ce8ae6d..3a9b988f92e 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -20,224 +20,9 @@
#include "event_scheduler.h"
#include "event_db_repository.h"
#include "sp_head.h"
-
-/*
- ToDo:
- 1. Talk to Alik to get a check for configure.in for my_time_t and time_t
- 2. Look at guardian.h|cc to see its life cycle, has similarities.
-*/
+#include "event_queue.h"
-/*
- The scheduler is implemented as class Event_scheduler. Only one instance is
- kept during the runtime of the server, by implementing the Singleton DP.
- Object instance is always there because the memory is allocated statically
- and initialized when the OS loader loads mysqld. This initialization is
- bare. Extended initialization is done during the call to
- Event_scheduler::init() in Events::init(). The reason for that late initialization
- is that some subsystems needed to boot the Scheduler are not available at
- earlier stages of the mysqld boot procedure. Events::init() is called in
- mysqld.cc . If the mysqld is started with --event-scheduler=0 then
- no initialization takes place and the scheduler is unavailable during this
- server run. The server should be started with --event-scheduler=1 to have
- the scheduler initialized and able to execute jobs. This starting alwa
- s implies that the jobs execution will start immediately. If the server
- is started with --event-scheduler=2 then the scheduler is started in suspended
- state. Default state, if --event-scheduler is not specified is 2.
-
- The scheduler only manages execution of the events. Their creation,
- alteration and deletion is delegated to other routines found in event.cc .
- These routines interact with the scheduler :
- - CREATE EVENT -> Event_scheduler::create_event()
- - ALTER EVENT -> Event_scheduler::update_event()
- - DROP EVENT -> Event_scheduler::drop_event()
-
- There is one mutex in the single Event_scheduler object which controls
- the simultaneous access to the objects invariants. Using one lock makes
- it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is
- initialized in Event_scheduler::init(). Which in turn is called by the
- Facade class Events in event.cc, coming from init_thread_environment() from
- mysqld.cc -> no concurrency at this point. It's destroyed in
- Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc .
-
- The full initialization is done in Event_scheduler::init() called from
- Events::init(). It's done before any requests coming in, so this is a
- guarantee for not having concurrency.
-
- The scheduler is started with Event_scheduler::start() and stopped with
- Event_scheduler::stop(). When the scheduler starts it loads all events
- from mysql.event table. Unfortunately, there is a race condition between
- the event disk management functions and the scheduler ones
- (add/replace/drop_event & load_events_from_db()), because the operations
- do not happen under one global lock but the disk operations are guarded
- by the MYISAM lock on mysql.event. In the same time, the queue operations
- are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during
- server startup and stopped()-ed during server shutdown (in Events::shutdown()
- called by kill_server() in mysqld.cc) these races does not exist.
-
- Since the user may want to temporarily inhibit execution of events the
- scheduler can be suspended and then it can be forced to resume its
- operations. The API call to perform these is
- Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) .
- When the scheduler is suspended the main scheduler thread, which ATM
- happens to have thread_id 1, locks on a condition COND_suspend_or_resume.
- When this is signal is sent for the reverse operation the main scheduler
- loops continues to roll and execute events.
-
- When the scheduler is suspended all add/replace/drop_event() operations
- work as expected and the modify the queue but no events execution takes
- place.
-
- In contrast to the previous scheduler implementation, found in
- event_executor.cc, the start, shutdown, suspend and resume are synchronous
- operations. As a whole all operations are synchronized and no busy waits
- are used except in stop_all_running_events(), which waits until all
- running event worker threads have finished. It would have been nice to
- use a conditional on which this method will wait and the last thread to
- finish would signal it but this implies subclassing THD.
-
- The scheduler does not keep a counter of how many event worker threads are
- running, at any specific moment, because this will copy functionality
- already existing in the server. Namely, all THDs are registered in the
- global `threads` array. THD has member variable system_thread which
- identifies the type of thread. Connection threads being NON_SYSTEM_THREAD,
- all other have their enum value. Important for the scheduler are
- SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER.
-
- Class THD subclasses class ilink, which is the linked list of all threads.
- When a THD instance is destroyed it's being removed from threads, thus
- no manual intervention is needed. On the contrary registering is manual
- with threads.append() . Traversing the threads array every time a subclass
- of THD, for instance if we would have had THD_scheduler_worker to see
- how many events we have and whether the scheduler is shutting down will
- take much time and lead to a deadlock. stop_all_running_events() is called
- under LOCK_scheduler_data. If the THD_scheduler_worker was aware of
- the single Event_scheduler instance it will try to check
- Event_scheduler::state but for this it would need to acquire
- LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a
- busy wait.
-
- DROP DATABASE DDL should drop all events defined in a specific schema.
- DROP USER also should drop all events who has as definer the user being
- dropped (this one is not addressed at the moment but a hook exists). For
- this specific needs Event_scheduler::drop_matching_events() is
- implemented. Which expects a callback to be applied on every object in
- the queue. Thus events that match specific schema or user, will be
- removed from the queue. The exposed interface is :
- - Event_scheduler::drop_schema_events()
- - Event_scheduler::drop_user_events()
-
- This bulk dropping happens under LOCK_scheduler_data, thus no two or
- more threads can execute it in parallel. However, DROP DATABASE is also
- synchronized, currently, in the server thus this does not impact the
- overall performance. In addition, DROP DATABASE is not that often
- executed DDL.
-
- Though the interface to the scheduler is only through the public methods
- of class Event_scheduler, there are currently few functions which are
- used during its operations. Namely :
- - static evex_print_warnings()
- After every event execution all errors/warnings are dumped, so the user
- can see in case of a problem what the problem was.
-
- - static init_event_thread()
- This function is both used by event_scheduler_thread() and
- event_worker_thread(). It initializes the THD structure. The
- initialization looks pretty similar to the one in slave.cc done for the
- replication threads. However, though the similarities it cannot be
- factored out to have one routine.
-
- - static event_scheduler_thread()
- Because our way to register functions to be used by the threading library
- does not allow usage of static methods this function is used to start the
- scheduler in it. It does THD initialization and then calls
- Event_scheduler::run().
-
- - static event_worker_thread()
- With already stated the reason for not being able to use methods, this
- function executes the worker threads.
-
- The execution of events is, to some extent, synchronized to inhibit race
- conditions when Event_timed::thread_id is being updated with the thread_id of
- the THD in which the event is being executed. The thread_id is in the
- Event_timed object because we need to be able to kill quickly a specific
- event during ALTER/DROP EVENT without traversing the global `threads` array.
- However, this makes the scheduler's code more complicated. The event worker
- thread is started by Event_timed::spawn_now(), which in turn calls
- pthread_create(). The thread_id which will be associated in init_event_thread
- is not known in advance thus the registering takes place in
- event_worker_thread(). This registering has to be synchronized under
- LOCK_scheduler_data, so no kill_event() on a object in
- replace_event/drop_event/drop_matching_events() could take place.
-
- This synchronization is done through class Worker_thread_param that is
- local to this file. Event_scheduler::execute_top() is called under
- LOCK_scheduler_data. This method :
- 1. Creates an instance of Worker_thread_param on the stack
- 2. Locks Worker_thread_param::LOCK_started
- 3. Calls Event_timed::spawn_now() which in turn creates a new thread.
- 4. Locks on Worker_thread_param::COND_started_or_stopped and waits till the
- worker thread send signal. The code is spurious wake-up safe because
- Worker_thread_param::started is checked.
- 5. The worker thread initializes its THD, then sets Event_timed::thread_id,
- sets Worker_thread_param::started to TRUE and sends back
- Worker_thread_param::COND_started. From this moment on, the event
- is being executed and could be killed by using Event_timed::thread_id.
- When Event_timed::spawn_thread_finish() is called in the worker thread,
- it sets thread_id to 0. From this moment on, the worker thread should not
- touch the Event_timed instance.
-
-
- The life-cycle of the server is a FSA.
- enum enum_state Event_scheduler::state keeps the state of the scheduler.
-
- The states are:
-
- |---UNINITIALIZED
- |
- | |------------------> IN_SHUTDOWN
- --> INITIALIZED -> COMMENCING ---> RUNNING ----------|
- ^ ^ | | ^ |
- | |- CANTSTART <--| | |- SUSPENDED <-|
- |______________________________|
-
- - UNINITIALIZED :The object is created and only the mutex is initialized
- - INITIALIZED :All member variables are initialized
- - COMMENCING :The scheduler is starting, no other attempt to start
- should succeed before the state is back to INITIALIZED.
- - CANTSTART :Set by the ::run() method in case it can't start for some
- reason. In this case the connection thread that tries to
- start the scheduler sees that some error has occurred and
- returns an error to the user. Finally, the connection
- thread sets the state to INITIALIZED, so further attempts
- to start the scheduler could be made.
- - RUNNING :The scheduler is running. New events could be added,
- dropped, altered. The scheduler could be stopped.
- - SUSPENDED :Like RUNNING but execution of events does not take place.
- Operations on the memory queue are possible.
- - IN_SHUTDOWN :The scheduler is shutting down, due to request by setting
- the global event_scheduler to 0/FALSE, or because of a
- KILL command sent by a user to the master thread.
-
- In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA()
- are used for (un)locking purposes. They are used to save the programmer
- from typing everytime
- lock_data(__FUNCTION__, __LINE__);
- All locking goes through Event_scheduler::lock_data() and ::unlock_data().
- These two functions then record in variables where for last time
- LOCK_scheduler_data was locked and unlocked (two different variables). In
- multithreaded environment, in some cases they make no sense but are useful for
- inspecting deadlocks without having the server debug log turned on and the
- server is still running.
-
- The same strategy is used for conditional variables.
- Event_scheduler::cond_wait() is invoked from all places with parameter
- an enum enum_cond_vars. In this manner, it's possible to inspect the last
- on which condition the last call to cond_wait() was waiting. If the server
- was started with debug trace switched on, the trace file also holds information
- about conditional variables used.
-*/
-
#ifdef __GNUC__
#if __GNUC__ >= 2
#define SCHED_FUNC __FUNCTION__
@@ -250,6 +35,10 @@
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
+Event_scheduler*
+Event_scheduler::singleton= NULL;
+
+
#ifndef DBUG_OFF
static
LEX_STRING states_names[] =
@@ -462,7 +251,7 @@ event_scheduler_thread(void *arg)
thd->security_ctx->set_user((char*)"event_scheduler");
sql_print_information("SCHEDULER: Manager thread booting");
- if (Event_scheduler::check_system_tables(thd))
+ if (Event_scheduler::get_instance()->event_queue->check_system_tables(thd))
scheduler->report_error_during_start();
else
scheduler->run(thd);
@@ -625,13 +414,13 @@ event_worker_thread(void *arg)
Event_scheduler::Event_scheduler()
{
thread_id= 0;
- mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0;
- mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= "";
+ mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
+ mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
cond_waiting_on= COND_NONE;
mutex_scheduler_data_locked= FALSE;
state= UNINITIALIZED;
start_scheduler_suspended= FALSE;
- LOCK_scheduler_data= &LOCK_event_queue;
+ LOCK_scheduler_data= &LOCK_data;
}
@@ -647,9 +436,10 @@ Event_scheduler::Event_scheduler()
*/
void
-Event_scheduler::create_instance()
+Event_scheduler::create_instance(Event_queue *queue)
{
singleton= new Event_scheduler();
+ singleton->event_queue= queue;
}
/*
@@ -689,8 +479,8 @@ Event_scheduler::init(Event_db_repository *db_repo)
DBUG_ENTER("Event_scheduler::init");
DBUG_PRINT("enter", ("this=%p", this));
- Event_queue::init(db_repo);
LOCK_SCHEDULER_DATA();
+ init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
for (;i < COND_LAST; i++)
if (pthread_cond_init(&cond_vars[i], NULL))
{
@@ -720,7 +510,6 @@ void
Event_scheduler::destroy()
{
DBUG_ENTER("Event_scheduler");
- Event_queue::deinit();
LOCK_SCHEDULER_DATA();
switch (state) {
case UNINITIALIZED:
@@ -879,7 +668,7 @@ Event_scheduler::run(THD *thd)
DBUG_PRINT("enter", ("thd=%p", thd));
LOCK_SCHEDULER_DATA();
- ret= load_events_from_db(thd);
+ ret= event_queue->load_events_from_db(thd);
if (!ret)
{
@@ -923,8 +712,9 @@ Event_scheduler::run(THD *thd)
}
DBUG_ASSERT(state == RUNNING);
- et= (Event_timed *)queue_top(&queue);
-
+// et= (Event_timed *)queue_top(&event_queue->queue);
+ et= event_queue->get_top();
+
/* Skip disabled events */
if (et->status != Event_timed::ENABLED)
{
@@ -935,7 +725,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue",
et->dbname.length, et->dbname.str, et->name.length,
et->name.str);
- queue_remove(&queue, 0);
+ queue_remove(&event_queue->queue, 0);
/* ToDo: check this again */
if (et->dropped)
et->drop(thd);
@@ -1095,16 +885,16 @@ Event_scheduler::execute_top(THD *thd, Event_timed *et)
sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
et->dbname.str, et->name.str);
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
- queue_remove(&queue, 0);// 0 is top, internally 1
+ event_queue->remove_top();
else
- queue_replaced(&queue);
+ event_queue->top_changed();
break;
default:
DBUG_ASSERT(!spawn_ret_code);
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
- queue_remove(&queue, 0);// 0 is top, internally 1
+ event_queue->remove_top();
else
- queue_replaced(&queue);
+ event_queue->top_changed();
/*
We don't lock LOCK_scheduler_data here because it's a pre-requisite
for calling the current_method.
@@ -1152,7 +942,7 @@ Event_scheduler::clean_memory(THD *thd)
sql_print_information("SCHEDULER: Emptying the queue");
- empty_queue();
+ event_queue->empty_queue();
DBUG_VOID_RETURN;
}
@@ -1432,7 +1222,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
if (was_suspended)
{
- recalculate_queue(thd);
+ event_queue->recalculate_queue(thd);
/* This will implicitly unlock LOCK_scheduler_data */
thd->exit_cond("");
}
@@ -1461,14 +1251,14 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
bool slept= FALSE;
DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue");
DBUG_PRINT("enter", ("q.elements=%lu state=%s",
- events_count_no_lock(), states_names[state]));
+ event_queue->events_count_no_lock(), states_names[state]));
- if (!events_count_no_lock())
+ if (!event_queue->events_count_no_lock())
thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data,
"Empty queue, sleeping");
/* Wait in a loop protecting against catching spurious signals */
- while (!events_count_no_lock() && state == RUNNING)
+ while (!event_queue->events_count_no_lock() && state == RUNNING)
{
slept= TRUE;
DBUG_PRINT("info", ("Entering condition because of empty queue"));
@@ -1485,7 +1275,7 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
thd->exit_cond("");
DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d",
- events_count_no_lock(), states_names[state], thd->killed));
+ event_queue->events_count_no_lock(), states_names[state], thd->killed));
DBUG_RETURN(slept);
}
@@ -1627,7 +1417,7 @@ Event_scheduler::dump_internal_status(THD *thd)
/* queue.elements */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue.elements"), scs);
- int_string.set((longlong) scheduler->events_count_no_lock(), scs);
+ int_string.set((longlong) scheduler->event_queue->events_count_no_lock(), scs);
protocol->store(&int_string);
ret= protocol->write();
@@ -1663,8 +1453,8 @@ Event_scheduler::lock_data(const char *func, uint line)
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
&LOCK_scheduler_data, func, line));
pthread_mutex_lock(LOCK_scheduler_data);
- mutex_last_locked_in_func_name= func;
- mutex_last_locked_at_line_nr= line;
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE;
DBUG_VOID_RETURN;
}
@@ -1685,9 +1475,9 @@ Event_scheduler::unlock_data(const char *func, uint line)
DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
LOCK_scheduler_data, func, line));
- mutex_last_unlocked_at_line_nr= line;
+ mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func_name= func;
+ mutex_last_unlocked_in_func= func;
pthread_mutex_unlock(LOCK_scheduler_data);
DBUG_VOID_RETURN;
}
@@ -1733,3 +1523,31 @@ Event_scheduler::queue_changed()
pthread_cond_signal(&cond_vars[COND_new_work]);
DBUG_VOID_RETURN;
}
+
+
+/*
+ Inits mutexes.
+
+ SYNOPSIS
+ Event_scheduler::init_mutexes()
+*/
+
+void
+Event_scheduler::init_mutexes()
+{
+ pthread_mutex_init(singleton->LOCK_scheduler_data, MY_MUTEX_INIT_FAST);
+}
+
+
+/*
+ Destroys mutexes.
+
+ SYNOPSIS
+ Event_queue::destroy_mutexes()
+*/
+
+void
+Event_scheduler::destroy_mutexes()
+{
+ pthread_mutex_destroy(singleton->LOCK_scheduler_data);
+}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index b4007d88976..7d02e98d4fe 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -19,6 +19,7 @@
class sp_name;
class Event_timed;
class Event_db_repository;
+class Event_queue;
class THD;
@@ -31,7 +32,7 @@ events_shutdown();
#include "event_queue.h"
#include "event_scheduler.h"
-class Event_scheduler : public Event_queue
+class Event_scheduler
{
public:
enum enum_state
@@ -56,7 +57,13 @@ public:
static void
- create_instance();
+ create_instance(Event_queue *queue);
+
+ static void
+ init_mutexes();
+
+ static void
+ destroy_mutexes();
/* Singleton access */
static Event_scheduler*
@@ -122,6 +129,8 @@ public:
void
queue_changed();
+ Event_queue *event_queue;
+
protected:
uint
@@ -147,9 +156,11 @@ protected:
/* Singleton DP is used */
Event_scheduler();
-
+ pthread_mutex_t LOCK_data;
pthread_mutex_t *LOCK_scheduler_data;
-
+
+ /* The MEM_ROOT of the object */
+ MEM_ROOT scheduler_root;
/* Set to start the scheduler in suspended state */
bool start_scheduler_suspended;
@@ -172,18 +183,20 @@ protected:
COND_LAST
};
- uint mutex_last_locked_at_line_nr;
- uint mutex_last_unlocked_at_line_nr;
- const char* mutex_last_locked_in_func_name;
- const char* mutex_last_unlocked_in_func_name;
+ uint mutex_last_locked_at_line;
+ uint mutex_last_unlocked_at_line;
+ const char* mutex_last_locked_in_func;
+ const char* mutex_last_unlocked_in_func;
int cond_waiting_on;
bool mutex_scheduler_data_locked;
-
static const char * const cond_vars_names[COND_LAST];
pthread_cond_t cond_vars[COND_LAST];
+ /* Singleton instance */
+ static Event_scheduler *singleton;
+
private:
/* Prevent use of these */
Event_scheduler(const Event_scheduler &);
diff --git a/sql/event_scheduler_ng.cc b/sql/event_scheduler_ng.cc
new file mode 100644
index 00000000000..9dc3bb26bc7
--- /dev/null
+++ b/sql/event_scheduler_ng.cc
@@ -0,0 +1,686 @@
+/* Copyright (C) 2004-2006 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+#include "events.h"
+#include "event_data_objects.h"
+#include "event_scheduler_ng.h"
+#include "event_queue.h"
+
+#ifdef __GNUC__
+#if __GNUC__ >= 2
+#define SCHED_FUNC __FUNCTION__
+#endif
+#else
+#define SCHED_FUNC "<unknown>"
+#endif
+
+#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
+#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
+
+extern pthread_attr_t connection_attrib;
+
+struct scheduler_param
+{
+ THD *thd;
+ Event_scheduler_ng *scheduler;
+};
+
+struct scheduler_param scheduler_param_value;
+
+
+
+static
+LEX_STRING scheduler_states_names[] =
+{
+ { C_STRING_WITH_LEN("INITIALIZED")},
+ { C_STRING_WITH_LEN("RUNNING")},
+ { C_STRING_WITH_LEN("STOPPING")}
+};
+
+
+class Worker_thread_param
+{
+public:
+ Event_timed *et;
+ pthread_mutex_t LOCK_started;
+ pthread_cond_t COND_started;
+ bool started;
+
+ Worker_thread_param(Event_timed *etn):et(etn), started(FALSE)
+ {
+ pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_started, NULL);
+ }
+
+ ~Worker_thread_param()
+ {
+ pthread_mutex_destroy(&LOCK_started);
+ pthread_cond_destroy(&COND_started);
+ }
+};
+
+
+/*
+ Prints the stack of infos, warnings, errors from thd to
+ the console so it can be fetched by the logs-into-tables and
+ checked later.
+
+ SYNOPSIS
+ evex_print_warnings
+ thd - thread used during the execution of the event
+ et - the event itself
+*/
+
+static void
+evex_print_warnings(THD *thd, Event_timed *et)
+{
+ MYSQL_ERROR *err;
+ DBUG_ENTER("evex_print_warnings");
+ if (!thd->warn_list.elements)
+ DBUG_VOID_RETURN;
+
+ char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
+ char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
+ String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
+ prefix.length(0);
+ prefix.append("SCHEDULER: [");
+
+ append_identifier(thd, &prefix, et->definer.str, et->definer.length);
+ prefix.append("][", 2);
+ append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
+ prefix.append('.');
+ append_identifier(thd,&prefix, et->name.str, et->name.length);
+ prefix.append("] ", 2);
+
+ List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ while ((err= it++))
+ {
+ String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
+ /* set it to 0 or we start adding at the end. That's the trick ;) */
+ err_msg.length(0);
+ err_msg.append(prefix);
+ err_msg.append(err->msg, strlen(err->msg), system_charset_info);
+ err_msg.append("]");
+ DBUG_ASSERT(err->level < 3);
+ (sql_print_message_handlers[err->level])("%*s", err_msg.length(),
+ err_msg.c_ptr());
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Inits an scheduler thread handler, both the main and a worker
+
+ SYNOPSIS
+ init_event_thread()
+ thd - the THD of the thread. Has to be allocated by the caller.
+
+ NOTES
+ 1. The host of the thead is my_localhost
+ 2. thd->net is initted with NULL - no communication.
+
+ RETURN VALUE
+ 0 OK
+ -1 Error
+*/
+
+static int
+init_scheduler_thread(THD* thd)
+{
+ DBUG_ENTER("init_event_thread");
+ thd->client_capabilities= 0;
+ thd->security_ctx->master_access= 0;
+ thd->security_ctx->db_access= 0;
+ thd->security_ctx->host_or_ip= (char*)my_localhost;
+ thd->security_ctx->set_user((char*)"event_scheduler");
+ my_net_init(&thd->net, 0);
+ thd->net.read_timeout= slave_net_timeout;
+ thd->slave_thread= 0;
+ thd->options|= OPTION_AUTO_IS_NULL;
+ thd->client_capabilities|= CLIENT_MULTI_RESULTS;
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ thd->thread_id= thread_id++;
+ threads.append(thd);
+ thread_count++;
+ thread_running++;
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+
+ /*
+ Guarantees that we will see the thread in SHOW PROCESSLIST though its
+ vio is NULL.
+ */
+ thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
+
+ thd->proc_info= "Initialized";
+ thd->version= refresh_version;
+ thd->set_time();
+
+ DBUG_RETURN(0);
+}
+
+
+pthread_handler_t
+event_scheduler_ng_thread(void *arg)
+{
+ /* needs to be first for thread_stack */
+ THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
+
+ thd->thread_stack= (char *)&thd; // remember where our stack is
+ DBUG_ENTER("event_scheduler_ng_thread");
+
+ my_thread_init();
+ pthread_detach_this_thread();
+ thd->real_id=pthread_self();
+ if (init_thr_lock() || thd->store_globals())
+ {
+ thd->cleanup();
+ goto end;
+ }
+
+#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
+#endif
+
+ ((struct scheduler_param *) arg)->scheduler->run(thd);
+
+end:
+ thd->proc_info= "Clearing";
+ DBUG_ASSERT(thd->net.buff != 0);
+ net_end(&thd->net);
+ DBUG_PRINT("exit", ("Scheduler thread finishing"));
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ my_thread_end();
+}
+
+
+/*
+ Function that executes an event in a child thread. Setups the
+ environment for the event execution and cleans after that.
+
+ SYNOPSIS
+ event_worker_ng_thread()
+ arg The Event_timed object to be processed
+
+ RETURN VALUE
+ 0 OK
+*/
+
+pthread_handler_t
+event_worker_ng_thread(void *arg)
+{
+ /* needs to be first for thread_stack */
+ THD *thd;
+ Event_timed *event= (Event_timed *)arg;
+ int ret;
+
+ thd= event->thd;
+ thd->thread_stack= (char *) &thd;
+
+ DBUG_ENTER("event_worker_thread");
+ DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str));
+
+ my_thread_init();
+ pthread_detach_this_thread();
+ thd->real_id=pthread_self();
+ if (init_thr_lock() || thd->store_globals())
+ {
+ thd->cleanup();
+ goto end;
+ }
+
+#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
+#endif
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
+ event->dbname.str, event->name.str,
+ event->definer.str, thd->thread_id);
+
+ thd->init_for_queries();
+ thd->enable_slow_log= TRUE;
+
+ ret= event->execute(thd, thd->mem_root);
+
+ evex_print_warnings(thd, event);
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d",
+ event->dbname.str, event->name.str,
+ event->definer.str, ret);
+ if (ret == EVEX_COMPILE_ERROR)
+ sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ event->dbname.str, event->name.str,
+ event->definer.str);
+ else if (ret == EVEX_MICROSECOND_UNSUP)
+ sql_print_information("SCHEDULER: MICROSECOND is not supported");
+
+ DBUG_PRINT("info", ("master_access=%d db_access=%d",
+ thd->security_ctx->master_access, thd->security_ctx->db_access));
+
+end:
+ thd->proc_info= "Clearing";
+ DBUG_ASSERT(thd->net.buff != 0);
+ /*
+ Free it here because net.vio is NULL for us => THD::~THD will check it
+ and won't call net_end(&net); See also replication code.
+ */
+ net_end(&thd->net);
+ DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id));
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ thread_count--;
+ thread_running--;
+ delete thd;
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ delete event;
+
+ my_thread_end();
+}
+
+
+bool
+Event_scheduler_ng::init(Event_queue *q)
+{
+ thread_id= 0;
+ state= INITIALIZED;
+ /* init memory root */
+
+ queue= q;
+
+ return FALSE;
+}
+
+
+void
+Event_scheduler_ng::deinit()
+{
+}
+
+
+void
+Event_scheduler_ng::init_mutexes()
+{
+ pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_state, NULL);
+}
+
+
+void
+Event_scheduler_ng::deinit_mutexes()
+{
+ pthread_mutex_destroy(&LOCK_scheduler_state);
+ pthread_cond_destroy(&COND_state);
+}
+
+
+bool
+Event_scheduler_ng::start()
+{
+ THD *new_thd= NULL;
+ bool ret= FALSE;
+ pthread_t th;
+ DBUG_ENTER("Event_scheduler_ng::start");
+
+ LOCK_SCHEDULER_DATA();
+ if (state > INITIALIZED)
+ goto end;
+
+ if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
+ {
+ sql_print_error("SCHEDULER: Cannot init manager event thread.");
+ ret= TRUE;
+ goto end;
+ }
+
+ scheduler_param_value.thd= new_thd;
+ scheduler_param_value.scheduler= this;
+
+ if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread,
+ (void*)&scheduler_param_value))
+ {
+ DBUG_PRINT("error", ("cannot create a new thread"));
+ state= INITIALIZED;
+ ret= TRUE;
+ }
+
+ state= RUNNING;
+end:
+ UNLOCK_SCHEDULER_DATA();
+
+ if (ret && new_thd)
+ {
+ new_thd->proc_info= "Clearing";
+ DBUG_ASSERT(new_thd->net.buff != 0);
+ net_end(&new_thd->net);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete new_thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ }
+ DBUG_RETURN(ret);
+}
+
+
+bool
+Event_scheduler_ng::stop()
+{
+ THD *thd= current_thd;
+ DBUG_ENTER("Event_scheduler_ng::stop");
+ DBUG_PRINT("enter", ("thd=%p", current_thd));
+
+ LOCK_SCHEDULER_DATA();
+ if (state != RUNNING)
+ goto end;
+
+ state= STOPPING;
+
+ DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
+ sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
+
+ pthread_cond_signal(&COND_state);
+
+ /* Guarantee we don't catch spurious signals */
+ sql_print_information("SCHEDULER: Waiting the manager thread to reply");
+ do {
+ DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
+ "thread. Current value of state is %s . "
+ "workers count=%d", scheduler_states_names[state].str,
+ workers_count()));
+ /* thd could be 0x0, when shutting down */
+ pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
+ } while (state == STOPPING);
+ DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
+end:
+ UNLOCK_SCHEDULER_DATA();
+ DBUG_RETURN(FALSE);
+}
+
+
+bool
+Event_scheduler_ng::run(THD *thd)
+{
+ struct timespec abstime;
+ Event_timed *job_data;
+
+ LOCK_SCHEDULER_DATA();
+
+ thread_id= thd->thread_id;
+ sql_print_information("SCHEDULER: Manager thread started with id %lu",
+ thread_id);
+ while (state == RUNNING)
+ {
+ thd->end_time();
+ /* Gets a minimized version */
+ job_data= queue->get_top_for_execution_if_time(thd, thd->query_start(),
+ &abstime);
+ DBUG_PRINT("info", ("get_top returned job_data=%p now=%d abs_time.tv_sec=%d",
+ job_data, thd->query_start(), abstime.tv_sec));
+ if (!job_data && !abstime.tv_sec)
+ {
+ thd->enter_cond(&COND_state, &LOCK_scheduler_state,
+ "Waiting on empty queue");
+ pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
+ thd->exit_cond("");
+ DBUG_PRINT("info", ("Woke up. Got COND_state"));
+ LOCK_SCHEDULER_DATA();
+ }
+ else if (abstime.tv_sec)
+ {
+ thd->enter_cond(&COND_state, &LOCK_scheduler_state,
+ "Waiting for next activation");
+ pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, &abstime);
+ /*
+ If we get signal we should recalculate the whether it's the right time
+ because there could be :
+ 1. Spurious wake-up
+ 2. The top of the queue was changed (new one becase of create/update)
+ */
+ /* This will do implicit UNLOCK_SCHEDULER_DATA() */
+ thd->exit_cond("");
+ DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
+ LOCK_SCHEDULER_DATA();
+ }
+ else
+ {
+ int res;
+ UNLOCK_SCHEDULER_DATA();
+ res= execute_top(thd, job_data);
+ LOCK_SCHEDULER_DATA();
+ if (res)
+ break;
+ }
+ DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
+ }
+ DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
+ pthread_cond_signal(&COND_state);
+error:
+ state= INITIALIZED;
+ stop_all_running_events(thd);
+ UNLOCK_SCHEDULER_DATA();
+ sql_print_information("SCHEDULER: Stopped");
+
+ return FALSE;
+}
+
+
+bool
+Event_scheduler_ng::execute_top(THD *thd, Event_timed *job_data)
+{
+ THD *new_thd;
+ pthread_t th;
+ DBUG_ENTER("Event_scheduler_ng::execute_top");
+ if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
+ goto error;
+
+ /* Major failure */
+ job_data->thd= new_thd;
+ DBUG_PRINT("info", ("Starting new thread for %s@%s",
+ job_data->dbname.str, job_data->name.str));
+ if (pthread_create(&th, &connection_attrib, event_worker_ng_thread, job_data))
+ goto error;
+
+ DBUG_RETURN(FALSE);
+
+error:
+ if (new_thd)
+ {
+ new_thd->proc_info= "Clearing";
+ DBUG_ASSERT(new_thd->net.buff != 0);
+ net_end(&new_thd->net);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count--;
+ thread_running--;
+ delete new_thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ }
+ DBUG_RETURN(TRUE);
+}
+
+
+enum Event_scheduler_ng::enum_state
+Event_scheduler_ng::get_state()
+{
+ enum Event_scheduler_ng::enum_state ret;
+ LOCK_SCHEDULER_DATA();
+ ret= state;
+ UNLOCK_SCHEDULER_DATA();
+ return ret;
+}
+
+
+int
+Event_scheduler_ng::dump_internal_status(THD *thd)
+{
+ return 1;
+
+}
+
+
+uint
+Event_scheduler_ng::workers_count()
+{
+ THD *tmp;
+ uint count= 0;
+
+ DBUG_ENTER("Event_scheduler_ng::workers_count");
+ VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (tmp->command == COM_DAEMON)
+ continue;
+ if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
+ ++count;
+ }
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ DBUG_PRINT("exit", ("%d", count));
+ DBUG_RETURN(count);
+}
+
+
+/*
+ Stops all running events
+
+ SYNOPSIS
+ Event_scheduler::stop_all_running_events()
+ thd Thread
+
+ NOTE
+ LOCK_scheduler data must be acquired prior to call to this method
+*/
+
+void
+Event_scheduler_ng::stop_all_running_events(THD *thd)
+{
+ CHARSET_INFO *scs= system_charset_info;
+ uint i;
+ DYNAMIC_ARRAY running_threads;
+ THD *tmp;
+ DBUG_ENTER("Event_scheduler::stop_all_running_events");
+ DBUG_PRINT("enter", ("workers_count=%d", workers_count()));
+
+ my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10);
+
+ bool had_super= FALSE;
+ VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (tmp->command == COM_DAEMON)
+ continue;
+ if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
+ push_dynamic(&running_threads, (gptr) &tmp->thread_id);
+ }
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+
+ /* We need temporarily SUPER_ACL to be able to kill our offsprings */
+ if (!(thd->security_ctx->master_access & SUPER_ACL))
+ thd->security_ctx->master_access|= SUPER_ACL;
+ else
+ had_super= TRUE;
+
+ char tmp_buff[10*STRING_BUFFER_USUAL_SIZE];
+ char int_buff[STRING_BUFFER_USUAL_SIZE];
+ String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
+ String int_string(int_buff, sizeof(int_buff), scs);
+ tmp_string.length(0);
+
+ for (i= 0; i < running_threads.elements; ++i)
+ {
+ int ret;
+ ulong thd_id= *dynamic_element(&running_threads, i, ulong*);
+
+ int_string.set((longlong) thd_id,scs);
+ tmp_string.append(int_string);
+ if (i < running_threads.elements - 1)
+ tmp_string.append(' ');
+
+ if ((ret= kill_one_thread(thd, thd_id, FALSE)))
+ {
+ sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret);
+ break;
+ }
+ }
+ if (running_threads.elements)
+ sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr());
+
+ if (!had_super)
+ thd->security_ctx->master_access &= ~SUPER_ACL;
+
+ delete_dynamic(&running_threads);
+
+ sql_print_information("SCHEDULER: Waiting for worker threads to finish");
+
+ while (workers_count())
+ my_sleep(100000);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_scheduler::queue_changed()
+*/
+
+void
+Event_scheduler_ng::queue_changed()
+{
+ DBUG_ENTER("Event_scheduler::queue_changed");
+ DBUG_PRINT("info", ("Sending COND_state"));
+ pthread_cond_signal(&COND_state);
+ DBUG_VOID_RETURN;
+}
+
+
+void
+Event_scheduler_ng::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler_ng::lock_mutex");
+ DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
+ &LOCK_scheduler_state, func, line));
+ pthread_mutex_lock(&LOCK_scheduler_state);
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_scheduler_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+void
+Event_scheduler_ng::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex");
+ DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
+ &LOCK_scheduler_state, func, line));
+ mutex_last_unlocked_at_line= line;
+ mutex_scheduler_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+ pthread_mutex_unlock(&LOCK_scheduler_state);
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/event_scheduler_ng.h b/sql/event_scheduler_ng.h
new file mode 100644
index 00000000000..b250923d23e
--- /dev/null
+++ b/sql/event_scheduler_ng.h
@@ -0,0 +1,121 @@
+#ifndef _EVENT_SCHEDULER_NG_H_
+#define _EVENT_SCHEDULER_NG_H_
+/* Copyright (C) 2004-2006 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+class Event_timed;
+class Event_queue;
+
+class Event_scheduler_ng
+{
+public:
+ Event_scheduler_ng(){}
+ ~Event_scheduler_ng(){}
+
+ enum enum_state
+ {
+ INITIALIZED = 0,
+ RUNNING,
+ STOPPING
+ };
+
+ /* State changing methods follow */
+
+ bool
+ start();
+
+ bool
+ stop();
+
+ /*
+ Need to be public because has to be called from the function
+ passed to pthread_create.
+ */
+ bool
+ run(THD *thd);
+
+ bool
+ init(Event_queue *queue);
+
+ void
+ deinit();
+
+ void
+ init_mutexes();
+
+ void
+ deinit_mutexes();
+
+ /* Information retrieving methods follow */
+
+ enum enum_state
+ get_state();
+
+ void
+ queue_changed();
+
+ static int
+ dump_internal_status(THD *thd);
+
+private:
+ uint
+ workers_count();
+
+ /* helper functions */
+ bool
+ execute_top(THD *thd, Event_timed *job_data);
+
+ void
+ stop_all_running_events(THD *thd);
+
+ /* helper functions for working with mutexes & conditionals */
+ void
+ lock_data(const char *func, uint line);
+
+ void
+ unlock_data(const char *func, uint line);
+
+ pthread_mutex_t LOCK_scheduler_state;
+
+ /* This is the current status of the life-cycle of the scheduler. */
+ enum enum_state state;
+
+ /*
+ Holds the thread id of the executor thread or 0 if the scheduler is not
+ running. It is used by ::shutdown() to know which thread to kill with
+ kill_one_thread(). The latter wake ups a thread if it is waiting on a
+ conditional variable and sets thd->killed to non-zero.
+ */
+ ulong thread_id;
+
+ pthread_cond_t COND_state;
+
+ Event_queue *queue;
+ Event_db_repository *db_repository;
+
+ uint mutex_last_locked_at_line;
+ uint mutex_last_unlocked_at_line;
+ const char* mutex_last_locked_in_func;
+ const char* mutex_last_unlocked_in_func;
+ bool mutex_scheduler_data_locked;
+
+private:
+ /* Prevent use of these */
+ Event_scheduler_ng(const Event_scheduler_ng &);
+ void operator=(Event_scheduler_ng &);
+};
+
+#endif /* _EVENT_SCHEDULER_NG_H_ */
diff --git a/sql/events.cc b/sql/events.cc
index 09d5ee21a4f..e4b6de965f7 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -20,6 +20,7 @@
#include "event_scheduler.h"
#include "event_db_repository.h"
#include "sp_head.h"
+#include "event_scheduler_ng.h"
/*
TODO list :
@@ -293,9 +294,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options
create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
rows_affected)))
{
- Event_scheduler *scheduler= Event_scheduler::get_instance();
- if (scheduler->initialized() &&
- (ret= scheduler->create_event(thd, parse_data, true)))
+ if ((ret= event_queue->create_event(thd, parse_data, true)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
}
/* No need to close the table, it will be closed in sql_parse::do_command */
@@ -336,11 +335,9 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
*/
if (!(ret= db_repository->update_event(thd, parse_data, new_name)))
{
- Event_scheduler *scheduler= Event_scheduler::get_instance();
- if (scheduler->initialized() &&
- (ret= scheduler->update_event(thd, parse_data,
- new_name? &new_name->m_db: NULL,
- new_name? &new_name->m_name: NULL)))
+ if ((ret= event_queue->update_event(thd, parse_data,
+ new_name? &new_name->m_db: NULL,
+ new_name? &new_name->m_name: NULL)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
}
DBUG_RETURN(ret);
@@ -373,8 +370,7 @@ Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists,
if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name,
drop_if_exists, rows_affected)))
{
- Event_scheduler *scheduler= Event_scheduler::get_instance();
- if (scheduler->initialized() && (ret= scheduler->drop_event(thd, name)))
+ if ((ret= event_queue->drop_event(thd, name)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
}
DBUG_RETURN(ret);
@@ -476,8 +472,7 @@ Events::drop_schema_events(THD *thd, char *db)
DBUG_ENTER("evex_drop_db_events");
DBUG_PRINT("enter", ("dropping events from %s", db));
- Event_scheduler *scheduler= Event_scheduler::get_instance();
- ret= scheduler->drop_schema_events(thd, db_lex);
+ ret= event_queue->drop_schema_events(thd, db_lex);
ret= db_repository->drop_schema_events(thd, db_lex);
DBUG_RETURN(ret);
@@ -505,16 +500,18 @@ Events::init()
Event_db_repository *db_repo;
DBUG_ENTER("Events::init");
db_repository->init_repository();
+ event_queue->init(db_repository);
+ event_queue->scheduler= scheduler_ng;
+ scheduler_ng->init(event_queue);
/* it should be an assignment! */
if (opt_event_scheduler)
{
- Event_scheduler *scheduler= Event_scheduler::get_instance();
DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2);
- DBUG_RETURN(scheduler->init(db_repository) ||
- (opt_event_scheduler == 1? scheduler->start():
- scheduler->start_suspended()));
+ if (opt_event_scheduler == 1)
+ DBUG_RETURN(scheduler_ng->start());
}
+
DBUG_RETURN(0);
}
@@ -534,13 +531,9 @@ Events::deinit()
{
DBUG_ENTER("Events::deinit");
- Event_scheduler *scheduler= Event_scheduler::get_instance();
- if (scheduler->initialized())
- {
- scheduler->stop();
- scheduler->destroy();
- }
-
+ scheduler_ng->stop();
+ scheduler_ng->deinit();
+ event_queue->deinit();
db_repository->deinit_repository();
DBUG_VOID_RETURN;
@@ -559,8 +552,12 @@ void
Events::init_mutexes()
{
db_repository= new Event_db_repository;
- Event_scheduler::create_instance();
- Event_scheduler::init_mutexes();
+
+ event_queue= new Event_queue;
+ event_queue->init_mutexes();
+
+ scheduler_ng= new Event_scheduler_ng();
+ scheduler_ng->init_mutexes();
}
@@ -574,9 +571,11 @@ Events::init_mutexes()
void
Events::destroy_mutexes()
{
- Event_scheduler::destroy_mutexes();
+ event_queue->deinit_mutexes();
+ scheduler_ng->deinit_mutexes();
+
+ delete scheduler_ng;
delete db_repository;
- db_repository= NULL;
}
@@ -595,7 +594,7 @@ Events::destroy_mutexes()
int
Events::dump_internal_status(THD *thd)
{
- return Event_scheduler::dump_internal_status(thd);
+ return Event_scheduler_ng::dump_internal_status(thd);
}
@@ -633,3 +632,26 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
}
DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db));
}
+
+
+bool
+Events::start_execution_of_events()
+{
+ DBUG_ENTER("Events::start_execution_of_events");
+ DBUG_RETURN(scheduler_ng->start());
+}
+
+
+bool
+Events::stop_execution_of_events()
+{
+ DBUG_ENTER("Events::stop_execution_of_events");
+ DBUG_RETURN(scheduler_ng->stop());
+}
+
+bool
+Events::is_started()
+{
+ DBUG_ENTER("Events::is_started");
+ DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING);
+}
diff --git a/sql/events.h b/sql/events.h
index 1239cf58c7d..357312b44d1 100644
--- a/sql/events.h
+++ b/sql/events.h
@@ -19,6 +19,8 @@
class sp_name;
class Event_parse_data;
class Event_db_repository;
+class Event_queue;
+class Event_scheduler_ng;
/* Return codes */
enum enum_events_error_code
@@ -60,6 +62,15 @@ public:
void
destroy_mutexes();
+ bool
+ start_execution_of_events();
+
+ bool
+ stop_execution_of_events();
+
+ bool
+ is_started();
+
static Events*
get_instance();
@@ -95,6 +106,8 @@ public:
dump_internal_status(THD *thd);
Event_db_repository *db_repository;
+ Event_queue *event_queue;
+ Event_scheduler_ng *scheduler_ng;
private:
/* Singleton DP is used */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index a27be384ee2..e0c0243b301 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -864,7 +864,7 @@ static void close_connections(void)
DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->slave_thread || tmp->system_thread == SYSTEM_THREAD_EVENT_SCHEDULER)
+ if (tmp->slave_thread)
continue;
tmp->killed= THD::KILL_CONNECTION;
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 6c3606c9150..1f55d9ea3cf 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -58,6 +58,7 @@
#include <my_dir.h>
#include "event_scheduler.h"
+#include "events.h"
/* WITH_BERKELEY_STORAGE_ENGINE */
extern bool berkeley_shared_data;
@@ -3896,26 +3897,29 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
Event_scheduler *scheduler= Event_scheduler::get_instance();
/* here start the thread if not running. */
DBUG_ENTER("sys_var_event_scheduler::update");
-
- DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value));
- if (!scheduler->initialized())
+ if (Events::opt_event_scheduler == 0)
{
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--event-scheduler=0");
- DBUG_RETURN(true);
+ DBUG_RETURN(TRUE);
}
+ DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value));
+
if (var->save_result.ulonglong_value < 1 ||
var->save_result.ulonglong_value > 2)
{
char buf[64];
my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "event_scheduler",
llstr(var->save_result.ulonglong_value, buf));
- DBUG_RETURN(true);
+ DBUG_RETURN(TRUE);
}
- if ((res= scheduler->suspend_or_resume(var->save_result.ulonglong_value == 1?
- Event_scheduler::RESUME :
- Event_scheduler::SUSPEND)))
- my_error(ER_EVENT_SET_VAR_ERROR, MYF(0), (uint) res);
+ if (var->save_result.ulonglong_value == 1)
+ res= Events::get_instance()->start_execution_of_events();
+ else
+ res= Events::get_instance()->stop_execution_of_events();
+
+ if (res)
+ my_error(ER_EVENT_SET_VAR_ERROR, MYF(0));
DBUG_RETURN((bool) res);
}
@@ -3925,9 +3929,9 @@ byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
{
Event_scheduler *scheduler= Event_scheduler::get_instance();
- if (!scheduler->initialized())
+ if (Events::opt_event_scheduler == 0)
thd->sys_var_tmp.long_value= 0;
- else if (scheduler->get_state() == Event_scheduler::RUNNING)
+ else if (Events::get_instance()->is_started())
thd->sys_var_tmp.long_value= 1;
else
thd->sys_var_tmp.long_value= 2;
diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt
index ac4f2dd9237..40596d85f56 100644
--- a/sql/share/errmsg.txt
+++ b/sql/share/errmsg.txt
@@ -5831,7 +5831,7 @@ ER_DUP_ENTRY_AUTOINCREMENT_CASE
ER_EVENT_MODIFY_QUEUE_ERROR
eng "Internal scheduler error %d"
ER_EVENT_SET_VAR_ERROR
- eng "Error during starting/stopping of the scheduler. Error code %u"
+ eng "Error during starting/stopping of the scheduler."
ER_PARTITION_MERGE_ERROR
eng "%s handler cannot be used in partitioned tables"
swe "%s kan inte användas i en partitionerad tabell"
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 86a9c380ee1..52bd8da3785 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -4177,7 +4177,7 @@ copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table)
restore_record(sch_table, s->default_values);
- if (et.load_from_row(thd->mem_root, event_table))
+ if (et.load_from_row(event_table))
{
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0));
DBUG_RETURN(1);