summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc234
1 files changed, 49 insertions, 185 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 39b237987e9..ba12b732726 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -73,35 +73,6 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
}
-pthread_handler_t
-event_queue_loader_thread(void *arg)
-{
- /* needs to be first for thread_stack */
- THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
- struct event_queue_param *param= (struct event_queue_param *) arg;
- thd->thread_stack= (char *) &thd;
-
- if (post_init_event_thread(thd))
- goto end;
-
- DBUG_ENTER("event_queue_loader_thread");
-
-
- pthread_mutex_lock(&param->LOCK_loaded);
- param->queue->check_system_tables(thd);
- param->queue->load_events_from_db(thd);
-
- param->loading_finished= TRUE;
- pthread_cond_signal(&param->COND_loaded);
-
- pthread_mutex_unlock(&param->LOCK_loaded);
-
-end:
- deinit_event_thread(thd);
- DBUG_RETURN(0); // Against gcc warnings
-}
-
-
/*
Constructor of class Event_queue.
@@ -110,14 +81,12 @@ end:
*/
Event_queue::Event_queue()
+ :mutex_last_unlocked_at_line(0), mutex_last_locked_at_line(0),
+ mutex_last_attempted_lock_at_line(0),
+ mutex_queue_data_locked(FALSE), mutex_queue_data_attempting_lock(FALSE)
{
- mutex_last_unlocked_at_line= mutex_last_locked_at_line=
- mutex_last_attempted_lock_at_line= 0;
-
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
mutex_last_attempted_lock_in_func= "";
-
- mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
}
@@ -150,7 +119,12 @@ Event_queue::deinit_mutexes()
/*
- Inits the queue
+ This is a queue's constructor. Until this method is called, the
+ queue is unusable. We don't use a C++ constructor instead in
+ order to be able to check the return value. The queue is
+ initialized once at server startup. Initialization can fail in
+ case of a failure reading events from the database or out of
+ memory.
SYNOPSIS
Event_queue::init()
@@ -161,9 +135,9 @@ Event_queue::deinit_mutexes()
*/
bool
-Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
+Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
+ Event_scheduler *sched)
{
- THD *new_thd;
pthread_t th;
bool res;
struct event_queue_param *event_queue_param_value= NULL;
@@ -186,43 +160,16 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
if (sizeof(my_time_t) != sizeof(time_t))
{
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
- "The scheduler may not work correctly. Stopping.");
+ "The scheduler may not work correctly. Stopping");
DBUG_ASSERT(0);
goto err;
}
- if (!(new_thd= new THD))
- goto err;
-
- pre_init_event_thread(new_thd);
- new_thd->security_ctx->set_user((char*)"event_scheduler_loader");
-
- event_queue_param_value= (struct event_queue_param *)
- my_malloc(sizeof(struct event_queue_param), MYF(0));
-
- event_queue_param_value->thd= new_thd;
- event_queue_param_value->queue= this;
- event_queue_param_value->loading_finished= FALSE;
- pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
-
- pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
- DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
- if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
- (void*)event_queue_param_value)))
- {
- do {
- pthread_cond_wait(&event_queue_param_value->COND_loaded,
- &event_queue_param_value->LOCK_loaded);
- } while (event_queue_param_value->loading_finished == FALSE);
- }
-
- pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
- pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
- pthread_cond_destroy(&event_queue_param_value->COND_loaded);
- my_free((char *)event_queue_param_value, MYF(0));
-
+ res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
+ if (res)
+ deinit_queue();
+
DBUG_RETURN(res);
err:
@@ -316,16 +263,15 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
{
int res;
- Event_queue_element *old_element= NULL,
- *new_element;
+ Event_queue_element *new_element;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
new_element= new Event_queue_element();
- res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
- new_name? *new_name:name, new_element);
+ res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
+ new_name ? *new_name:name, new_element);
if (res)
{
delete new_element;
@@ -345,16 +291,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
- if (!(old_element= find_n_remove_event(dbname, name)))
- {
- DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
- dbname.str, name.str));
- }
+ find_n_remove_event(dbname, name);
+
/* If not disabled event */
if (new_element)
{
- DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
- new_element, old_element));
+ DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
}
dbug_dump_queue(thd->query_start());
@@ -363,8 +305,6 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
if (new_element)
notify_observers();
- if (old_element)
- delete old_element;
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
@@ -385,19 +325,13 @@ void
Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
- Event_queue_element *element;
DBUG_ENTER("Event_queue::drop_event");
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA();
- element= find_n_remove_event(dbname, name);
+ find_n_remove_event(dbname, name);
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
-
- if (element)
- delete element;
- else
- DBUG_PRINT("info", ("No such event found, probably DISABLED"));
/*
We don't signal here because the scheduler will catch the change
@@ -429,10 +363,10 @@ void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*comparator)(LEX_STRING, Event_basic *))
{
+ uint i= 0;
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%s", pattern.str));
- uint i= 0;
while (i < queue.elements)
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
@@ -440,10 +374,10 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
if (comparator(pattern, et))
{
/*
- The queue is ordered. If we remove an element, then all elements after
- it will shift one position to the left, if we imagine it as an array
- from left to the right. In this case we should not increment the
- counter and the (i < queue.elements) condition is ok.
+ The queue is ordered. If we remove an element, then all elements
+ after it will shift one position to the left, if we imagine it as
+ an array from left to the right. In this case we should not
+ increment the counter and the (i < queue.elements) condition is ok.
*/
queue_remove(&queue, i);
delete et;
@@ -453,12 +387,12 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
}
/*
We don't call notify_observers() . If we remove the top event:
- 1. The queue is empty. The scheduler will wake up at some time and realize
- that the queue is empty. If create_event() comes inbetween it will
- signal the scheduler
- 2. The queue is not empty, but the next event after the previous top, won't
- be executed any time sooner than the element we removed. Hence, we may
- not notify the scheduler and it will realize the change when it
+ 1. The queue is empty. The scheduler will wake up at some time and
+ realize that the queue is empty. If create_event() comes inbetween
+ it will signal the scheduler
+ 2. The queue is not empty, but the next event after the previous top,
+ won't be executed any time sooner than the element we removed. Hence,
+ we may not notify the scheduler and it will realize the change when it
wakes up from timedwait.
*/
@@ -472,11 +406,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
SYNOPSIS
Event_queue::drop_schema_events()
- thd THD
- db The schema name
-
- RETURN VALUE
- >=0 Number of dropped events
+ thd HD
+ schema The schema name
*/
void
@@ -516,16 +447,12 @@ Event_queue::notify_observers()
db The schema of the event to find
name The event to find
- RETURN VALUE
- NULL Not found
- otherwise Address
-
NOTE
The caller should do the locking also the caller is responsible for
actual signalling in case an event is removed from the queue.
*/
-Event_queue_element *
+void
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
{
uint i;
@@ -539,11 +466,12 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
if (event_basic_identifier_equal(db, name, et))
{
queue_remove(&queue, i);
- DBUG_RETURN(et);
+ delete et;
+ break;
}
}
- DBUG_RETURN(NULL);
+ DBUG_VOID_RETURN;
}
@@ -583,7 +511,7 @@ Event_queue::load_events_from_db(THD *thd)
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
{
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
@@ -625,14 +553,17 @@ Event_queue::load_events_from_db(THD *thd)
temp_job_data.load_from_row(table);
- /* We load only on scheduler root just to check whether the body compiles */
+ /*
+ We load only on scheduler root just to check whether the body
+ compiles.
+ */
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
break;
case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
et->dbname.str, et->name.str);
break;
default:
@@ -663,12 +594,10 @@ end:
else
{
ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
+ sql_print_information("SCHEDULER: Loaded %d event%s", count,
+ (count == 1)?"":"s");
}
- /* Force close to free memory */
- thd->version--;
-
close_thread_tables(thd);
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
@@ -677,71 +606,6 @@ end:
/*
- Opens mysql.db and mysql.user and checks whether:
- 1. mysql.db has column Event_priv at column 20 (0 based);
- 2. mysql.user has column Event_priv at column 29 (0 based);
-
- SYNOPSIS
- Event_queue::check_system_tables()
- thd Thread
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-void
-Event_queue::check_system_tables(THD *thd)
-{
- TABLE_LIST tables;
- bool not_used;
- Open_tables_state backup;
- bool ret;
-
- DBUG_ENTER("Event_queue::check_system_tables");
- DBUG_PRINT("enter", ("thd=0x%lx", thd));
-
- thd->reset_n_backup_open_tables_state(&backup);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "db";
- tables.lock_type= TL_READ;
-
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
- {
- sql_print_error("Cannot open mysql.db");
- goto end;
- }
- ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
- mysql_db_table_fields, &mysql_db_table_last_check,
- ER_CANNOT_LOAD_FROM_TABLE);
- close_thread_tables(thd);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "user";
- tables.lock_type= TL_READ;
-
- if (simple_open_n_lock_tables(thd, &tables))
- sql_print_error("Cannot open mysql.db");
- else
- {
- if (tables.table->s->fields < 29 ||
- strncmp(tables.table->field[29]->field_name,
- STRING_WITH_LEN("Event_priv")))
- sql_print_error("mysql.user has no `Event_priv` column at position 29");
- close_thread_tables(thd);
- }
-
-end:
- thd->restore_backup_open_tables_state(&backup);
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
Recalculates activation times in the queue. There is one reason for
that. Because the values (execute_at) by which the queue is ordered are
changed by calls to compute_next_execution_time() on a request from the
@@ -782,7 +646,7 @@ Event_queue::recalculate_activation_times(THD *thd)
Event_queue::empty_queue()
NOTE
- Should be called with LOCK_event_queue locked
+ Should be called with LOCK_event_queue locked
*/
void