diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/event_data_objects.cc | 4 | ||||
-rw-r--r-- | sql/event_data_objects.h | 8 | ||||
-rw-r--r-- | sql/event_db_repository.cc | 29 | ||||
-rw-r--r-- | sql/event_queue.cc | 34 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 67 | ||||
-rw-r--r-- | sql/event_scheduler.h | 3 | ||||
-rw-r--r-- | sql/events.cc | 3 |
7 files changed, 99 insertions, 49 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index 5f879c6bea0..5cf8ec53a4d 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -676,7 +676,7 @@ Event_basic::load_string_fields(Field **fields, ...) Event_queue_element::Event_queue_element(): status_changed(FALSE), last_executed_changed(FALSE), on_completion(ON_COMPLETION_DROP), status(ENABLED), - expression(0), dropped(FALSE), flags(0) + expression(0), dropped(FALSE), execution_count(0) { DBUG_ENTER("Event_queue_element::Event_queue_element"); @@ -1413,6 +1413,8 @@ Event_queue_element::mark_last_executed(THD *thd) last_executed= time_now; /* was execute_at */ last_executed_changed= TRUE; + + execution_count++; } diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h index cf1bd5e390a..a912953c927 100644 --- a/sql/event_data_objects.h +++ b/sql/event_data_objects.h @@ -74,9 +74,9 @@ public: enum enum_status status; TIME last_executed; + TIME execute_at; TIME starts; TIME ends; - TIME execute_at; my_bool starts_null; my_bool ends_null; my_bool execute_at_null; @@ -84,10 +84,10 @@ public: longlong expression; interval_type interval; - uint flags;//all kind of purposes - bool dropped; + uint execution_count; + Event_queue_element(); virtual ~Event_queue_element(); @@ -170,6 +170,8 @@ public: ulong sql_mode; + uint execution_count; + Event_job_data(); virtual ~Event_job_data(); diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index ecf8d68e788..69bbaeeae03 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -523,7 +523,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data, { int ret= 0; CHARSET_INFO *scs= system_charset_info; - TABLE *table; + TABLE *table= NULL; char old_db_buf[NAME_LEN+1]; LEX_STRING old_db= { old_db_buf, sizeof(old_db_buf) }; bool dbchanged= FALSE; @@ -621,8 +621,29 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data, ok: if (dbchanged) (void) mysql_change_db(thd, old_db.str, 1); - if (table) - close_thread_tables(thd); + /* + When valgrinded, the following call may lead to the following error: + + Syscall param pwrite64(buf) points to uninitialised byte(s) + at 0x406003B: do_pwrite64 (in /lib/tls/libpthread.so.0) + by 0x40600EF: pwrite64 (in /lib/tls/libpthread.so.0) + by 0x856FF74: my_pwrite (my_pread.c:146) + by 0x85734E1: flush_cached_blocks (mf_keycache.c:2280) + .... + Address 0x6618110 is 56 bytes inside a block of size 927,772 alloc'd + at 0x401C451: malloc (vg_replace_malloc.c:149) + by 0x8578CDC: _mymalloc (safemalloc.c:138) + by 0x858E5E2: my_large_malloc (my_largepage.c:65) + by 0x8570634: init_key_cache (mf_keycache.c:343) + by 0x82EDA51: ha_init_key_cache(char const*, st_key_cache*) (handler.cc:2509) + by 0x8212071: process_key_caches(int (*)(char const*, st_key_cache*)) + (set_var.cc:3824) + by 0x8206D75: init_server_components() (mysqld.cc:3304) + by 0x8207163: main (mysqld.cc:3578) + + I think it is safe not to think about it. + */ + close_thread_tables(thd); DBUG_RETURN(0); err: @@ -900,7 +921,7 @@ Event_db_repository::drop_events_by_field(THD *thd, LEX_STRING field_value) { int ret= 0; - TABLE *table; + TABLE *table= NULL; Open_tables_state backup; READ_RECORD read_record_info; DBUG_ENTER("Event_db_repository::drop_events_by_field"); diff --git a/sql/event_queue.cc b/sql/event_queue.cc index c6c7d7f14ac..39b237987e9 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -98,7 +98,6 @@ event_queue_loader_thread(void *arg) end: deinit_event_thread(thd); - DBUG_RETURN(0); // Against gcc warnings } @@ -177,7 +176,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) scheduler= sched; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, - 0 /*smallest_on_top*/, event_queue_element_compare_q, + 0 /*max_on_top*/, event_queue_element_compare_q, NULL, EVENT_QUEUE_EXTENT)) { sql_print_error("SCHEDULER: Can't initialize the execution queue"); @@ -196,6 +195,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) goto err; pre_init_event_thread(new_thd); + new_thd->security_ctx->set_user((char*)"event_scheduler_loader"); event_queue_param_value= (struct event_queue_param *) my_malloc(sizeof(struct event_queue_param), MYF(0)); @@ -285,6 +285,7 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) LOCK_QUEUE_DATA(); DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); + dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); notify_observers(); @@ -356,6 +357,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, new_element, old_element)); queue_insert_safe(&queue, (byte *) new_element); } + dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); if (new_element) @@ -389,6 +391,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name) LOCK_QUEUE_DATA(); element= find_n_remove_event(dbname, name); + dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); if (element) @@ -427,7 +430,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, bool (*comparator)(LEX_STRING, Event_basic *)) { DBUG_ENTER("Event_queue::drop_matching_events"); - DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str)); + DBUG_PRINT("enter", ("pattern=%s", pattern.str)); uint i= 0; while (i < queue.elements) @@ -808,26 +811,29 @@ Event_queue::empty_queue() now Current timestamp */ -inline void +void Event_queue::dbug_dump_queue(time_t now) { #ifndef DBUG_OFF Event_queue_element *et; uint i; + DBUG_ENTER("Event_queue::dbug_dump_queue"); DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements)); for (i = 0; i < queue.elements; i++) { et= ((Event_queue_element*)queue_element(&queue, i)); DBUG_PRINT("info",("et=0x%lx db=%s name=%s",et, et->dbname.str, et->name.str)); - DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu " + DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu execs_so_far=%u" " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d", TIME_to_ulonglong_datetime(&et->execute_at), TIME_to_ulonglong_datetime(&et->starts), TIME_to_ulonglong_datetime(&et->ends), + et->execution_count, et->expression, sec_since_epoch_TIME(&et->execute_at), now, (int)(sec_since_epoch_TIME(&et->execute_at) - now), sec_since_epoch_TIME(&et->execute_at) <= now)); } + DBUG_VOID_RETURN; #endif } @@ -838,7 +844,7 @@ Event_queue::dbug_dump_queue(time_t now) `now` is compared against `execute_at` of the top element in the queue. SYNOPSIS - Event_queue::dbug_dump_queue() + Event_queue::get_top_for_execution_if_time() thd [in] Thread now [in] Current timestamp job_data [out] The object to execute @@ -873,7 +879,6 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, abstime->tv_sec= 0; break; } - dbug_dump_queue(now); Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); @@ -889,7 +894,11 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; - *job_data= new Event_job_data(); + if (!(*job_data= new Event_job_data())) + { + ret= TRUE; + break; + } if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) { @@ -902,13 +911,18 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, top->mark_last_executed(thd); if (top->compute_next_execution_time()) top->status= Event_queue_element::DISABLED; - DBUG_PRINT("info", ("event's status is %d", top->status)); + DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status)); + + (*job_data)->execution_count= top->execution_count; top->update_timing_fields(thd); if (((top->execute_at.year && !top->expression) || top->execute_at_null) || (top->status == Event_queue_element::DISABLED)) { DBUG_PRINT("info", ("removing from the queue")); + sql_print_information("SCHEDULER: Last execution of %s.%s. %s", + top->dbname.str, top->name.str, + top->dropped? "Dropping.":""); if (top->dropped) top->drop(thd); delete top; @@ -916,6 +930,8 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, } else queue_replaced(&queue); + + dbug_dump_queue(now); } while (0); UNLOCK_QUEUE_DATA(); diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index 0c39c1a512b..6e3a7ec2be8 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -28,9 +28,10 @@ #define SCHED_FUNC "<unknown>" #endif -#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__) -#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__) -#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__) +#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__) +#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__) +#define COND_STATE_WAIT(mythd, abstime, msg) \ + cond_wait(mythd, abstime, msg, SCHED_FUNC, __LINE__) extern pthread_attr_t connection_attrib; @@ -140,7 +141,7 @@ deinit_event_thread(THD *thd) thd->proc_info= "Clearing"; DBUG_ASSERT(thd->net.buff != 0); net_end(&thd->net); - DBUG_PRINT("exit", ("Scheduler thread finishing")); + DBUG_PRINT("exit", ("Event thread finishing")); pthread_mutex_lock(&LOCK_thread_count); thread_count--; thread_running--; @@ -262,9 +263,11 @@ event_worker_thread(void *arg) DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." "THD=0x%lx", time(NULL), thd)); - sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. " + "Execution %u", event->dbname.str, event->name.str, - event->definer.str, thd->thread_id); + event->definer.str, thd->thread_id, + event->execution_count); thd->enable_slow_log= TRUE; @@ -272,9 +275,8 @@ event_worker_thread(void *arg) evex_print_warnings(thd, event); - sql_print_information("SCHEDULER: [%s.%s of %s] executed " - " in thread thread %lu. RetCode=%d", - event->dbname.str, event->name.str, + sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. " + "RetCode=%d", event->dbname.str, event->name.str, event->definer.str, thd->thread_id, ret); if (ret == EVEX_COMPILE_ERROR) sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", @@ -456,7 +458,7 @@ Event_scheduler::run(THD *thd) thd->end_time(); /* Gets a minimized version */ if (queue->get_top_for_execution_if_time(thd, thd->query_start(), - &job_data, &abstime)) + &job_data, &abstime)) { sql_print_information("SCHEDULER: Serious error during getting next" " event to execute. Stopping."); @@ -469,31 +471,22 @@ Event_scheduler::run(THD *thd) if (!job_data && !abstime.tv_sec) { DBUG_PRINT("info", ("The queue is empty. Going to sleep")); - thd->enter_cond(&COND_state, &LOCK_scheduler_state, - "Waiting on empty queue"); - COND_STATE_WAIT(NULL); - thd->exit_cond(""); + COND_STATE_WAIT(thd, NULL, "Waiting on empty queue"); DBUG_PRINT("info", ("Woke up. Got COND_state")); - LOCK_DATA(); } else if (abstime.tv_sec) { - DBUG_PRINT("info", ("Have to sleep some time %u till", + DBUG_PRINT("info", ("Have to sleep some time %u s. till %u", abstime.tv_sec - thd->query_start(), abstime.tv_sec)); - thd->enter_cond(&COND_state, &LOCK_scheduler_state, - "Waiting for next activation"); - COND_STATE_WAIT(&abstime); + COND_STATE_WAIT(thd, &abstime, "Waiting for next activation"); /* If we get signal we should recalculate the whether it's the right time because there could be : 1. Spurious wake-up 2. The top of the queue was changed (new one becase of create/update) */ - /* This will do implicit UNLOCK_DATA() */ - thd->exit_cond(""); DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); - LOCK_DATA(); } else { @@ -610,7 +603,7 @@ Event_scheduler::stop() "workers count=%d", scheduler_states_names[state].str, workers_count())); /* thd could be 0x0, when shutting down */ - COND_STATE_WAIT(NULL); + COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop"); } while (state == STOPPING); DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); @@ -720,29 +713,43 @@ Event_scheduler::unlock_data(const char *func, uint line) SYNOPSIS Event_scheduler::cond_wait() - cond Conditional to wait for - mutex Mutex of the conditional - - RETURN VALUE - Error code of pthread_cond_wait() + thd Thread (Could be NULL during shutdown procedure) + abstime If not null then call pthread_cond_timedwait() + func Which function is requesting cond_wait + line On which line cond_wait is requested */ void -Event_scheduler::cond_wait(struct timespec *abstime, const char *func, - uint line) +Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg, + const char *func, uint line) { + DBUG_ENTER("Event_scheduler::cond_wait"); waiting_on_cond= TRUE; mutex_last_unlocked_at_line= line; mutex_scheduler_data_locked= FALSE; mutex_last_unlocked_in_func= func; + if (thd) + thd->enter_cond(&COND_state, &LOCK_scheduler_state, msg); + + DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":"")); if (!abstime) pthread_cond_wait(&COND_state, &LOCK_scheduler_state); else pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime); + if (thd) + { + /* + This will free the lock so we need to relock. Not the best thing to + do but we need to obey cond_wait() + */ + thd->exit_cond(""); + LOCK_DATA(); + } mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; mutex_scheduler_data_locked= TRUE; waiting_on_cond= FALSE; + DBUG_VOID_RETURN; } diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index b5c3dae49f8..cc6d0f7a20a 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -95,7 +95,8 @@ private: unlock_data(const char *func, uint line); void - cond_wait(struct timespec *abstime, const char *func, uint line); + cond_wait(THD *thd, struct timespec *abstime, const char* msg, + const char *func, uint line); pthread_mutex_t LOCK_scheduler_state; diff --git a/sql/events.cc b/sql/events.cc index 94ef8faa83d..f75a362f947 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -622,7 +622,7 @@ Events::init_mutexes() event_queue= new Event_queue; event_queue->init_mutexes(); - scheduler= new Event_scheduler(); + scheduler= new Event_scheduler; scheduler->init_mutexes(); } @@ -642,6 +642,7 @@ Events::destroy_mutexes() delete scheduler; delete db_repository; + delete event_queue; pthread_mutex_destroy(&LOCK_event_metadata); } |