summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc169
1 files changed, 129 insertions, 40 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 32c5a076a62..44920b29c16 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -16,7 +16,7 @@
#include "mysql_priv.h"
#include "events.h"
-#include "event_scheduler.h"
+#include "event_scheduler_ng.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
@@ -35,10 +35,6 @@
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
-Event_scheduler*
-Event_queue::singleton= NULL;
-
-
/*
Compares the execute_at members of 2 Event_timed instances.
Used as callback for the prioritized queue when shifting
@@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
goto end;
}
- /* We need to load the event on scheduler_root */
if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new)))
{
+ DBUG_PRINT("info", ("new event in the queue %p", et_new));
queue_insert_safe(&queue, (byte *) et_new);
on_queue_change();
}
@@ -130,7 +126,7 @@ end:
Updates an event from the scheduler queue
SYNOPSIS
- Event_scheduler::update_event()
+ Event_queue::update_event()
thd Thread
et The event to replace(add) into the queue
new_schema New schema
@@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
et->dbname= *new_schema;
et->name= *new_name;
}
- /*
- We need to load the event (it's strings but on the object itself)
- on scheduler_root. et_new could be NULL :
- 1. Error occured
- 2. If the replace is DISABLED, we don't load it into the queue.
- */
+
if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new)))
{
+ DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
queue_insert_safe(&queue, (byte *) et_new);
on_queue_change();
}
@@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
/*
- Drops an event from the scheduler queue
+ Drops an event from the queue
SYNOPSIS
Event_queue::drop_event()
@@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name)
}
-
-
/*
- Searches for an event in the scheduler queue
+ Searches for an event in the queue
SYNOPSIS
Event_queue::find_event()
@@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
comparator The function to use for comparing
RETURN VALUE
- -1 Scheduler not working
>=0 Number of dropped events
NOTE
@@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
db The schema name
RETURN VALUE
- -1 Scheduler not working
>=0 Number of dropped events
*/
@@ -459,8 +447,7 @@ void
Event_queue::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_queue::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_event_queue, func, line));
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
pthread_mutex_lock(&LOCK_event_queue);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
@@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line)
void
Event_queue::unlock_data(const char *func, uint line)
{
- DBUG_ENTER("Event_queue::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- &LOCK_event_queue, func, line));
+ DBUG_ENTER("Event_queue::unlock_mutex");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
mutex_last_unlocked_at_line= line;
mutex_queue_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
@@ -510,7 +496,7 @@ Event_queue::events_count()
LOCK_QUEUE_DATA();
n= queue.elements;
UNLOCK_QUEUE_DATA();
-
+ DBUG_PRINT("info", ("n=%u", n));
DBUG_RETURN(n);
}
@@ -529,7 +515,7 @@ uint
Event_queue::events_count_no_lock()
{
uint n;
- DBUG_ENTER("Event_scheduler::events_count_no_lock");
+ DBUG_ENTER("Event_queue::events_count_no_lock");
n= queue.elements;
@@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd)
}
DBUG_PRINT("info", ("Loading event from row."));
- if ((ret= et->load_from_row(&scheduler_root, table)))
+ if ((ret= et->load_from_row(table)))
{
clean_the_queue= TRUE;
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
@@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd)
void
Event_queue::init_mutexes()
{
- pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
}
@@ -743,13 +729,13 @@ Event_queue::init_mutexes()
Destroys mutexes.
SYNOPSIS
- Event_queue::destroy_mutexes()
+ Event_queue::deinit_mutexes()
*/
void
-Event_queue::destroy_mutexes()
+Event_queue::deinit_mutexes()
{
- pthread_mutex_destroy(&singleton->LOCK_event_queue);
+ pthread_mutex_destroy(&LOCK_event_queue);
}
@@ -765,8 +751,8 @@ void
Event_queue::on_queue_change()
{
DBUG_ENTER("Event_queue::on_queue_change");
- DBUG_PRINT("info", ("Sending COND_new_work"));
- singleton->queue_changed();
+ DBUG_PRINT("info", ("Signalling change of the queue"));
+ scheduler->queue_changed();
DBUG_VOID_RETURN;
}
@@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo)
{
int i= 0;
bool ret= FALSE;
- DBUG_ENTER("Event_scheduler::init");
+ DBUG_ENTER("Event_queue::init");
DBUG_PRINT("enter", ("this=%p", this));
LOCK_QUEUE_DATA();
db_repository= db_repo;
- /* init memory root */
- init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_timed_compare_q, NULL, 30 /*auto_extent*/))
@@ -824,8 +808,8 @@ Event_queue::deinit()
DBUG_ENTER("Event_queue::deinit");
LOCK_QUEUE_DATA();
+ empty_queue();
delete_queue(&queue);
- free_root(&scheduler_root, MYF(0));
UNLOCK_QUEUE_DATA();
DBUG_VOID_RETURN;
@@ -835,7 +819,7 @@ Event_queue::deinit()
void
Event_queue::recalculate_queue(THD *thd)
{
- int i;
+ uint i;
for (i= 0; i < queue.elements; i++)
{
((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
@@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd)
void
Event_queue::empty_queue()
{
- int i;
+ uint i;
/* empty the queue */
for (i= 0; i < events_count_no_lock(); ++i)
{
Event_timed *et= (Event_timed *) queue_element(&queue, i);
- et->free_sp();
delete et;
}
resize_queue(&queue, 0);
}
+
+
+Event_timed*
+Event_queue::get_top()
+{
+ return (Event_timed *)queue_top(&queue);
+}
+
+
+void
+Event_queue::remove_top()
+{
+ queue_remove(&queue, 0);// 0 is top, internally 1
+}
+
+
+void
+Event_queue::top_changed()
+{
+ queue_replaced(&queue);
+}
+
+
+Event_timed *
+Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
+ struct timespec *abstime)
+{
+ struct timespec top_time;
+ Event_timed *et_new= NULL;
+ DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
+ DBUG_PRINT("enter", ("thd=%p now=%d", thd, now));
+ abstime->tv_nsec= 0;
+ LOCK_QUEUE_DATA();
+ do {
+ int res;
+ Event_timed *et= NULL;
+ if (!queue.elements)
+ {
+ abstime->tv_sec= 0;
+ break;
+ }
+ int i;
+ DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
+ for (i = 0; i < queue.elements; i++)
+ {
+ et= ((Event_timed*)queue_element(&queue, i));
+ DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
+ DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
+ " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
+ TIME_to_ulonglong_datetime(&et->execute_at),
+ TIME_to_ulonglong_datetime(&et->starts),
+ TIME_to_ulonglong_datetime(&et->ends),
+ et->expression, sec_since_epoch_TIME(&et->execute_at), now,
+ (int)(sec_since_epoch_TIME(&et->execute_at) - now),
+ sec_since_epoch_TIME(&et->execute_at) <= now));
+ }
+ et= ((Event_timed*)queue_element(&queue, 0));
+ top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
+
+ if (top_time.tv_sec <= now)
+ {
+ DBUG_PRINT("info", ("Ready for execution"));
+ abstime->tv_sec= 0;
+ if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
+ &et_new)))
+ {
+ DBUG_ASSERT(0);
+ break;
+ }
+
+ et->mark_last_executed(thd);
+ if (et->compute_next_execution_time())
+ et->status= Event_timed::DISABLED;
+ DBUG_PRINT("info", ("event's status is %d", et->status));
+
+ et->update_fields(thd);
+ if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
+ (et->status == Event_timed::DISABLED))
+ {
+ DBUG_PRINT("info", ("removing from the queue"));
+ if (et->dropped)
+ et->drop(thd);
+ delete et;
+ queue_remove(&queue, 0);
+ }
+ else
+ queue_replaced(&queue);
+ }
+ else
+ {
+ abstime->tv_sec= top_time.tv_sec;
+ DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
+ abstime->tv_sec));
+ }
+ } while (0);
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new,
+ abstime->tv_sec));
+ if (et_new)
+ DBUG_PRINT("info", ("db=%s name=%s definer=%s "
+ "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str,
+ et_new->definer.str,
+ TIME_to_ulonglong_datetime(&et_new->execute_at)));
+ DBUG_RETURN(et_new);
+}