diff options
author | unknown <andrey@lmy004.> | 2006-01-10 21:02:19 +0100 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2006-01-10 21:02:19 +0100 |
commit | bb0b7f5dda0933e60cd9520d172fd27a9724cdb6 (patch) | |
tree | ed7905772f33cdc3e24dd8fbfa8894d09ed4ec0a /sql/event_executor.cc | |
parent | 12c8b61aad80d04c99d77ef2a92fc26393b4f68d (diff) | |
parent | 1ef97f1f3b3dc53a5945fb4fc7db6a3fea926379 (diff) | |
download | mariadb-git-bb0b7f5dda0933e60cd9520d172fd27a9724cdb6.tar.gz |
WL #1034 (Internal CRON)
merge before push
BUILD/SETUP.sh:
Auto merged
include/my_sys.h:
Auto merged
mysql-test/r/grant.result:
Auto merged
mysql-test/r/ps.result:
Auto merged
sql/Makefile.am:
Auto merged
sql/lex.h:
Auto merged
sql/mysqld.cc:
Auto merged
sql/set_var.cc:
Auto merged
sql/set_var.h:
Auto merged
sql/sp.cc:
Auto merged
sql/sp.h:
Auto merged
sql/sp_head.h:
Auto merged
sql/sql_acl.cc:
Auto merged
sql/sql_acl.h:
Auto merged
sql/sql_lex.cc:
Auto merged
sql/sql_parse.cc:
Auto merged
sql/sql_show.cc:
Auto merged
sql/table.cc:
Auto merged
sql/tztime.cc:
Auto merged
libmysqld/Makefile.am:
manual merge
mysql-test/r/information_schema.result:
manual merge
sql/share/errmsg.txt:
manual merge
sql/sp_head.cc:
manual merge
sql/sql_lex.h:
manual merge
sql/sql_yacc.yy:
manual merge
Diffstat (limited to 'sql/event_executor.cc')
-rw-r--r-- | sql/event_executor.cc | 618 |
1 files changed, 618 insertions, 0 deletions
diff --git a/sql/event_executor.cc b/sql/event_executor.cc new file mode 100644 index 00000000000..07e53d62a5e --- /dev/null +++ b/sql/event_executor.cc @@ -0,0 +1,618 @@ +/* Copyright (C) 2004-2005 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "event_priv.h" +#include "event.h" +#include "sp.h" + + +/* + Make this define DBUG_FAULTY_THR to be able to put breakpoints inside + code used by the scheduler's thread(s). In this case user connections + are not possible because the scheduler thread code is ran inside the + main thread (no spawning takes place. If you want to debug client + connection then start with --one-thread and make the define + DBUG_FAULTY_THR ! +*/ +#define DBUG_FAULTY_THR2 + +extern ulong thread_created; +extern const char *my_localhost; + +pthread_mutex_t LOCK_event_arrays, + LOCK_workers_count, + LOCK_evex_running; + + +bool evex_is_running= false; + +ulonglong evex_main_thread_id= 0; +ulong opt_event_executor; +volatile my_bool event_executor_running_global_var; +static my_bool evex_mutexes_initted= false; +static uint workers_count; + +static int +evex_load_events_from_db(THD *thd); + + + +/* + TODO Andrey: Check for command line option whether to start + the main thread or not. +*/ + +pthread_handler_t +event_executor_worker(void *arg); + +pthread_handler_t +event_executor_main(void *arg); + +static int +evex_time_diff(TIME *a, TIME *b) +{ + return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b); +} + + +static void +evex_init_mutexes() +{ + if (evex_mutexes_initted) + return; + + evex_mutexes_initted= true; + pthread_mutex_init(&LOCK_event_arrays, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_workers_count, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST); + + event_executor_running_global_var= opt_event_executor; +} + + +int +init_events() +{ + pthread_t th; + + DBUG_ENTER("init_events"); + + DBUG_PRINT("info",("Starting events main thread")); + + evex_init_mutexes(); + + VOID(pthread_mutex_lock(&LOCK_evex_running)); + evex_is_running= false; + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + + if (event_executor_running_global_var) + { +#ifndef DBUG_FAULTY_THR + //TODO Andrey: Change the error code returned! + if (pthread_create(&th, NULL, event_executor_main, (void*)NULL)) + DBUG_RETURN(ER_SLAVE_THREAD); +#else + event_executor_main(NULL); +#endif + } + + DBUG_RETURN(0); +} + + +void +shutdown_events() +{ + DBUG_ENTER("shutdown_events"); + + VOID(pthread_mutex_lock(&LOCK_evex_running)); + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + + pthread_mutex_destroy(&LOCK_event_arrays); + pthread_mutex_destroy(&LOCK_workers_count); + pthread_mutex_destroy(&LOCK_evex_running); + + DBUG_VOID_RETURN; +} + + +static int +init_event_thread(THD* thd) +{ + DBUG_ENTER("init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->skip_grants(); + thd->security_ctx->host= (char*)my_localhost; + my_net_init(&thd->net, 0); + thd->net.read_timeout = slave_net_timeout; + thd->slave_thread= 0; + thd->options= OPTION_AUTO_IS_NULL; + thd->client_capabilities= CLIENT_LOCAL_FILES; + thd->real_id=pthread_self(); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->thread_id= thread_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + delete thd; + DBUG_RETURN(-1); + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + DBUG_RETURN(0); +} + +pthread_handler_t +event_executor_main(void *arg) +{ + THD *thd; /* needs to be first for thread_stack */ + ulonglong iter_num= 0; + uint i=0, j=0; + my_ulonglong cnt= 0; + + DBUG_ENTER("event_executor_main"); + DBUG_PRINT("event_executor_main", ("EVEX thread started")); + + + // init memory root + init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + + + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + + //TODO Andrey: Check for NULL + if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! + { + sql_print_error("Cannot create THD for event_executor_main"); + goto err_no_thd; + } + thd->thread_stack = (char*)&thd; // remember where our stack is + + pthread_detach_this_thread(); + + if (init_event_thread(thd)) + goto err; + + // make this thread invisible it has no vio -> show processlist won't see + thd->system_thread= 1; + + VOID(pthread_mutex_lock(&LOCK_thread_count)); + threads.append(thd); + thread_count++; + thread_running++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + DBUG_PRINT("EVEX main thread", ("Initing events_queuey")); + + /* + eventually manifest that we are running, not to crashe because of + usage of non-initialized memory structures. + */ + VOID(pthread_mutex_lock(&LOCK_evex_running)); + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + evex_queue_init(&EVEX_EQ_NAME); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + evex_is_running= true; + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + + thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); + + if (evex_load_events_from_db(thd)) + goto err; + + evex_main_thread_id= thd->thread_id; + + sql_print_information("Scheduler thread started"); + while (!thd->killed) + { + TIME time_now; + my_time_t now; + event_timed *et; + + cnt++; + DBUG_PRINT("info", ("EVEX External Loop %d", cnt)); + + thd->proc_info = "Sleeping"; + if (!evex_queue_num_elements(EVEX_EQ_NAME) || + !event_executor_running_global_var) + { + my_sleep(1000000);// sleep 1s + continue; + } + + { + int t2sleep; + /* + now let's see how much time to sleep, we know there is at least 1 + element in the queue. + */ + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + if (et->status == MYSQL_EVENT_DISABLED) + { + DBUG_PRINT("evex_load_events_from_db",("Now it is disabled-exec no more")); + if (et->dropped) + et->drop(thd); + delete et; + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + sql_print_information("Event found disabled, dropping."); + continue; + } + + time(&now); + my_tz_UTC->gmt_sec_to_TIME(&time_now, now); + t2sleep= evex_time_diff(&et->execute_at, &time_now); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + if (t2sleep > 0) + { + /* + We sleep t2sleep seconds but we check every second whether this thread + has been killed, or there is a new candidate + */ + while (t2sleep-- && !thd->killed && + evex_queue_num_elements(EVEX_EQ_NAME) && + (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) + my_sleep(1000000); + } + if (!event_executor_running_global_var) + continue; + } + + + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + + /* + if this is the first event which is after time_now then no + more need to iterate over more elements since the array is sorted. + */ + if (et->execute_at.year > 1969 && + my_time_compare(&time_now, &et->execute_at) == -1) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + + if (et->status == MYSQL_EVENT_ENABLED) + { + pthread_t th; + + et->mark_last_executed(); + et->compute_next_execution_time(); + et->update_fields(thd); + DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num)); +#ifndef DBUG_FAULTY_THR + if (pthread_create(&th, NULL, event_executor_worker, (void*)et)) + { + sql_print_error("Problem while trying to create a thread"); + UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); + } +#else + event_executor_worker((void *) et); +#endif + if ((et->execute_at.year && !et->expression) || + TIME_to_ulonglong_datetime(&et->execute_at) == 0) + et->flags |= EVENT_EXEC_NO_MORE; + + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + else + evex_queue_first_updated(&EVEX_EQ_NAME); + } + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + }// while + +err: + // First manifest that this thread does not work and then destroy + VOID(pthread_mutex_lock(&LOCK_evex_running)); + evex_is_running= false; + evex_main_thread_id= 0; + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + + sql_print_information("Event scheduler stopping"); + + /* + TODO: A better will be with a conditional variable + */ + /* + Read workers_count without lock, no need for locking. + In the worst case we have to wait 1sec more. + */ + while (workers_count) + my_sleep(1000000);// 1s + + /* + LEX_STRINGs reside in the memory root and will be destroyed with it. + Hence no need of delete but only freeing of SP + */ + // First we free all objects ... + for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) + { + event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*); + et->free_sp(); + delete et; + } + // ... then we can thras the whole queue at once + evex_queue_destroy(&EVEX_EQ_NAME); + + thd->proc_info = "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); // destructor will not free it, because we are weird + THD_CHECK_SENTRY(thd); + + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; +#ifndef DBUG_FAULTY_THR + THD_CHECK_SENTRY(thd); + delete thd; +#endif + pthread_mutex_unlock(&LOCK_thread_count); + + +err_no_thd: + VOID(pthread_mutex_lock(&LOCK_evex_running)); + evex_is_running= false; + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + + free_root(&evex_mem_root, MYF(0)); + sql_print_information("Event scheduler stopped"); + +#ifndef DBUG_FAULTY_THR + my_thread_end(); + pthread_exit(0); +#endif + DBUG_RETURN(0);// Can't return anything here +} + + +pthread_handler_t +event_executor_worker(void *event_void) +{ + THD *thd; /* needs to be first for thread_stack */ + event_timed *event = (event_timed *) event_void; + MEM_ROOT worker_mem_root; + + DBUG_ENTER("event_executor_worker"); + VOID(pthread_mutex_lock(&LOCK_workers_count)); + ++workers_count; + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + + init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + +#ifndef DBUG_FAULTY_THR + my_thread_init(); + + if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! + { + sql_print_error("Cannot create a THD structure in a scheduler worker thread"); + goto err_no_thd; + } + thd->thread_stack = (char*)&thd; // remember where our stack is + thd->mem_root= &worker_mem_root; + + pthread_detach(pthread_self()); + + if (init_event_thread(thd)) + goto err; + + thd->init_for_queries(); + + // make this thread visible it has no vio -> show processlist needs this flag + thd->system_thread= 1; + + VOID(pthread_mutex_lock(&LOCK_thread_count)); + threads.append(thd); + thread_count++; + thread_running++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); +#else + thd= current_thd; +#endif + + // thd->security_ctx->priv_host is char[MAX_HOSTNAME] + + strxnmov(thd->security_ctx->priv_host, sizeof(thd->security_ctx->priv_host), + event->definer_host.str, NullS); + + thd->security_ctx->user= thd->security_ctx->priv_user= + my_strdup(event->definer_user.str, MYF(0)); + + thd->db= event->dbname.str; + if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0, + is_schema_db(event->dbname.str))) + { + int ret; + DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]", + event->dbname.str, event->name.str,(int) event->expression)); + sql_print_information(" EVEX EXECUTING event %s.%s [EXPR:%d]", + event->dbname.str, event->name.str,(int) event->expression); + + ret= event->execute(thd, &worker_mem_root); + + sql_print_information(" EVEX EXECUTED event %s.%s [EXPR:%d]. RetCode=%d", + event->dbname.str, event->name.str, + (int) event->expression, ret); + DBUG_PRINT("info", (" EVEX EXECUTED event %s.%s [EXPR:%d]. RetCode=%d", + event->dbname.str, event->name.str, + (int) event->expression, ret)); + } + if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED) + { + if (event->dropped) + event->drop(thd); + delete event; + } + + thd->db= 0; + +err: + VOID(pthread_mutex_lock(&LOCK_thread_count)); +#ifndef DBUG_FAULTY_THR + thread_count--; + thread_running--; + /* + Some extra safety, which should not been needed (normally, event deletion + should already have done these assignments (each event which sets these + variables is supposed to set them to 0 before terminating)). + */ + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + thd->proc_info = "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); // destructor will not free it, because we are weird + THD_CHECK_SENTRY(thd); + + VOID(pthread_mutex_lock(&LOCK_thread_count)); + THD_CHECK_SENTRY(thd); + delete thd; +#endif + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + +err_no_thd: + + free_root(&worker_mem_root, MYF(0)); + + VOID(pthread_mutex_lock(&LOCK_workers_count)); + --workers_count; + VOID(pthread_mutex_unlock(&LOCK_workers_count)); + +#ifndef DBUG_FAULTY_THR + my_thread_end(); + pthread_exit(0); +#endif + DBUG_RETURN(0); // Can't return anything here +} + + +static int +evex_load_events_from_db(THD *thd) +{ + TABLE *table; + READ_RECORD read_record_info; + MYSQL_LOCK *lock; + int ret= -1; + uint count= 0; + + DBUG_ENTER("evex_load_events_from_db"); + + if ((ret= evex_open_event_table(thd, TL_READ, &table))) + { + sql_print_error("Table mysql.event is damaged."); + DBUG_RETURN(SP_OPEN_TABLE_FAILED); + } + + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info))) + { + event_timed *et; + if (!(et= new event_timed)) + { + DBUG_PRINT("evex_load_events_from_db", ("Out of memory")); + ret= -1; + goto end; + } + DBUG_PRINT("evex_load_events_from_db", ("Loading event from row.")); + + if ((ret= et->load_from_row(&evex_mem_root, table))) + { + sql_print_error("Error while loading from mysql.event. " + "Table probably corrupted"); + goto end; + } + if (et->status != MYSQL_EVENT_ENABLED) + { + DBUG_PRINT("evex_load_events_from_db",("%s is disabled",et->name.str)); + delete et; + continue; + } + + DBUG_PRINT("evex_load_events_from_db", + ("Event %s loaded from row. Time to compile", et->name.str)); + + if ((ret= et->compile(thd, &evex_mem_root))) + { + sql_print_error("Error while compiling %s.%s. Aborting load.", + et->dbname.str, et->name.str); + goto end; + } + + // let's find when to be executed + et->compute_next_execution_time(); + + DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list.")); + + evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et); + DBUG_PRINT("evex_load_events_from_db", ("%p %*s", + et, et->name.length,et->name.str)); + count++; + } + + ret= 0; + +end: + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + end_read_record(&read_record_info); + + thd->version--; // Force close to free memory + + close_thread_tables(thd); + sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s"); + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); + + DBUG_RETURN(ret); +} + + +bool sys_var_event_executor::update(THD *thd, set_var *var) +{ + // here start the thread if not running. + VOID(pthread_mutex_lock(&LOCK_evex_running)); + *value= var->save_result.ulong_value; + if ((my_bool) *value && !evex_is_running) + { + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + init_events(); + } else + VOID(pthread_mutex_unlock(&LOCK_evex_running)); + return 0; +} + |