summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc173
1 files changed, 124 insertions, 49 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index ba12b732726..5316bae8f4a 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -18,7 +18,6 @@
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
-#include "event_scheduler.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
@@ -87,6 +86,7 @@ Event_queue::Event_queue()
{
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
mutex_last_attempted_lock_in_func= "";
+ set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
}
@@ -135,8 +135,7 @@ Event_queue::deinit_mutexes()
*/
bool
-Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
- Event_scheduler *sched)
+Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
{
pthread_t th;
bool res;
@@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
LOCK_QUEUE_DATA();
db_repository= db_repo;
- scheduler= sched;
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
0 /*max_on_top*/, event_queue_element_compare_q,
@@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
dbug_dump_queue(thd->query_start());
+ pthread_cond_broadcast(&COND_queue_state);
UNLOCK_QUEUE_DATA();
-
- notify_observers();
}
DBUG_RETURN(res);
@@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
{
DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
queue_insert_safe(&queue, (byte *) new_element);
+ pthread_cond_broadcast(&COND_queue_state);
}
+
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
- if (new_element)
- notify_observers();
-
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
@@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
i++;
}
/*
- We don't call notify_observers() . If we remove the top event:
+ We don't call pthread_cond_broadcast(&COND_queue_state);
+ If we remove the top event:
1. The queue is empty. The scheduler will wake up at some time and
realize that the queue is empty. If create_event() comes inbetween
it will signal the scheduler
@@ -422,24 +419,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
/*
- Signals the observers (the main scheduler thread) that the
- state of the queue has been changed.
-
- SYNOPSIS
- Event_queue::notify_observers()
-*/
-
-void
-Event_queue::notify_observers()
-{
- DBUG_ENTER("Event_queue::notify_observers");
- DBUG_PRINT("info", ("Signalling change of the queue"));
- scheduler->queue_changed();
- DBUG_VOID_RETURN;
-}
-
-
-/*
Searches for an event in the queue
SYNOPSIS
@@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now)
#endif
}
+static const char *queue_empty_msg= "Waiting on empty queue";
+static const char *queue_wait_msg= "Waiting for next activation";
/*
Checks whether the top of the queue is elligible for execution and
@@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now)
*/
bool
-Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
- Event_job_data **job_data,
- struct timespec *abstime)
+Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
{
bool ret= FALSE;
struct timespec top_time;
+ struct timespec *abstime;
*job_data= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
- DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
- abstime->tv_nsec= 0;
+
+ top_time.tv_nsec= 0;
LOCK_QUEUE_DATA();
- do {
+ for (;;)
+ {
int res;
- if (!queue.elements)
- {
- abstime->tv_sec= 0;
- break;
- }
+ Event_queue_element *top= NULL;
- Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
+ thd->end_time();
+ time_t now= thd->query_start();
+ abstime= NULL;
- top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
+ if (queue.elements)
+ {
+ top= ((Event_queue_element*) queue_element(&queue, 0));
+ top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
- if (top_time.tv_sec > now)
+ abstime= &top_time;
+ }
+
+ if (!abstime || abstime->tv_sec > now)
{
- abstime->tv_sec= top_time.tv_sec;
- DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
- abstime->tv_sec));
- break;
+ const char *msg;
+ if (abstime)
+ {
+ next_activation_at= top->execute_at;
+ msg= queue_wait_msg;
+ }
+ else
+ {
+ set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
+ msg= queue_wait_msg;
+ }
+
+ cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__);
+ if (thd->killed)
+ {
+ DBUG_PRINT("info", ("thd->killed=%d", thd->killed));
+ goto end;
+ }
+ /*
+ The queue could have been emptied. Therefore it's safe to start from
+ the beginning. Moreover, this way we will get also the new top, if
+ the element at the top has been changed.
+ */
+ continue;
}
DBUG_PRINT("info", ("Ready for execution"));
- abstime->tv_sec= 0;
if (!(*job_data= new Event_job_data()))
{
ret= TRUE;
@@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
*job_data)))
{
+ DBUG_PRINT("error", ("Got %d from load_named_event", res));
delete *job_data;
*job_data= NULL;
ret= TRUE;
@@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
queue_replaced(&queue);
dbug_dump_queue(now);
- } while (0);
+ break;
+ }
+end:
UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
- ret, *job_data, abstime->tv_sec));
+ ret, *job_data, abstime? abstime->tv_sec:0));
if (*job_data)
DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
@@ -865,6 +872,52 @@ Event_queue::unlock_data(const char *func, uint line)
/*
+ Wrapper for pthread_cond_wait/timedwait
+
+ SYNOPSIS
+ Event_queue::cond_wait()
+ thd Thread (Could be NULL during shutdown procedure)
+ msg Message for thd->proc_info
+ abstime If not null then call pthread_cond_timedwait()
+ func Which function is requesting cond_wait
+ line On which line cond_wait is requested
+*/
+
+void
+Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
+ const char *func, uint line)
+{
+ DBUG_ENTER("Event_queue::cond_wait");
+ waiting_on_cond= TRUE;
+ mutex_last_unlocked_at_line= line;
+ mutex_queue_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+
+ thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg);
+
+ DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
+ if (!abstime)
+ pthread_cond_wait(&COND_queue_state, &LOCK_event_queue);
+ else
+ pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime);
+
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_queue_data_locked= TRUE;
+ waiting_on_cond= FALSE;
+
+ /*
+ This will free the lock so we need to relock. Not the best thing to
+ do but we need to obey cond_wait()
+ */
+ thd->exit_cond("");
+ LOCK_QUEUE_DATA();
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
Dumps the internal status of the queue
SYNOPSIS
@@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd)
protocol->store(&tmp_string);
ret= protocol->write();
+ /* waiting on */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs);
+ int_string.set((longlong) waiting_on_cond, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("next activation at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(),
+ "%4d-%02d-%02d %02d:%02d:%02d",
+ next_activation_at.year,
+ next_activation_at.month,
+ next_activation_at.day,
+ next_activation_at.hour,
+ next_activation_at.minute,
+ next_activation_at.second
+ ));
+ protocol->store(&tmp_string);
+ ret= protocol->write();
+
#endif
DBUG_RETURN(FALSE);
}