summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-13 10:59:58 +0200
committerunknown <andrey@lmy004.>2006-07-13 10:59:58 +0200
commit31caa8c433ace692ea5f31c2c2ae0d872533e8de (patch)
treefb7407480fd7ae1b764e1bf59ec33047559bf6dd /sql/event_queue.cc
parent628be8a71611bc86f7f0cf809b27d63bdd9b12c8 (diff)
downloadmariadb-git-31caa8c433ace692ea5f31c2c2ae0d872533e8de.tar.gz
WL #3337 (Events new architecture)
Final stroke, events should be loaded from disk on server startup. Also check the validity of their bodies if possible during loading. sql/event_data_objects.cc: Remove Event_job_data::free_sp(), move the code to the destructor Change the way we change the security context Steal some code from sql_parse.cc sql/event_data_objects.h: Remove free_sp() Make compile() public, to be used when booting for verifying the integrity of mysql.event sql/event_queue.cc: Make the queue load events from disk on server boot. Compile and thus check for integrity the events. sql/event_queue.h: shift methods around. add queue_loaded boolean. sql/event_scheduler.cc: Rename init_event_thread() to pre_init_event_thread() and make it more generic. Add post_init_event_thread() Export these two as well as deinit_event_thread(). Now it is quite easy to write code to spawn a new event thread whenever needed. sql/event_scheduler.h: export pre_init_event_thread(), post_init_event_thread() and deinit_event_thread() to simplify writing of thread functions. sql/events.cc: Events::init() returns only one error code, then make it bool sql/events.h: Events::init() returns only one error code, then make it bool sql/mysqld.cc: Check the return code of Events::init() sql/sp_head.cc: Add trace info sql/sql_class.cc: Reorganize thd::change_security_context() to load main_security_ctx sql/sql_class.h: Reorganize thd::change_security_context() to load main_security_ctx sql/sql_lex.cc: Initialize lex->spname sql/sql_yacc.yy: Add a comment
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc144
1 files changed, 110 insertions, 34 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 12eceee8cfb..69114c53118 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -35,6 +35,14 @@
#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
+struct event_queue_param
+{
+ THD *thd;
+ Event_queue *queue;
+ pthread_mutex_t LOCK_loaded;
+ pthread_cond_t COND_loaded;
+};
+
/*
Compares the execute_at members of two Event_queue_element instances.
@@ -64,6 +72,31 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
}
+pthread_handler_t
+event_queue_loader_thread(void *arg)
+{
+ /* needs to be first for thread_stack */
+ THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
+ struct event_queue_param *param= (struct event_queue_param *) arg;
+ thd->thread_stack= (char *) &thd;
+
+ if (post_init_event_thread(thd))
+ goto end;
+
+ DBUG_ENTER("event_queue_loader_thread");
+
+ pthread_mutex_lock(&param->LOCK_loaded);
+ param->queue->load_events_from_db(thd);
+ pthread_cond_signal(&param->COND_loaded);
+ pthread_mutex_unlock(&param->LOCK_loaded);
+
+end:
+ deinit_event_thread(thd);
+
+ DBUG_RETURN(0); // Against gcc warnings
+}
+
+
/*
Constructor of class Event_queue.
@@ -80,6 +113,8 @@ Event_queue::Event_queue()
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
+
+ queue_loaded= FALSE;
}
@@ -125,8 +160,11 @@ Event_queue::deinit_mutexes()
bool
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{
- int i= 0;
- bool ret= FALSE;
+ THD *new_thd;
+ pthread_t th;
+ bool res;
+ struct event_queue_param *event_queue_param_value= NULL;
+
DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this=0x%lx", this));
@@ -139,8 +177,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
NULL, EVENT_QUEUE_EXTENT))
{
sql_print_error("SCHEDULER: Can't initialize the execution queue");
- ret= TRUE;
- goto end;
+ goto err;
}
if (sizeof(my_time_t) != sizeof(time_t))
@@ -148,13 +185,43 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler may not work correctly. Stopping.");
DBUG_ASSERT(0);
- ret= TRUE;
- goto end;
+ goto err;
}
-end:
+ if (!(new_thd= new THD))
+ goto err;
+
+ pre_init_event_thread(new_thd);
+
+ event_queue_param_value= (struct event_queue_param *)
+ my_malloc(sizeof(struct event_queue_param), MYF(0));
+ event_queue_param_value->thd= new_thd;
+ event_queue_param_value->queue= this;
+ pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
+
+ pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
+ DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
+ if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
+ (void*)event_queue_param_value)))
+ {
+ do {
+ pthread_cond_wait(&event_queue_param_value->COND_loaded,
+ &event_queue_param_value->LOCK_loaded);
+ } while (queue_loaded == FALSE);
+ }
+
+ pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
+ pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
+ pthread_cond_destroy(&event_queue_param_value->COND_loaded);
+ my_free((char *)event_queue_param_value, MYF(0));
+
UNLOCK_QUEUE_DATA();
- DBUG_RETURN(ret);
+ DBUG_RETURN(res);
+
+err:
+ UNLOCK_QUEUE_DATA();
+ DBUG_RETURN(TRUE);
}
@@ -498,7 +565,7 @@ Event_queue::load_events_from_db(THD *thd)
READ_RECORD read_record_info;
int ret= -1;
uint count= 0;
- bool clean_the_queue= FALSE;
+ bool clean_the_queue= TRUE;
/* Compile the events on this root but only for syntax check, then discard */
MEM_ROOT boot_root;
@@ -518,14 +585,12 @@ Event_queue::load_events_from_db(THD *thd)
if (!(et= new Event_queue_element))
{
DBUG_PRINT("info", ("Out of memory"));
- clean_the_queue= TRUE;
break;
}
DBUG_PRINT("info", ("Loading event from row."));
if ((ret= et->load_from_row(table)))
{
- clean_the_queue= TRUE;
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
break;
@@ -536,27 +601,6 @@ Event_queue::load_events_from_db(THD *thd)
delete et;
continue;
}
-#if 0
- init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- /* We load only on scheduler root just to check whether the body compiles */
- switch (ret= et->compile(thd, &boot_root)) {
- case EVEX_MICROSECOND_UNSUP:
- et->free_sp();
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- goto end;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
- et->dbname.str, et->name.str);
- goto end;
- default:
- /* Free it, it will be compiled again on the worker thread */
- et->free_sp();
- break;
- }
- free_root(&boot_root, MYF(0));
/* let's find when to be executed */
if (et->compute_next_execution_time())
@@ -565,11 +609,40 @@ Event_queue::load_events_from_db(THD *thd)
" Skipping", et->dbname.str, et->name.str);
continue;
}
-#endif
+
+ {
+ Event_job_data temp_job_data;
+ DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
+
+ temp_job_data.load_from_row(table);
+
+ /* We load only on scheduler root just to check whether the body compiles */
+ switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
+ case EVEX_MICROSECOND_UNSUP:
+ sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
+ "supported but found in mysql.event");
+ break;
+ case EVEX_COMPILE_ERROR:
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
+ et->dbname.str, et->name.str);
+ break;
+ default:
+ break;
+ }
+ thd->end_statement();
+ thd->cleanup_after_query();
+ }
+ if (ret)
+ {
+ delete et;
+ goto end;
+ }
+
DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
queue_insert_safe(&queue, (byte *) et);
count++;
}
+ clean_the_queue= FALSE;
end:
end_read_record(&read_record_info);
@@ -585,10 +658,12 @@ end:
}
/* Force close to free memory */
- thd->version--;
+ thd->version--;
close_thread_tables(thd);
+ queue_loaded= TRUE;
+
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
@@ -713,6 +788,7 @@ Event_queue::empty_queue()
uint i;
DBUG_ENTER("Event_queue::empty_queue");
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
+ sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
/* empty the queue */
for (i= 0; i < queue.elements; ++i)
{