summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-11 18:28:15 +0200
committerunknown <andrey@lmy004.>2006-07-11 18:28:15 +0200
commit42a8e2c9421854710679a0f6c3ceef6c0777ded4 (patch)
treef2fc5508efd161a1ef60e481f22fe90017197fa1 /sql/event_queue.cc
parent084f74426b5f19b47984ef298309e9a4015940c3 (diff)
downloadmariadb-git-42a8e2c9421854710679a0f6c3ceef6c0777ded4.tar.gz
WL#3337 (Event scheduler new architecture)
More small fixes to the API : use LEX_STRING instead of LEX_STRING* and if error then return bool(true) instead of error code. Merged functions. Reduced usage of sp_name. Fixed a lot of function documentation errors. Added function documentation wherever needed. Removed some unused defines and error codes. Next to come is batch rename of Event_scheduler_ng to Event_scheduler. mysql-test/r/events.result: update result mysql-test/r/events_logs_tests.result: update result mysql-test/t/events.test: more test coverage mysql-test/t/events_logs_tests.test: fix test sql/event_data_objects.cc: Cosmetics. Fix function documentation whenever needed. Move Event_job_data::compile() next to Event_job_data::execute() sql/event_data_objects.h: Remove unneeded error codes and defines Move function declarations at the end of the header sql/event_db_repository.cc: Fix function documentation. Event_db_repository::update_event() now uses LEX_STRING *-s instead of sp_name . Lower coupling. sql/event_db_repository.h: Event_db_repository::update_event() now uses LEX_STRING *-s instead of sp_name . Lower coupling. find_event -> find_named_event find_event_by_name is not used externally, merge with load_named_event() sql/event_queue.cc: LEX_STRING* to LEX_STRING Fix comments. Fix and add function documentation. Remove Event_queue::events_count() as it is unused Change get_top_for_execution_if_time() to return status code as return value and the object is in out parameter. sql/event_queue.h: LEX_STRING* to LEX_STRING Fix comments. Fix and add function documentation. Remove Event_queue::events_count() as it is unused Change get_top_for_execution_if_time() to return status code as return value and the object is in out parameter. Try to detect also lock attemptions for deadlocks. sql/event_scheduler_ng.cc: Always execute on thd->mem_root Fix according to changed API of Event_queue::get_top_for_execution_if_time() sql/events.cc: Fix function documentation. Fix code after API changes of internal Event module classes. sql/events.h: sp_name -> LEX_STRINGs sql/sql_parse.cc: Fix according to changed API of Events::show_create_event() sql/sql_yacc.yy: Don't pass NULL as third parameter to sp_head::init_strings()
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc371
1 files changed, 209 insertions, 162 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 0fbde5d8910..63dee303fc2 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -15,13 +15,14 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
-#include "events.h"
-#include "event_scheduler_ng.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
-#include "sp_head.h"
+#include "event_scheduler_ng.h"
+
+#define EVENT_QUEUE_INITIAL_SIZE 30
+#define EVENT_QUEUE_EXTENT 30
#ifdef __GNUC__
#if __GNUC__ >= 2
@@ -36,21 +37,20 @@
/*
- Compares the execute_at members of 2 Event_queue_element instances.
+ Compares the execute_at members of two Event_queue_element instances.
Used as callback for the prioritized queue when shifting
elements inside.
SYNOPSIS
event_queue_element_data_compare_q()
-
- vptr - not used (set it to NULL)
- a - first Event_queue_element object
- b - second Event_queue_element object
+ vptr Not used (set it to NULL)
+ a First Event_queue_element object
+ b Second Event_queue_element object
RETURN VALUE
- -1 - a->execute_at < b->execute_at
- 0 - a->execute_at == b->execute_at
- 1 - a->execute_at > b->execute_at
+ -1 a->execute_at < b->execute_at
+ 0 a->execute_at == b->execute_at
+ 1 a->execute_at > b->execute_at
NOTES
execute_at.second_part is not considered during comparison
@@ -73,9 +73,13 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
Event_queue::Event_queue()
{
- mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
- mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
- mutex_queue_data_locked= FALSE;
+ mutex_last_unlocked_at_line= mutex_last_locked_at_line=
+ mutex_last_attempted_lock_at_line= 0;
+
+ mutex_last_unlocked_in_func= mutex_last_locked_in_func=
+ mutex_last_attempted_lock_in_func= "";
+
+ mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
}
@@ -108,24 +112,6 @@ Event_queue::deinit_mutexes()
/*
- Signals the main scheduler thread that the queue has changed
- its state.
-
- SYNOPSIS
- Event_queue::notify_observers()
-*/
-
-void
-Event_queue::notify_observers()
-{
- DBUG_ENTER("Event_queue::notify_observers");
- DBUG_PRINT("info", ("Signalling change of the queue"));
- scheduler->queue_changed();
- DBUG_VOID_RETURN;
-}
-
-
-/*
Inits the queue
SYNOPSIS
@@ -148,8 +134,9 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
db_repository= db_repo;
scheduler= sched;
- if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
- event_queue_element_compare_q, NULL, 30 /*auto_extent*/))
+ if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
+ 0 /*smallest_on_top*/, event_queue_element_compare_q,
+ NULL, EVENT_QUEUE_EXTENT))
{
sql_print_error("SCHEDULER: Can't initialize the execution queue");
ret= TRUE;
@@ -172,7 +159,8 @@ end:
/*
- Deinits the queue
+ Deinits the queue. Remove all elements from it and destroys them
+ too.
SYNOPSIS
Event_queue::deinit_queue()
@@ -193,12 +181,12 @@ Event_queue::deinit_queue()
/*
- Creates an event in the scheduler queue
+ Adds an event to the queue.
SYNOPSIS
Event_queue::create_event()
- et The event to add
- check_existence Whether to check if already loaded.
+ dbname The schema of the new event
+ name The name of the new event
RETURN VALUE
OP_OK OK or scheduler not working
@@ -209,21 +197,21 @@ int
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
- Event_queue_element *element_new;
+ Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
- element_new= new Event_queue_element();
- res= db_repository->load_named_event(thd, dbname, name, element_new);
- if (res || element_new->status == Event_queue_element::DISABLED)
- delete element_new;
+ new_element= new Event_queue_element();
+ res= db_repository->load_named_event(thd, dbname, name, new_element);
+ if (res || new_element->status == Event_queue_element::DISABLED)
+ delete new_element;
else
{
- element_new->compute_next_execution_time();
+ new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new));
- queue_insert_safe(&queue, (byte *) element_new);
+ DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
+ queue_insert_safe(&queue, (byte *) new_element);
UNLOCK_QUEUE_DATA();
notify_observers();
@@ -254,53 +242,54 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
{
int res;
- Event_queue_element *element_old= NULL,
- *element_new;
+ Event_queue_element *old_element= NULL,
+ *new_element;
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
- element_new= new Event_queue_element();
+ new_element= new Event_queue_element();
res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
- new_name? *new_name:name, element_new);
+ new_name? *new_name:name, new_element);
if (res)
{
- delete element_new;
+ delete new_element;
goto end;
}
- else if (element_new->status == Event_queue_element::DISABLED)
+ else if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
Destroy the object but don't skip to end: because we may have to remove
object from the cache.
*/
- delete element_new;
- element_new= NULL;
+ delete new_element;
+ new_element= NULL;
}
else
- element_new->compute_next_execution_time();
+ new_element->compute_next_execution_time();
LOCK_QUEUE_DATA();
- if (!(element_old= find_event(dbname, name, TRUE)))
+ if (!(old_element= find_n_remove_event(dbname, name)))
{
DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
dbname.str, name.str));
}
/* If not disabled event */
- if (element_new)
+ if (new_element)
{
DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
- element_new, element_old));
- queue_insert_safe(&queue, (byte *) element_new);
+ new_element, old_element));
+ queue_insert_safe(&queue, (byte *) new_element);
}
UNLOCK_QUEUE_DATA();
- notify_observers();
+ if (new_element)
+ notify_observers();
- if (element_old)
- delete element_old;
+ if (old_element)
+ delete old_element;
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
@@ -326,7 +315,7 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA();
- element= find_event(dbname, name, TRUE);
+ element= find_n_remove_event(dbname, name);
UNLOCK_QUEUE_DATA();
if (element)
@@ -344,48 +333,6 @@ Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
/*
- Searches for an event in the queue
-
- SYNOPSIS
- Event_queue::find_event()
- db The schema of the event to find
- name The event to find
- remove_from_q If found whether to remove from the Q
-
- RETURN VALUE
- NULL Not found
- otherwise Address
-
- NOTE
- The caller should do the locking also the caller is responsible for
- actual signalling in case an event is removed from the queue
- (signalling COND_new_work for instance).
-*/
-
-Event_queue_element *
-Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
-{
- uint i;
- DBUG_ENTER("Event_queue::find_event");
-
- for (i= 0; i < queue.elements; ++i)
- {
- Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
- DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
- et->dbname.str, et->name.str));
- if (event_basic_identifier_equal(db, name, et))
- {
- if (remove_from_q)
- queue_remove(&queue, i);
- DBUG_RETURN(et);
- }
- }
-
- DBUG_RETURN(NULL);
-}
-
-
-/*
Drops all events from the in-memory queue and disk that match
certain pattern evaluated by a comparator function
@@ -404,7 +351,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
- bool (*comparator)(LEX_STRING *, Event_basic *))
+ bool (*comparator)(LEX_STRING, Event_basic *))
{
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
@@ -414,7 +361,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
{
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
- if (comparator(&pattern, et))
+ if (comparator(pattern, et))
{
/*
The queue is ordered. If we remove an element, then all elements after
@@ -468,25 +415,59 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
/*
- Returns the number of elements in the queue
+ Signals the observers (the main scheduler thread) that the
+ state of the queue has been changed.
SYNOPSIS
- Event_queue::events_count()
+ Event_queue::notify_observers()
+*/
+
+void
+Event_queue::notify_observers()
+{
+ DBUG_ENTER("Event_queue::notify_observers");
+ DBUG_PRINT("info", ("Signalling change of the queue"));
+ scheduler->queue_changed();
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Searches for an event in the queue
+
+ SYNOPSIS
+ Event_queue::find_n_remove_event()
+ db The schema of the event to find
+ name The event to find
RETURN VALUE
- Number of Event_queue_element objects in the queue
+ NULL Not found
+ otherwise Address
+
+ NOTE
+ The caller should do the locking also the caller is responsible for
+ actual signalling in case an event is removed from the queue.
*/
-uint
-Event_queue::events_count()
+Event_queue_element *
+Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
{
- uint n;
- DBUG_ENTER("Event_scheduler::events_count");
- LOCK_QUEUE_DATA();
- n= queue.elements;
- UNLOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("n=%u", n));
- DBUG_RETURN(n);
+ uint i;
+ DBUG_ENTER("Event_queue::find_n_remove_event");
+
+ for (i= 0; i < queue.elements; ++i)
+ {
+ Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
+ DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
+ et->dbname.str, et->name.str));
+ if (event_basic_identifier_equal(db, name, et))
+ {
+ queue_remove(&queue, i);
+ DBUG_RETURN(et);
+ }
+ }
+
+ DBUG_RETURN(NULL);
}
@@ -620,6 +601,11 @@ end:
SYNOPSIS
Event_queue::check_system_tables()
+ thd Thread
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error
*/
bool
@@ -738,6 +724,14 @@ Event_queue::empty_queue()
}
+/*
+ Dumps the queue to the trace log.
+
+ SYNOPSIS
+ Event_queue::dbug_dump_queue()
+ now Current timestamp
+*/
+
inline void
Event_queue::dbug_dump_queue(time_t now)
{
@@ -761,12 +755,37 @@ Event_queue::dbug_dump_queue(time_t now)
#endif
}
-Event_job_data *
+
+/*
+ Checks whether the top of the queue is elligible for execution and
+ returns an Event_job_data instance in case it should be executed.
+ `now` is compared against `execute_at` of the top element in the queue.
+
+ SYNOPSIS
+ Event_queue::dbug_dump_queue()
+ thd [in] Thread
+ now [in] Current timestamp
+ job_data [out] The object to execute
+ abstime [out] Time to sleep
+
+ RETURN VALUE
+ FALSE No error. If *job_data==NULL then top not elligible for execution.
+ Could be that there is no top. If abstime->tv_sec is set to value
+ greater than zero then use abstime with pthread_cond_timedwait().
+ If abstime->tv_sec is zero then sleep with pthread_cond_wait().
+ abstime->tv_nsec is always zero.
+ TRUE Error
+
+*/
+
+bool
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
+ Event_job_data **job_data,
struct timespec *abstime)
{
+ bool ret= FALSE;
struct timespec top_time;
- Event_job_data *et_new= NULL;
+ *job_data= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
abstime->tv_nsec= 0;
@@ -780,56 +799,58 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
}
dbug_dump_queue(now);
- Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0));
- top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
+ Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
- if (top_time.tv_sec <= now)
- {
- DBUG_PRINT("info", ("Ready for execution"));
- abstime->tv_sec= 0;
- et_new= new Event_job_data();
- if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
- et_new)))
- {
- delete et_new;
- et_new= NULL;
- DBUG_ASSERT(0);
- break;
- }
-
- et->mark_last_executed(thd);
- if (et->compute_next_execution_time())
- et->status= Event_queue_element::DISABLED;
- DBUG_PRINT("info", ("event's status is %d", et->status));
-
- et->update_timing_fields(thd);
- if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
- (et->status == Event_queue_element::DISABLED))
- {
- DBUG_PRINT("info", ("removing from the queue"));
- if (et->dropped)
- et->drop(thd);
- delete et;
- queue_remove(&queue, 0);
- }
- else
- queue_replaced(&queue);
- }
- else
+ top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
+
+ if (top_time.tv_sec > now)
{
abstime->tv_sec= top_time.tv_sec;
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
abstime->tv_sec));
+ break;
}
+
+ DBUG_PRINT("info", ("Ready for execution"));
+ abstime->tv_sec= 0;
+ *job_data= new Event_job_data();
+ if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
+ *job_data)))
+ {
+ delete *job_data;
+ *job_data= NULL;
+ ret= TRUE;
+ break;
+ }
+
+ top->mark_last_executed(thd);
+ if (top->compute_next_execution_time())
+ top->status= Event_queue_element::DISABLED;
+ DBUG_PRINT("info", ("event's status is %d", top->status));
+
+ top->update_timing_fields(thd);
+ if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
+ (top->status == Event_queue_element::DISABLED))
+ {
+ DBUG_PRINT("info", ("removing from the queue"));
+ if (top->dropped)
+ top->drop(thd);
+ delete top;
+ queue_remove(&queue, 0);
+ }
+ else
+ queue_replaced(&queue);
} while (0);
UNLOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new,
- abstime->tv_sec));
- if (et_new)
- DBUG_PRINT("info", ("db=%s name=%s definer=%s",
- et_new->dbname.str, et_new->name.str, et_new->definer.str));
- DBUG_RETURN(et_new);
+ DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
+ ret, *job_data, abstime->tv_sec));
+
+ if (*job_data)
+ DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
+ (*job_data)->name.str, (*job_data)->definer.str));
+
+ DBUG_RETURN(ret);
}
@@ -848,10 +869,18 @@ Event_queue::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_queue::lock_data");
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
+ mutex_last_attempted_lock_in_func= func;
+ mutex_last_attempted_lock_at_line= line;
+ mutex_queue_data_attempting_lock= TRUE;
pthread_mutex_lock(&LOCK_event_queue);
+ mutex_last_attempted_lock_in_func= "";
+ mutex_last_attempted_lock_at_line= 0;
+ mutex_queue_data_attempting_lock= FALSE;
+
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
mutex_queue_data_locked= TRUE;
+
DBUG_VOID_RETURN;
}
@@ -921,6 +950,13 @@ Event_queue::dump_internal_status(THD *thd)
protocol->store(&int_string);
ret= protocol->write();
+ /* queue_data_attempting_lock */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue data attempting lock"), scs);
+ int_string.set((longlong) mutex_queue_data_attempting_lock, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+
/* last locked at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue last locked at"), scs);
@@ -940,6 +976,17 @@ Event_queue::dump_internal_status(THD *thd)
mutex_last_unlocked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
+
+ /* last attempted lock at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue last attempted lock at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_attempted_lock_in_func,
+ mutex_last_attempted_lock_at_line));
+ protocol->store(&tmp_string);
+ ret= protocol->write();
+
#endif
DBUG_RETURN(FALSE);
}