summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc249
1 files changed, 84 insertions, 165 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 44920b29c16..b42372ba6bd 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -90,34 +90,29 @@ Event_queue::Event_queue()
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
+ OP_ALREADY_EXISTS Event already in the queue
*/
int
-Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
+Event_queue::create_event(THD *thd, Event_parse_data *et)
{
int res;
Event_timed *et_new;
DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue));
+ res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new);
LOCK_QUEUE_DATA();
- if (check_existence && find_event(et->dbname, et->name, FALSE))
- {
- res= OP_ALREADY_EXISTS;
- goto end;
- }
-
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
+ if (!res)
{
DBUG_PRINT("info", ("new event in the queue %p", et_new));
queue_insert_safe(&queue, (byte *) et_new);
- on_queue_change();
+ notify_observers();
}
else if (res == OP_DISABLED_EVENT)
res= OP_OK;
-end:
UNLOCK_QUEUE_DATA();
+
DBUG_RETURN(res);
}
@@ -129,104 +124,54 @@ end:
Event_queue::update_event()
thd Thread
et The event to replace(add) into the queue
- new_schema New schema
- new_name New name
+ new_schema New schema, in case of RENAME TO
+ new_name New name, in case of RENAME TO
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
- OP_ALREADY_EXISTS Event already in the queue
*/
int
Event_queue::update_event(THD *thd, Event_parse_data *et,
- LEX_STRING *new_schema,
- LEX_STRING *new_name)
+ LEX_STRING *new_schema, LEX_STRING *new_name)
{
- int res= OP_OK;
- Event_timed *et_old, *et_new= NULL;
- LEX_STRING old_schema, old_name;
-
- LINT_INIT(old_schema.str);
- LINT_INIT(old_schema.length);
- LINT_INIT(old_name.str);
- LINT_INIT(old_name.length);
+ int res;
+ Event_timed *et_old= NULL, *et_new= NULL;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
thd, et, et->dbname.str, et->name.str, &LOCK_event_queue));
+ res= db_repository->
+ load_named_event_timed(thd, new_schema?*new_schema:et->dbname,
+ new_name? *new_name:et->name, &et_new);
+
+ if (res && res != OP_DISABLED_EVENT)
+ goto end;
+
LOCK_QUEUE_DATA();
if (!(et_old= find_event(et->dbname, et->name, TRUE)))
+ {
DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
et->dbname.str, et->name.str));
-
- if (new_schema && new_name)
- {
- old_schema= et->dbname;
- old_name= et->name;
- et->dbname= *new_schema;
- et->name= *new_name;
}
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
+ if (!res)
{
DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
queue_insert_safe(&queue, (byte *) et_new);
- on_queue_change();
}
else if (res == OP_DISABLED_EVENT)
res= OP_OK;
-
- if (new_schema && new_name)
- {
- et->dbname= old_schema;
- et->name= old_name;
- }
- DBUG_PRINT("info", ("res=%d", res));
UNLOCK_QUEUE_DATA();
- /*
- Andrey: Is this comment still truthful ???
-
- We don't move this code above because a potential kill_thread will call
- THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
- which is LOCK_event_queue on which the COND_new_work in ::run() locks.
- Hence, we try to acquire a lock which we have already acquired and we run
- into an assert. Holding LOCK_event_queue however is not needed because
- we don't touch any invariant of the scheduler anymore. ::drop_event() does
- the same.
- */
- if (et_old)
- {
- switch (et_old->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
+ notify_observers();
+
+ if (et_old)
+ delete et_old;
+end:
+ DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
}
@@ -256,40 +201,13 @@ Event_queue::drop_event(THD *thd, sp_name *name)
LOCK_QUEUE_DATA();
if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
DBUG_PRINT("info", ("No such event found, probably DISABLED"));
-
UNLOCK_QUEUE_DATA();
-
- /* See comments in ::replace_event() why this is split in two parts. */
if (et_old)
- {
- switch ((res= et_old->kill_thread(thd))) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- sql_print_error("SCHEDULER: Got unexpected error %d", res);
- DBUG_ASSERT(0);
- }
- }
+ delete et_old;
+ /*
+ We don't signal here because the scheduler will catch the change
+ next time it wakes up.
+ */
DBUG_RETURN(FALSE);
}
@@ -361,7 +279,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
- uint i= 0, dropped= 0;
+ uint i= 0;
while (i < queue.elements)
{
Event_timed *et= (Event_timed *) queue_element(&queue, i);
@@ -375,32 +293,22 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
counter and the (i < queue.elements) condition is ok.
*/
queue_remove(&queue, i);
-
- /* See replace_event() */
- switch (et->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et->flags |= EVENT_FREE_WHEN_FINISHED;
- ++dropped;
- break;
- case 0:
- delete et;
- ++dropped;
- break;
- default:
- DBUG_ASSERT(0);
- }
+ delete et;
}
else
i++;
}
- DBUG_PRINT("info", ("Dropped %lu", dropped));
/*
- Don't send COND_new_work because no need to wake up the scheduler thread.
- When it wakes next time up it will recalculate how much more it should
- sleep if the top of the queue has been changed by this method.
+ We don't call notify_observers() . If we remove the top event:
+ 1. The queue is empty. The scheduler will wake up at some time and realize
+ that the queue is empty. If create_event() comes inbetween it will
+ signal the scheduler
+ 2. The queue is not empty, but the next event after the previous top, won't
+ be executed any time sooner than the element we removed. Hence, we may
+ not notify the scheduler and it will realize the change when it
+ wakes up from timedwait.
*/
-
+
DBUG_VOID_RETURN;
}
@@ -418,16 +326,14 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
>=0 Number of dropped events
*/
-int
+void
Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
{
- int ret;
DBUG_ENTER("Event_queue::drop_schema_events");
LOCK_QUEUE_DATA();
drop_matching_events(thd, schema, event_timed_db_equal);
UNLOCK_QUEUE_DATA();
-
- DBUG_RETURN(ret);
+ DBUG_VOID_RETURN;
}
@@ -744,13 +650,13 @@ Event_queue::deinit_mutexes()
its state.
SYNOPSIS
- Event_queue::on_queue_change()
+ Event_queue::notify_observers()
*/
void
-Event_queue::on_queue_change()
+Event_queue::notify_observers()
{
- DBUG_ENTER("Event_queue::on_queue_change");
+ DBUG_ENTER("Event_queue::notify_observers");
DBUG_PRINT("info", ("Signalling change of the queue"));
scheduler->queue_changed();
DBUG_VOID_RETURN;
@@ -761,7 +667,7 @@ Event_queue::on_queue_change()
The implementation of full-fledged initialization.
SYNOPSIS
- Event_scheduler::init()
+ Event_queue::init()
RETURN VALUE
FALSE OK
@@ -769,15 +675,16 @@ Event_queue::on_queue_change()
*/
bool
-Event_queue::init(Event_db_repository *db_repo)
+Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
{
int i= 0;
bool ret= FALSE;
- DBUG_ENTER("Event_queue::init");
+ DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this=%p", this));
LOCK_QUEUE_DATA();
db_repository= db_repo;
+ scheduler= sched;
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_timed_compare_q, NULL, 30 /*auto_extent*/))
@@ -803,9 +710,9 @@ end:
void
-Event_queue::deinit()
+Event_queue::deinit_queue()
{
- DBUG_ENTER("Event_queue::deinit");
+ DBUG_ENTER("Event_queue::deinit_queue");
LOCK_QUEUE_DATA();
empty_queue();
@@ -833,6 +740,8 @@ void
Event_queue::empty_queue()
{
uint i;
+ DBUG_ENTER("Event_queue::empty_queue");
+ DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
/* empty the queue */
for (i= 0; i < events_count_no_lock(); ++i)
{
@@ -840,6 +749,7 @@ Event_queue::empty_queue()
delete et;
}
resize_queue(&queue, 0);
+ DBUG_VOID_RETURN;
}
@@ -864,6 +774,29 @@ Event_queue::top_changed()
}
+inline void
+Event_queue::dbug_dump_queue(time_t now)
+{
+#ifndef DBUG_OFF
+ Event_timed *et;
+ uint i;
+ DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
+ for (i = 0; i < queue.elements; i++)
+ {
+ et= ((Event_timed*)queue_element(&queue, i));
+ DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
+ DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
+ " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
+ TIME_to_ulonglong_datetime(&et->execute_at),
+ TIME_to_ulonglong_datetime(&et->starts),
+ TIME_to_ulonglong_datetime(&et->ends),
+ et->expression, sec_since_epoch_TIME(&et->execute_at), now,
+ (int)(sec_since_epoch_TIME(&et->execute_at) - now),
+ sec_since_epoch_TIME(&et->execute_at) <= now));
+ }
+#endif
+}
+
Event_timed *
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
struct timespec *abstime)
@@ -876,36 +809,22 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
LOCK_QUEUE_DATA();
do {
int res;
- Event_timed *et= NULL;
if (!queue.elements)
{
abstime->tv_sec= 0;
break;
}
- int i;
- DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
- for (i = 0; i < queue.elements; i++)
- {
- et= ((Event_timed*)queue_element(&queue, i));
- DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
- DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
- " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
- TIME_to_ulonglong_datetime(&et->execute_at),
- TIME_to_ulonglong_datetime(&et->starts),
- TIME_to_ulonglong_datetime(&et->ends),
- et->expression, sec_since_epoch_TIME(&et->execute_at), now,
- (int)(sec_since_epoch_TIME(&et->execute_at) - now),
- sec_since_epoch_TIME(&et->execute_at) <= now));
- }
- et= ((Event_timed*)queue_element(&queue, 0));
+ dbug_dump_queue(now);
+
+ Event_timed *et= ((Event_timed*)queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
if (top_time.tv_sec <= now)
{
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
- if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
- &et_new)))
+ if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name,
+ &et_new)))
{
DBUG_ASSERT(0);
break;