summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/event_data_objects.cc4
-rw-r--r--sql/event_data_objects.h8
-rw-r--r--sql/event_db_repository.cc29
-rw-r--r--sql/event_queue.cc34
-rw-r--r--sql/event_scheduler.cc67
-rw-r--r--sql/event_scheduler.h3
-rw-r--r--sql/events.cc3
7 files changed, 99 insertions, 49 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index 5f879c6bea0..5cf8ec53a4d 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -676,7 +676,7 @@ Event_basic::load_string_fields(Field **fields, ...)
Event_queue_element::Event_queue_element():
status_changed(FALSE), last_executed_changed(FALSE),
on_completion(ON_COMPLETION_DROP), status(ENABLED),
- expression(0), dropped(FALSE), flags(0)
+ expression(0), dropped(FALSE), execution_count(0)
{
DBUG_ENTER("Event_queue_element::Event_queue_element");
@@ -1413,6 +1413,8 @@ Event_queue_element::mark_last_executed(THD *thd)
last_executed= time_now; /* was execute_at */
last_executed_changed= TRUE;
+
+ execution_count++;
}
diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h
index cf1bd5e390a..a912953c927 100644
--- a/sql/event_data_objects.h
+++ b/sql/event_data_objects.h
@@ -74,9 +74,9 @@ public:
enum enum_status status;
TIME last_executed;
+ TIME execute_at;
TIME starts;
TIME ends;
- TIME execute_at;
my_bool starts_null;
my_bool ends_null;
my_bool execute_at_null;
@@ -84,10 +84,10 @@ public:
longlong expression;
interval_type interval;
- uint flags;//all kind of purposes
-
bool dropped;
+ uint execution_count;
+
Event_queue_element();
virtual ~Event_queue_element();
@@ -170,6 +170,8 @@ public:
ulong sql_mode;
+ uint execution_count;
+
Event_job_data();
virtual ~Event_job_data();
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc
index ecf8d68e788..69bbaeeae03 100644
--- a/sql/event_db_repository.cc
+++ b/sql/event_db_repository.cc
@@ -523,7 +523,7 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
{
int ret= 0;
CHARSET_INFO *scs= system_charset_info;
- TABLE *table;
+ TABLE *table= NULL;
char old_db_buf[NAME_LEN+1];
LEX_STRING old_db= { old_db_buf, sizeof(old_db_buf) };
bool dbchanged= FALSE;
@@ -621,8 +621,29 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
ok:
if (dbchanged)
(void) mysql_change_db(thd, old_db.str, 1);
- if (table)
- close_thread_tables(thd);
+ /*
+ When valgrinded, the following call may lead to the following error:
+
+ Syscall param pwrite64(buf) points to uninitialised byte(s)
+ at 0x406003B: do_pwrite64 (in /lib/tls/libpthread.so.0)
+ by 0x40600EF: pwrite64 (in /lib/tls/libpthread.so.0)
+ by 0x856FF74: my_pwrite (my_pread.c:146)
+ by 0x85734E1: flush_cached_blocks (mf_keycache.c:2280)
+ ....
+ Address 0x6618110 is 56 bytes inside a block of size 927,772 alloc'd
+ at 0x401C451: malloc (vg_replace_malloc.c:149)
+ by 0x8578CDC: _mymalloc (safemalloc.c:138)
+ by 0x858E5E2: my_large_malloc (my_largepage.c:65)
+ by 0x8570634: init_key_cache (mf_keycache.c:343)
+ by 0x82EDA51: ha_init_key_cache(char const*, st_key_cache*) (handler.cc:2509)
+ by 0x8212071: process_key_caches(int (*)(char const*, st_key_cache*))
+ (set_var.cc:3824)
+ by 0x8206D75: init_server_components() (mysqld.cc:3304)
+ by 0x8207163: main (mysqld.cc:3578)
+
+ I think it is safe not to think about it.
+ */
+ close_thread_tables(thd);
DBUG_RETURN(0);
err:
@@ -900,7 +921,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
LEX_STRING field_value)
{
int ret= 0;
- TABLE *table;
+ TABLE *table= NULL;
Open_tables_state backup;
READ_RECORD read_record_info;
DBUG_ENTER("Event_db_repository::drop_events_by_field");
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index c6c7d7f14ac..39b237987e9 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -98,7 +98,6 @@ event_queue_loader_thread(void *arg)
end:
deinit_event_thread(thd);
-
DBUG_RETURN(0); // Against gcc warnings
}
@@ -177,7 +176,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
scheduler= sched;
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
- 0 /*smallest_on_top*/, event_queue_element_compare_q,
+ 0 /*max_on_top*/, event_queue_element_compare_q,
NULL, EVENT_QUEUE_EXTENT))
{
sql_print_error("SCHEDULER: Can't initialize the execution queue");
@@ -196,6 +195,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
goto err;
pre_init_event_thread(new_thd);
+ new_thd->security_ctx->set_user((char*)"event_scheduler_loader");
event_queue_param_value= (struct event_queue_param *)
my_malloc(sizeof(struct event_queue_param), MYF(0));
@@ -285,6 +285,7 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
LOCK_QUEUE_DATA();
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
+ dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
notify_observers();
@@ -356,6 +357,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
new_element, old_element));
queue_insert_safe(&queue, (byte *) new_element);
}
+ dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
if (new_element)
@@ -389,6 +391,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
LOCK_QUEUE_DATA();
element= find_n_remove_event(dbname, name);
+ dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
if (element)
@@ -427,7 +430,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*comparator)(LEX_STRING, Event_basic *))
{
DBUG_ENTER("Event_queue::drop_matching_events");
- DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
+ DBUG_PRINT("enter", ("pattern=%s", pattern.str));
uint i= 0;
while (i < queue.elements)
@@ -808,26 +811,29 @@ Event_queue::empty_queue()
now Current timestamp
*/
-inline void
+void
Event_queue::dbug_dump_queue(time_t now)
{
#ifndef DBUG_OFF
Event_queue_element *et;
uint i;
+ DBUG_ENTER("Event_queue::dbug_dump_queue");
DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
for (i = 0; i < queue.elements; i++)
{
et= ((Event_queue_element*)queue_element(&queue, i));
DBUG_PRINT("info",("et=0x%lx db=%s name=%s",et, et->dbname.str, et->name.str));
- DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
+ DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu execs_so_far=%u"
" expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
TIME_to_ulonglong_datetime(&et->execute_at),
TIME_to_ulonglong_datetime(&et->starts),
TIME_to_ulonglong_datetime(&et->ends),
+ et->execution_count,
et->expression, sec_since_epoch_TIME(&et->execute_at), now,
(int)(sec_since_epoch_TIME(&et->execute_at) - now),
sec_since_epoch_TIME(&et->execute_at) <= now));
}
+ DBUG_VOID_RETURN;
#endif
}
@@ -838,7 +844,7 @@ Event_queue::dbug_dump_queue(time_t now)
`now` is compared against `execute_at` of the top element in the queue.
SYNOPSIS
- Event_queue::dbug_dump_queue()
+ Event_queue::get_top_for_execution_if_time()
thd [in] Thread
now [in] Current timestamp
job_data [out] The object to execute
@@ -873,7 +879,6 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
abstime->tv_sec= 0;
break;
}
- dbug_dump_queue(now);
Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
@@ -889,7 +894,11 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
- *job_data= new Event_job_data();
+ if (!(*job_data= new Event_job_data()))
+ {
+ ret= TRUE;
+ break;
+ }
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
*job_data)))
{
@@ -902,13 +911,18 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
- DBUG_PRINT("info", ("event's status is %d", top->status));
+ DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
+
+ (*job_data)->execution_count= top->execution_count;
top->update_timing_fields(thd);
if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
(top->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
+ sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
+ top->dbname.str, top->name.str,
+ top->dropped? "Dropping.":"");
if (top->dropped)
top->drop(thd);
delete top;
@@ -916,6 +930,8 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
}
else
queue_replaced(&queue);
+
+ dbug_dump_queue(now);
} while (0);
UNLOCK_QUEUE_DATA();
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index 0c39c1a512b..6e3a7ec2be8 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -28,9 +28,10 @@
#define SCHED_FUNC "<unknown>"
#endif
-#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
-#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
-#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
+#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
+#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
+#define COND_STATE_WAIT(mythd, abstime, msg) \
+ cond_wait(mythd, abstime, msg, SCHED_FUNC, __LINE__)
extern pthread_attr_t connection_attrib;
@@ -140,7 +141,7 @@ deinit_event_thread(THD *thd)
thd->proc_info= "Clearing";
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net);
- DBUG_PRINT("exit", ("Scheduler thread finishing"));
+ DBUG_PRINT("exit", ("Event thread finishing"));
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
@@ -262,9 +263,11 @@ event_worker_thread(void *arg)
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
"THD=0x%lx", time(NULL), thd));
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. "
+ "Execution %u",
event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id);
+ event->definer.str, thd->thread_id,
+ event->execution_count);
thd->enable_slow_log= TRUE;
@@ -272,9 +275,8 @@ event_worker_thread(void *arg)
evex_print_warnings(thd, event);
- sql_print_information("SCHEDULER: [%s.%s of %s] executed "
- " in thread thread %lu. RetCode=%d",
- event->dbname.str, event->name.str,
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
+ "RetCode=%d", event->dbname.str, event->name.str,
event->definer.str, thd->thread_id, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
@@ -456,7 +458,7 @@ Event_scheduler::run(THD *thd)
thd->end_time();
/* Gets a minimized version */
if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
- &job_data, &abstime))
+ &job_data, &abstime))
{
sql_print_information("SCHEDULER: Serious error during getting next"
" event to execute. Stopping.");
@@ -469,31 +471,22 @@ Event_scheduler::run(THD *thd)
if (!job_data && !abstime.tv_sec)
{
DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
- thd->enter_cond(&COND_state, &LOCK_scheduler_state,
- "Waiting on empty queue");
- COND_STATE_WAIT(NULL);
- thd->exit_cond("");
+ COND_STATE_WAIT(thd, NULL, "Waiting on empty queue");
DBUG_PRINT("info", ("Woke up. Got COND_state"));
- LOCK_DATA();
}
else if (abstime.tv_sec)
{
- DBUG_PRINT("info", ("Have to sleep some time %u till",
+ DBUG_PRINT("info", ("Have to sleep some time %u s. till %u",
abstime.tv_sec - thd->query_start(), abstime.tv_sec));
- thd->enter_cond(&COND_state, &LOCK_scheduler_state,
- "Waiting for next activation");
- COND_STATE_WAIT(&abstime);
+ COND_STATE_WAIT(thd, &abstime, "Waiting for next activation");
/*
If we get signal we should recalculate the whether it's the right time
because there could be :
1. Spurious wake-up
2. The top of the queue was changed (new one becase of create/update)
*/
- /* This will do implicit UNLOCK_DATA() */
- thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
- LOCK_DATA();
}
else
{
@@ -610,7 +603,7 @@ Event_scheduler::stop()
"workers count=%d", scheduler_states_names[state].str,
workers_count()));
/* thd could be 0x0, when shutting down */
- COND_STATE_WAIT(NULL);
+ COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop");
} while (state == STOPPING);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
@@ -720,29 +713,43 @@ Event_scheduler::unlock_data(const char *func, uint line)
SYNOPSIS
Event_scheduler::cond_wait()
- cond Conditional to wait for
- mutex Mutex of the conditional
-
- RETURN VALUE
- Error code of pthread_cond_wait()
+ thd Thread (Could be NULL during shutdown procedure)
+ abstime If not null then call pthread_cond_timedwait()
+ func Which function is requesting cond_wait
+ line On which line cond_wait is requested
*/
void
-Event_scheduler::cond_wait(struct timespec *abstime, const char *func,
- uint line)
+Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
+ const char *func, uint line)
{
+ DBUG_ENTER("Event_scheduler::cond_wait");
waiting_on_cond= TRUE;
mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
+ if (thd)
+ thd->enter_cond(&COND_state, &LOCK_scheduler_state, msg);
+
+ DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
if (!abstime)
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
else
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
+ if (thd)
+ {
+ /*
+ This will free the lock so we need to relock. Not the best thing to
+ do but we need to obey cond_wait()
+ */
+ thd->exit_cond("");
+ LOCK_DATA();
+ }
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE;
waiting_on_cond= FALSE;
+ DBUG_VOID_RETURN;
}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index b5c3dae49f8..cc6d0f7a20a 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -95,7 +95,8 @@ private:
unlock_data(const char *func, uint line);
void
- cond_wait(struct timespec *abstime, const char *func, uint line);
+ cond_wait(THD *thd, struct timespec *abstime, const char* msg,
+ const char *func, uint line);
pthread_mutex_t LOCK_scheduler_state;
diff --git a/sql/events.cc b/sql/events.cc
index 94ef8faa83d..f75a362f947 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -622,7 +622,7 @@ Events::init_mutexes()
event_queue= new Event_queue;
event_queue->init_mutexes();
- scheduler= new Event_scheduler();
+ scheduler= new Event_scheduler;
scheduler->init_mutexes();
}
@@ -642,6 +642,7 @@ Events::destroy_mutexes()
delete scheduler;
delete db_repository;
+ delete event_queue;
pthread_mutex_destroy(&LOCK_event_metadata);
}