diff options
author | unknown <andrey@lmy004.> | 2005-12-12 21:19:19 +0100 |
---|---|---|
committer | unknown <andrey@lmy004.> | 2005-12-12 21:19:19 +0100 |
commit | 81eadfcac18f3adc29c55d440e62823855f5b070 (patch) | |
tree | bb204756d7ec8bb96576f0bacb52879ab6ddddbf | |
parent | 1c5573a47c4ef43558770c1c4e536ba39114dcbb (diff) | |
download | mariadb-git-81eadfcac18f3adc29c55d440e62823855f5b070.tar.gz |
WL#1034 update
QUEUE implementation working now. this should be ready more or less
for testing once the debug output is being cleaned and some things
around DYNAMIC_ARRAY are cleaned
- fix handling in case of errors that lead to crashes, now no more crashes
in case of table corruption and such.
include/queues.h:
introduce a safe version of queue_insert that will extend the queue if
necessary. the auto_extent is passed to the _ex version of init_queue()
mysys/queues.c:
add init_queue_ex() implementation
add queue_insert_safe() implementation
sql/event.cc:
- move mysql_priv.h inclusion to event_priv.h
- use a priority queue instead of DYNAMIC_ARRAY which is sorted
sql/event.h:
reorder
sql/event_executor.cc:
reorder
sql/event_priv.h:
- reorder a bit
- add macroses and functions for queue manipulation which stay on top
of QUEUE (partly implemented for DYNAMIC_ARRAY but will be cleared to be
only for QUEUE).
sql/event_timed.cc:
allocate one more byte and zeroterminate, really
-rw-r--r-- | include/queues.h | 6 | ||||
-rw-r--r-- | mysys/queues.c | 67 | ||||
-rw-r--r-- | sql/event.cc | 127 | ||||
-rw-r--r-- | sql/event.h | 6 | ||||
-rw-r--r-- | sql/event_executor.cc | 226 | ||||
-rw-r--r-- | sql/event_priv.h | 80 | ||||
-rw-r--r-- | sql/event_timed.cc | 7 |
7 files changed, 402 insertions, 117 deletions
diff --git a/include/queues.h b/include/queues.h index a8b676b763c..8cb053831f2 100644 --- a/include/queues.h +++ b/include/queues.h @@ -35,6 +35,7 @@ typedef struct st_queue { uint offset_to_key; /* compare is done on element+offset */ int max_at_top; /* Set if queue_top gives max */ int (*compare)(void *, byte *,byte *); + uint auto_extent; } QUEUE; #define queue_top(queue) ((queue)->root[1]) @@ -49,14 +50,19 @@ typedef int (*queue_compare)(void *,byte *, byte *); int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key, pbool max_at_top, queue_compare compare, void *first_cmp_arg); +int init_queue_ex(QUEUE *queue,uint max_elements,uint offset_to_key, + pbool max_at_top, queue_compare compare, + void *first_cmp_arg, uint auto_extent); int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key, pbool max_at_top, queue_compare compare, void *first_cmp_arg); int resize_queue(QUEUE *queue, uint max_elements); void delete_queue(QUEUE *queue); void queue_insert(QUEUE *queue,byte *element); +int queue_insert_safe(QUEUE *queue, byte *element); byte *queue_remove(QUEUE *queue,uint idx); #define queue_remove_all(queue) { (queue)->elements= 0; } +#define queue_is_full(queue) (queue->elements == queue->max_elements) void _downheap(QUEUE *queue,uint idx); void queue_fix(QUEUE *queue); #define is_queue_inited(queue) ((queue)->root != 0) diff --git a/mysys/queues.c b/mysys/queues.c index 0e4e251f7e7..8e572a0f195 100644 --- a/mysys/queues.c +++ b/mysys/queues.c @@ -19,7 +19,7 @@ Implemention of queues from "Algoritms in C" by Robert Sedgewick. An optimisation of _downheap suggested in Exercise 7.51 in "Data Structures & Algorithms in C++" by Mark Allen Weiss, Second Edition - was implemented by Mikael Ronström 2005. Also the O(N) algorithm + was implemented by Mikael Ronstrom 2005. Also the O(N) algorithm of queue_fix was implemented. */ @@ -67,6 +67,46 @@ int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key, } + +/* + Init queue, uses init_queue internally for init work but also accepts + auto_extent as parameter + + SYNOPSIS + init_queue_ex() + queue Queue to initialise + max_elements Max elements that will be put in queue + offset_to_key Offset to key in element stored in queue + Used when sending pointers to compare function + max_at_top Set to 1 if you want biggest element on top. + compare Compare function for elements, takes 3 arguments. + first_cmp_arg First argument to compare function + auto_extent When the queue is full and there is insert operation + extend the queue. + + NOTES + Will allocate max_element pointers for queue array + + RETURN + 0 ok + 1 Could not allocate memory +*/ + +int init_queue_ex(QUEUE *queue, uint max_elements, uint offset_to_key, + pbool max_at_top, int (*compare) (void *, byte *, byte *), + void *first_cmp_arg, uint auto_extent) +{ + int ret; + DBUG_ENTER("init_queue_ex"); + + if ((ret= init_queue(queue, max_elements, offset_to_key, max_at_top, compare, + first_cmp_arg))) + DBUG_RETURN(ret); + + queue->auto_extent= auto_extent; + DBUG_RETURN(0); +} + /* Reinitialize queue for other usage @@ -192,6 +232,31 @@ void queue_insert(register QUEUE *queue, byte *element) } } +/* + Does safe insert. If no more space left on the queue resize it. + Return codes: + 0 - OK + 1 - Cannot allocate more memory + 2 - auto_extend is 0, the operation would + +*/ + +int queue_insert_safe(register QUEUE *queue, byte *element) +{ + + if (queue->elements == queue->max_elements) + { + if (!queue->auto_extent) + return 2; + else if (resize_queue(queue, queue->max_elements + queue->auto_extent)) + return 1; + } + + queue_insert(queue, element); + return 0; +} + + /* Remove item from queue */ /* Returns pointer to removed element */ diff --git a/sql/event.cc b/sql/event.cc index 7a9c476a35a..87a846cd2dd 100644 --- a/sql/event.cc +++ b/sql/event.cc @@ -14,16 +14,14 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "mysql_priv.h" -#include "event.h" #include "event_priv.h" +#include "event.h" #include "sp.h" /* TODO list : - The default value of created/modified should not be 0000-00-00 because of STRICT mode restricions. - - Remove m_ prefixes of member variables. - Use timestamps instead of datetime. @@ -53,9 +51,17 @@ - Consider using conditional variable when doing shutdown instead of waiting till all worker threads end. - Make event_timed::get_show_create_event() work + - Add function documentation whenever needed. + - Add logging to file + - Move comparison code to class event_timed + + - Overload event_timed::new to put the event directly in the DYNAMIC_ARRAY. + This will skip copy operation as well as will simplify the code which is + now aware of events_array DYNAMIC_ARRAY + Warning: - For now parallel execution is not possible because the same sp_head cannot be executed few times!!! There is still no lock attached to particular event. @@ -67,19 +73,60 @@ Warning: bool mysql_event_table_exists= 1; DYNAMIC_ARRAY events_array; -DYNAMIC_ARRAY evex_executing_queue; +DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME; +QUEUE EXEC_QUEUE_QUEUE_NAME; MEM_ROOT evex_mem_root; - //extern volatile uint thread_running; //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// //////////////// Static functions follow /////////////////////////// //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// +void +evex_queue_init(EVEX_QUEUE_TYPE *queue) +{ +#ifndef EVEX_USE_QUEUE + VOID(my_init_dynamic_array(queue, sizeof(event_timed *), 50, 100)); +#else + if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/, + 0 /*smallest_on_top*/, event_timed_compare_q, NULL, + 100 /*auto_extent*/)) + sql_print_error("Insufficient memory to initialize executing queue."); +#endif +} + + +int +evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element) +{ +#ifndef EVEX_USE_QUEUE + VOID(push_dynamic(queue, element)); + return 0; +#else + return queue_insert_safe(queue, element); +#endif +} +void +evex_queue_top_updated(EVEX_QUEUE_TYPE *queue) +{ +#ifdef EVEX_USE_QUEUE + queue_replaced(queue); +#endif +} +void +evex_queue_sort(EVEX_QUEUE_TYPE *queue) +{ +#ifndef EVEX_USE_QUEUE + qsort((gptr) dynamic_element(queue, 0, event_timed**), + queue->elements, + sizeof(event_timed **), + (qsort_cmp) event_timed_compare); +#endif +} /* NOTE Andrey: Document better Compares two TIME structures. @@ -98,7 +145,7 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) } -inline int +int my_time_compare(TIME *a, TIME *b) { /* @@ -107,6 +154,7 @@ my_time_compare(TIME *a, TIME *b) */ DBUG_ENTER("my_time_compare"); + if (a->year > b->year) DBUG_RETURN(1); @@ -143,19 +191,53 @@ my_time_compare(TIME *a, TIME *b) if (a->second < b->second) DBUG_RETURN(-1); - /*!! second_part is not compared !*/ + + if (a->second_part > b->second_part) + DBUG_RETURN(1); + + if (a->second_part < b->second_part) + DBUG_RETURN(-1); + DBUG_RETURN(0); } +int +evex_time_diff(TIME *a, TIME *b) +{ + my_bool in_gap; + DBUG_ENTER("my_time_diff"); + + return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b); +} + inline int event_timed_compare(event_timed **a, event_timed **b) { - return my_time_compare(&(*a)->execute_at, &(*b)->execute_at); + my_ulonglong a_t, b_t; + a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at)*100L + + (*a)->execute_at.second_part; + b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at)*100L + + (*b)->execute_at.second_part; + + if (a_t > b_t) + return 1; + else if (a_t < b_t) + return -1; + else + return 0; + +// return my_time_compare(&(*a)->execute_at, &(*b)->execute_at); } +int +event_timed_compare_q(void *vptr, byte* a, byte *b) +{ + return event_timed_compare((event_timed **)&a, (event_timed **)&b); +} + /* Open mysql.event table for read @@ -660,7 +742,10 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) VOID(push_dynamic(&events_array,(gptr) ett)); ett_copy= dynamic_element(&events_array, events_array.elements - 1, event_timed*); +/** VOID(push_dynamic(&evex_executing_queue, (gptr) &ett_copy)); +**/ + evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett_copy); /* There is a copy in the array which we don't need. sphead won't be @@ -674,11 +759,14 @@ evex_load_and_compile_event(THD * thd, sp_name *spn, bool use_lock) qsort of events_array.elements (the current number of elements). We know that the elements are stored in a contiguous block w/o holes. */ +/** qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**), evex_executing_queue.elements, sizeof(event_timed **), (qsort_cmp) event_timed_compare); - +**/ + evex_queue_sort(&EVEX_EQ_NAME); + if (use_lock) VOID(pthread_mutex_unlock(&LOCK_event_arrays)); @@ -703,7 +791,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock) if (use_lock) VOID(pthread_mutex_lock(&LOCK_event_arrays)); - +/** for (i= 0; i < evex_executing_queue.elements; ++i) { event_timed *et= *dynamic_element(&evex_executing_queue, i, event_timed**); @@ -733,6 +821,25 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock) goto done; } } +**/ + for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) + { + event_timed *et= *evex_queue_element(&EVEX_EQ_NAME, i, event_timed**); + DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?",db->str,name->str, et->dbname.str, + et->name.str)); + if (!sortcmp_lex_string(*name, et->name, system_charset_info) && + !sortcmp_lex_string(*db, et->dbname, system_charset_info)) + { + int idx= get_index_dynamic(&events_array, (gptr) et); + //we are lucky the event is in the executing queue, no need of second pass + //destruct first and then remove. the destructor will delete sp_head + et->free_sp(); + evex_queue_delete_element(&EVEX_EQ_NAME, idx); + evex_queue_delete_element(&EVEX_EQ_NAME, i); + // ok, we have cleaned + goto done; + } + } /* ToDo Andrey : Think about whether second pass is needed. All events diff --git a/sql/event.h b/sql/event.h index 12a965c1ec8..f3b49a99488 100644 --- a/sql/event.h +++ b/sql/event.h @@ -16,11 +16,9 @@ #ifndef _EVENT_H_ #define _EVENT_H_ -#include "sp_head.h" -#include "sp.h" - -extern ulong opt_event_executor; +#include "sp.h" +#include "sp_head.h" #define EVEX_OK SP_OK #define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND diff --git a/sql/event_executor.cc b/sql/event_executor.cc index 1e45d984425..74ac1b12297 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -14,9 +14,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "mysql_priv.h" -#include "event.h" #include "event_priv.h" +#include "event.h" #include "sp.h" @@ -40,6 +39,7 @@ pthread_mutex_t LOCK_event_arrays, bool evex_is_running= false; +ulonglong evex_main_thread_id= 0; ulong opt_event_executor; my_bool event_executor_running_global_var= false; static my_bool evex_mutexes_initted= false; @@ -74,6 +74,7 @@ void evex_init_mutexes() pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST); } + int init_events() { @@ -84,7 +85,7 @@ init_events() DBUG_PRINT("info",("Starting events main thread")); evex_init_mutexes(); - + VOID(pthread_mutex_lock(&LOCK_evex_running)); evex_is_running= false; event_executor_running_global_var= false; @@ -109,6 +110,7 @@ shutdown_events() VOID(pthread_mutex_lock(&LOCK_evex_running)); VOID(pthread_mutex_unlock(&LOCK_evex_running)); + pthread_mutex_destroy(&LOCK_event_arrays); pthread_mutex_destroy(&LOCK_workers_count); pthread_mutex_destroy(&LOCK_evex_running); @@ -182,7 +184,7 @@ event_executor_main(void *arg) if (init_event_thread(thd)) goto err; - + // make this thread invisible it has no vio -> show processlist won't see thd->system_thread= 1; @@ -200,7 +202,12 @@ event_executor_main(void *arg) thus data should be freed at later stage. */ VOID(my_init_dynamic_array(&events_array, sizeof(event_timed), 50, 100)); +/** VOID(my_init_dynamic_array(&evex_executing_queue, sizeof(event_timed *), 50, 100)); +**/ + + evex_queue_init(&EVEX_EQ_NAME); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); /* @@ -217,107 +224,132 @@ event_executor_main(void *arg) THD_CHECK_SENTRY(thd); /* Read queries from the IO/THREAD until this thread is killed */ + evex_main_thread_id= thd->thread_id; + while (!thd->killed) { TIME time_now; my_time_t now; my_ulonglong cnt; + event_timed *et; DBUG_PRINT("info", ("EVEX External Loop %d", ++cnt)); thd->proc_info = "Sleeping"; - my_sleep(1000000);// sleep 1s - if (!event_executor_running_global_var) + if (!evex_queue_num_elements(EVEX_EQ_NAME) || + !event_executor_running_global_var) + { + my_sleep(1000000);// sleep 1s continue; - time(&now); - my_tz_UTC->gmt_sec_to_TIME(&time_now, now); + } - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - for (i= 0; (i < evex_executing_queue.elements) && !thd->killed; ++i) { - event_timed *et= *dynamic_element(&evex_executing_queue,i,event_timed**); -// printf("%llu\n", TIME_to_ulonglong_datetime(&et->execute_at)); + int t2sleep; + + + /* + now let's see how much time to sleep, we know there is at least 1 + element in the queue. + */ + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); + + time(&now); + my_tz_UTC->gmt_sec_to_TIME(&time_now, now); + t2sleep= evex_time_diff(&et->execute_at, &time_now); + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + if (t2sleep > 0) + { + sql_print_information("Sleeping for %d seconds.", t2sleep); + printf("\nWHEN=%llu NOW=%llu\n", TIME_to_ulonglong_datetime(&et->execute_at), TIME_to_ulonglong_datetime(&time_now)); + /* + We sleep t2sleep seconds but we check every second whether this thread + has been killed, or there is new candidate + */ + while (t2sleep-- && !thd->killed && + evex_queue_num_elements(EVEX_EQ_NAME) && + (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et)) + my_sleep(1000000); + sql_print_information("Finished sleeping"); + } if (!event_executor_running_global_var) - break; + continue; - thd->proc_info = "Iterating"; - THD_CHECK_SENTRY(thd); - /* - if this is the first event which is after time_now then no - more need to iterate over more elements since the array is sorted. - */ - if (et->execute_at.year > 1969 && - my_time_compare(&time_now, &et->execute_at) == -1) - break; + } + + + VOID(pthread_mutex_lock(&LOCK_event_arrays)); + + if (!evex_queue_num_elements(EVEX_EQ_NAME)) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*); - if (et->status == MYSQL_EVENT_ENABLED && - !check_access(thd, EVENT_ACL, et->dbname.str, 0, 0, 0, - is_schema_db(et->dbname.str))) - { - pthread_t th; + /* + if this is the first event which is after time_now then no + more need to iterate over more elements since the array is sorted. + */ + if (et->execute_at.year > 1969 && + my_time_compare(&time_now, &et->execute_at) == -1) + { + VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + continue; + } + + if (et->status == MYSQL_EVENT_ENABLED) + { + pthread_t th; - DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num)); - thd->proc_info = "Starting new thread"; - sql_print_information(" Spawning a thread %d", ++iter_num); + DBUG_PRINT("info", (" Spawning a thread %d", ++iter_num)); + sql_print_information(" Spawning a thread %d", ++iter_num); #ifndef DBUG_FAULTY_THR - if (pthread_create(&th, NULL, event_executor_worker, (void*)et)) - { - sql_print_error("Problem while trying to create a thread"); - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); - } + sql_print_information(" Thread is not debuggable!"); + if (pthread_create(&th, NULL, event_executor_worker, (void*)et)) + { + sql_print_error("Problem while trying to create a thread"); + UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err); + } #else - event_executor_worker((void *) et); + event_executor_worker((void *) et); #endif - et->mark_last_executed(); - thd->proc_info = "Computing next time"; - et->compute_next_execution_time(); - et->update_fields(thd); - if ((et->execute_at.year && !et->expression) - || TIME_to_ulonglong_datetime(&et->execute_at) == 0L) - et->flags |= EVENT_EXEC_NO_MORE; - } + printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at)); + et->mark_last_executed(); + et->compute_next_execution_time(); + printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at)); + et->update_fields(thd); + if ((et->execute_at.year && !et->expression) || + TIME_to_ulonglong_datetime(&et->execute_at) == 0L) + et->flags |= EVENT_EXEC_NO_MORE; } - /* - Let's remove elements which won't be executed any more - The number is "i" and it is <= up to evex_executing_queue.elements - */ - j= 0; - while (j < i && j < evex_executing_queue.elements) + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) { - event_timed *et= *dynamic_element(&evex_executing_queue, j, event_timed**); - if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) + evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top + if (et->dropped) { - delete_dynamic_element(&evex_executing_queue, j); - DBUG_PRINT("EVEX main thread", ("DELETING FROM EXECUTION QUEUE [%s.%s]", - et->dbname.str, et->name.str)); - // nulling the position, will delete later - if (et->dropped) - { - // we have to drop the event - int idx; - et->drop(thd); - idx= get_index_dynamic(&events_array, (gptr) et); - DBUG_ASSERT(idx != -1); - delete_dynamic_element(&events_array, idx); - } - continue; + // we have to drop the event + int idx; + et->drop(thd); + idx= get_index_dynamic(&events_array, (gptr) et); + DBUG_ASSERT(idx != -1); + delete_dynamic_element(&events_array, idx); } - ++j; - } - if (evex_executing_queue.elements) - //ToDo Andrey : put a lock here - qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**), - evex_executing_queue.elements, - sizeof(event_timed **), - (qsort_cmp) event_timed_compare - ); + } else + evex_queue_first_updated(&EVEX_EQ_NAME); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - } + }// while err: // First manifest that this thread does not work and then destroy VOID(pthread_mutex_lock(&LOCK_evex_running)); - evex_is_running= false; + evex_is_running= false; + evex_main_thread_id= 0; VOID(pthread_mutex_unlock(&LOCK_evex_running)); sql_print_information("Event scheduler stopping"); @@ -341,7 +373,8 @@ err: VOID(pthread_mutex_lock(&LOCK_event_arrays)); // No need to use lock here if EVEX is not running but anyway - delete_dynamic(&evex_executing_queue); + delete_queue(&executing_queue); + evex_queue_destroy(&EVEX_EQ_NAME); delete_dynamic(&events_array); VOID(pthread_mutex_unlock(&LOCK_event_arrays)); @@ -353,8 +386,10 @@ err: pthread_mutex_lock(&LOCK_thread_count); thread_count--; thread_running--; +#ifndef DBUG_FAULTY_THR THD_CHECK_SENTRY(thd); delete thd; +#endif pthread_mutex_unlock(&LOCK_thread_count); @@ -366,8 +401,10 @@ err_no_thd: free_root(&evex_mem_root, MYF(0)); sql_print_information("Event scheduler stopped"); +#ifndef DBUG_FAULTY_THR my_thread_end(); pthread_exit(0); +#endif DBUG_RETURN(0);// Can't return anything here } @@ -386,6 +423,7 @@ event_executor_worker(void *event_void) init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); +#ifndef DBUG_FAULTY_THR my_thread_init(); if (!(thd = new THD)) // note that contructor of THD uses DBUG_ ! @@ -411,6 +449,9 @@ event_executor_worker(void *event_void) thread_count++; thread_running++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); +#else + thd= current_thd; +#endif // thd->security_ctx->priv_host is char[MAX_HOSTNAME] @@ -420,6 +461,8 @@ event_executor_worker(void *event_void) thd->security_ctx->priv_user= event->definer_user.str; thd->db= event->dbname.str; + if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0, + is_schema_db(event->dbname.str))) { char exec_time[200]; int ret; @@ -434,6 +477,7 @@ event_executor_worker(void *event_void) err: VOID(pthread_mutex_lock(&LOCK_thread_count)); +#ifndef DBUG_FAULTY_THR thread_count--; thread_running--; /* @@ -451,6 +495,7 @@ err: VOID(pthread_mutex_lock(&LOCK_thread_count)); THD_CHECK_SENTRY(thd); delete thd; +#endif VOID(pthread_mutex_unlock(&LOCK_thread_count)); err_no_thd: @@ -502,6 +547,12 @@ evex_load_events_from_db(THD *thd) "Table probably corrupted"); goto end; } + if (et->status != MYSQL_EVENT_ENABLED) + { + DBUG_PRINT("evex_load_events_from_db",("Event %s is disabled", et->name.str)); + delete et; + continue; + } DBUG_PRINT("evex_load_events_from_db", ("Event %s loaded from row. Time to compile", et->name.str)); @@ -515,8 +566,7 @@ evex_load_events_from_db(THD *thd) // let's find when to be executed et->compute_next_execution_time(); - DBUG_PRINT("evex_load_events_from_db", - ("Adding %s to the executor list.", et->name.str)); + DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list.")); VOID(push_dynamic(&events_array,(gptr) et)); /* We always add at the end so the number of elements - 1 is the place @@ -526,23 +576,21 @@ evex_load_events_from_db(THD *thd) */ et_copy= dynamic_element(&events_array, events_array.elements - 1, event_timed*); - VOID(push_dynamic(&evex_executing_queue,(gptr) &et_copy)); + + evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et_copy); + printf("%p %s\n", et_copy, et_copy->name.str); et->free_sphead_on_delete= false; delete et; } - end_read_record(&read_record_info); - qsort((gptr) dynamic_element(&evex_executing_queue, 0, event_timed**), - evex_executing_queue.elements, - sizeof(event_timed **), - (qsort_cmp) event_timed_compare - ); + ret= 0; + +end: VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + end_read_record(&read_record_info); thd->version--; // Force close to free memory - ret= 0; -end: close_thread_tables(thd); DBUG_PRINT("info", ("Finishing with status code %d", ret)); diff --git a/sql/event_priv.h b/sql/event_priv.h index bb18dee6e6d..f6653e473da 100644 --- a/sql/event_priv.h +++ b/sql/event_priv.h @@ -16,8 +16,11 @@ #ifndef _EVENT_PRIV_H_ #define _EVENT_PRIV_H_ +#include "mysql_priv.h" +#define EVEX_USE_QUEUE + #define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \ { VOID(pthread_mutex_unlock(&__mutex)); goto __label; } @@ -41,14 +44,6 @@ enum EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */ }; -extern bool evex_is_running; -extern bool mysql_event_table_exists; -extern DYNAMIC_ARRAY events_array; -extern DYNAMIC_ARRAY evex_executing_queue; -extern MEM_ROOT evex_mem_root; -extern pthread_mutex_t LOCK_event_arrays, - LOCK_workers_count, - LOCK_evex_running; int @@ -59,5 +54,72 @@ evex_db_find_event_aux(THD *thd, const LEX_STRING dbname, const LEX_STRING rname, TABLE *table); TABLE * -evex_open_event_table(THD *thd, enum thr_lock_type lock_type); +evex_open_event_table(THD *thd, enum thr_lock_type lock_type); + +int +event_timed_compare_q(void *vptr, byte* a, byte *b); + +int +evex_time_diff(TIME *a, TIME *b); + + + +#define EXEC_QUEUE_QUEUE_NAME executing_queue +#define EXEC_QUEUE_DARR_NAME evex_executing_queue + +#ifdef EVEX_USE_QUEUE + #define EVEX_QUEUE_TYPE QUEUE + #define EVEX_PTOQEL byte * + #define EVEX_EQ_NAME executing_queue + + #define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue)) + #define evex_queue_element(queue, idx, __cast) ((__cast)queue_top(queue)) + #define evex_queue_delete_element(queue, idx) queue_remove(queue, idx) + #define evex_queue_destroy(queue) delete_queue(queue) + #define evex_queue_first_updated(queue) queue_replaced(queue) + #define evex_queue_insert(queue, element) queue_insert_safe(queue, element); + +#else + #define EVEX_QUEUE_TYPE DYNAMIC_ARRAY + #define EVEX_PTOQEL gptr + #define EVEX_EQ_NAME evex_executing_queue + + #define evex_queue_element(queue, idx, __cast) dynamic_element(queue,idx, __cast) + #define evex_queue_delete_element(queue, idx) delete_dynamic_element(queue, idx); + #define evex_queue_destroy(queue) delete_dynamic(queue) +/* + push_dynamic() expects ptr to the memory to put in, to make things fast + so when a pointer has to be put inside a ptr-to-ptr is being passed +*/ + #define evex_queue_first_updated(queue) + #define evex_queue_insert(queue, element) VOID(push_dynamic(queue, &element)) + + +#endif + + +void +evex_queue_init(EVEX_QUEUE_TYPE *queue); + +int +evex_queue_insert2(EVEX_QUEUE_TYPE *queue, EVEX_PTOQEL element); + +void +evex_queue_sort(EVEX_QUEUE_TYPE *queue); + +#define evex_queue_num_elements(queue) queue.elements + + +extern bool evex_is_running; +extern bool mysql_event_table_exists; +extern DYNAMIC_ARRAY events_array; +extern DYNAMIC_ARRAY EXEC_QUEUE_DARR_NAME; +extern QUEUE EXEC_QUEUE_QUEUE_NAME; +extern MEM_ROOT evex_mem_root; +extern pthread_mutex_t LOCK_event_arrays, + LOCK_workers_count, + LOCK_evex_running; +extern ulonglong evex_main_thread_id; + + #endif /* _EVENT_PRIV_H_ */ diff --git a/sql/event_timed.cc b/sql/event_timed.cc index 22c6160edab..7c6399e91a1 100644 --- a/sql/event_timed.cc +++ b/sql/event_timed.cc @@ -14,9 +14,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "mysql_priv.h" -#include "event.h" #include "event_priv.h" +#include "event.h" #include "sp.h" @@ -789,7 +788,7 @@ event_timed::get_show_create_event(THD *thd, uint *length) + strlen("DO ") + + body.length + strlen(";"); - ret= dst= (char*) alloc_root(thd->mem_root, len); + ret= dst= (char*) alloc_root(thd->mem_root, len + 1); memcpy(dst, "CREATE EVENT ", tmp_len= strlen("CREATE EVENT ")); dst+= tmp_len; memcpy(dst, dbname.str, tmp_len=dbname.length); @@ -832,7 +831,7 @@ event_timed::get_show_create_event(THD *thd, uint *length) *dst= '\0'; *length= len; - + dst[len]= '\0'; return ret; } |