summaryrefslogtreecommitdiff
path: root/sql/event_queue.cc
diff options
context:
space:
mode:
authorunknown <andrey@lmy004.>2006-07-10 13:44:43 +0200
committerunknown <andrey@lmy004.>2006-07-10 13:44:43 +0200
commit974eecc246db18e29e5ef06db6f48a0f4b3f7951 (patch)
tree692560cb267120b449f56b1df9db9a918e5c7477 /sql/event_queue.cc
parentb9a7fe2757d9040296311e96a9e2740416d181f2 (diff)
downloadmariadb-git-974eecc246db18e29e5ef06db6f48a0f4b3f7951.tar.gz
WL#3337 (Event scheduler new architecture)
This patch introduces specialized Event data objects Event_basic as parent. Event_queue_element used for queue storage Event_timed used for SHOW EVENTS/ I_S.EVENTS / SHOW CREATE EVENT Event_job_data using during execution. Methods were moved out of Event_timed to other classes. This patch also introduces Events::LOCK_event_metadata. This patch gives new implementation of Events::dump_internal_status(). Now both the Event_scheduler and Event_queue return information during their ::dump_internal_status(). Shortened a bit the runtime for executing events test cases. mysql-test/r/events.result: update results mysql-test/r/events_bugs.result: update results mysql-test/r/events_logs_tests.result: update results mysql-test/r/events_scheduling.result: update results mysql-test/t/events.test: update test make --sleep more appropriate . saving some time could mean failure on loaded boxes though :( add tests for previously uncovered branches. mysql-test/t/events_bugs.test: update test make --sleep more appropriate . saving some time could mean failure on loaded boxes though :( add tests for previously uncovered branches. mysql-test/t/events_logs_tests.test: make the test shorter by time mysql-test/t/events_scheduling.test: when selecting always use ORDER BY mysql-test/t/events_stress.test: sleep 2.5secs for shorter stress test sql/event_data_objects.cc: Event_timed is no more used during execution. Event_timed is no more used during in the memory queue. Event_timed is only used for SHOW CREATE EVENT/ I_S.EVENTS/ SHOW EVENTS Event_basic is the parent of almost all Event data objects. Event_basic -> Event_queue_element (used for the memory queue) -> Event_timed Event_basic -> Event_job_data (the object used for execution) Sql_alloc -> Event_parse_data (used during parsing) sql/event_data_objects.h: Event_timed is no more used during execution. Event_timed is no more used during in the memory queue. Event_timed is only used for SHOW CREATE EVENT/ I_S.EVENTS/ SHOW EVENTS Event_basic is the parent of almost all Event data objects. Event_basic -> Event_queue_element (used for the memory queue) -> Event_timed Event_basic -> Event_job_data (the object used for execution) Sql_alloc -> Event_parse_data (used during parsing) sql/event_db_repository.cc: Cosmetics. load_named_event now uses Event_basic, for polymorphism find_event uses Event_basic, to be polymorphic. use Field **fields= table->field and then index fields[...] Add documentation. Fix documentation. sql/event_db_repository.h: Event_db_repository depends only on Event_basic's interface sql/event_queue.cc: Cosmetics. Don't use Event_timed for the queue and giving back object for execution. Event_queue_element is for the queue, Event_job_data is for execution. Add Event_queue::dump_internal_status() for SHOW SCHEDULER STATUS command sql/event_queue.h: Cosmetics. Don't use Event_timed for the queue and giving back object for execution. Event_queue_element is for the queue, Event_job_data is for execution. Add Event_queue::dump_internal_status() for SHOW SCHEDULER STATUS command sql/event_scheduler_ng.cc: Add back Event_scheduler::cond_wait() Add back Event_scheduler::dump_internal_status() Using Event_job_data for execution. Make the scheduler thread unkillable (thd->command= COM_DAEMON). Add a lot of documentation. sql/event_scheduler_ng.h: Add back Event_scheduler::cond_wait() Add back Event_scheduler::dump_internal_status() Using Event_job_data for execution. sql/events.cc: Documentation Add LOCK_event_metadata sql/events.h: Change the signature of Events::drop_event() not to use sp_name but LEX_STRING sql/share/errmsg.txt: Fix error message sql/sql_parse.cc: Events::drop_event() has new signature
Diffstat (limited to 'sql/event_queue.cc')
-rw-r--r--sql/event_queue.cc653
1 files changed, 365 insertions, 288 deletions
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index b42372ba6bd..bd8809ba708 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -36,16 +36,16 @@
/*
- Compares the execute_at members of 2 Event_timed instances.
+ Compares the execute_at members of 2 Event_queue_element instances.
Used as callback for the prioritized queue when shifting
elements inside.
SYNOPSIS
- event_timed_compare_q()
+ event_queue_element_data_compare_q()
vptr - not used (set it to NULL)
- a - first Event_timed object
- b - second Event_timed object
+ a - first Event_queue_element object
+ b - second Event_queue_element object
RETURN VALUE
-1 - a->execute_at < b->execute_at
@@ -57,14 +57,13 @@
*/
static int
-event_timed_compare_q(void *vptr, byte* a, byte *b)
+event_queue_element_compare_q(void *vptr, byte* a, byte *b)
{
- return my_time_compare(&((Event_timed *)a)->execute_at,
- &((Event_timed *)b)->execute_at);
+ return my_time_compare(&((Event_queue_element *)a)->execute_at,
+ &((Event_queue_element *)b)->execute_at);
}
-
/*
Constructor of class Event_queue.
@@ -79,6 +78,120 @@ Event_queue::Event_queue()
mutex_queue_data_locked= FALSE;
}
+
+/*
+ Inits mutexes.
+
+ SYNOPSIS
+ Event_queue::init_mutexes()
+*/
+
+void
+Event_queue::init_mutexes()
+{
+ pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
+}
+
+
+/*
+ Destroys mutexes.
+
+ SYNOPSIS
+ Event_queue::deinit_mutexes()
+*/
+
+void
+Event_queue::deinit_mutexes()
+{
+ pthread_mutex_destroy(&LOCK_event_queue);
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_queue::notify_observers()
+*/
+
+void
+Event_queue::notify_observers()
+{
+ DBUG_ENTER("Event_queue::notify_observers");
+ DBUG_PRINT("info", ("Signalling change of the queue"));
+ scheduler->queue_changed();
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Inits the queue
+
+ SYNOPSIS
+ Event_queue::init()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error
+*/
+
+bool
+Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
+{
+ int i= 0;
+ bool ret= FALSE;
+ DBUG_ENTER("Event_queue::init_queue");
+ DBUG_PRINT("enter", ("this=0x%lx", this));
+
+ LOCK_QUEUE_DATA();
+ db_repository= db_repo;
+ scheduler= sched;
+
+ if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
+ event_queue_element_compare_q, NULL, 30 /*auto_extent*/))
+ {
+ sql_print_error("SCHEDULER: Can't initialize the execution queue");
+ ret= TRUE;
+ goto end;
+ }
+
+ if (sizeof(my_time_t) != sizeof(time_t))
+ {
+ sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
+ "The scheduler may not work correctly. Stopping.");
+ DBUG_ASSERT(0);
+ ret= TRUE;
+ goto end;
+ }
+
+end:
+ UNLOCK_QUEUE_DATA();
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Deinits the queue
+
+ SYNOPSIS
+ Event_queue::deinit_queue()
+*/
+
+void
+Event_queue::deinit_queue()
+{
+ DBUG_ENTER("Event_queue::deinit_queue");
+
+ LOCK_QUEUE_DATA();
+ empty_queue();
+ delete_queue(&queue);
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_VOID_RETURN;
+}
+
+
/*
Creates an event in the scheduler queue
@@ -90,28 +203,31 @@ Event_queue::Event_queue()
RETURN VALUE
OP_OK OK or scheduler not working
OP_LOAD_ERROR Error during loading from disk
- OP_ALREADY_EXISTS Event already in the queue
*/
int
-Event_queue::create_event(THD *thd, Event_parse_data *et)
+Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
- Event_timed *et_new;
+ Event_queue_element *element_new;
DBUG_ENTER("Event_queue::create_event");
- DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue));
+ DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
- res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new);
- LOCK_QUEUE_DATA();
- if (!res)
+ element_new= new Event_queue_element();
+ res= db_repository->load_named_event(thd, dbname, name, element_new);
+ if (res || element_new->status == Event_queue_element::DISABLED)
+ delete element_new;
+ else
{
- DBUG_PRINT("info", ("new event in the queue %p", et_new));
- queue_insert_safe(&queue, (byte *) et_new);
+ element_new->compute_next_execution_time();
+
+ LOCK_QUEUE_DATA();
+ DBUG_PRINT("info", ("new event in the queue 0x%lx", element_new));
+ queue_insert_safe(&queue, (byte *) element_new);
+ UNLOCK_QUEUE_DATA();
+
notify_observers();
}
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
- UNLOCK_QUEUE_DATA();
DBUG_RETURN(res);
}
@@ -123,9 +239,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et)
SYNOPSIS
Event_queue::update_event()
thd Thread
- et The event to replace(add) into the queue
- new_schema New schema, in case of RENAME TO
- new_name New name, in case of RENAME TO
+ dbname Schema of the event
+ name Name of the event
+ new_schema New schema, in case of RENAME TO, otherwise NULL
+ new_name New name, in case of RENAME TO, otherwise NULL
RETURN VALUE
OP_OK OK or scheduler not working
@@ -133,43 +250,57 @@ Event_queue::create_event(THD *thd, Event_parse_data *et)
*/
int
-Event_queue::update_event(THD *thd, Event_parse_data *et,
+Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
LEX_STRING *new_schema, LEX_STRING *new_name)
{
int res;
- Event_timed *et_old= NULL, *et_new= NULL;
+ Event_queue_element *element_old= NULL,
+ *element_new;
DBUG_ENTER("Event_queue::update_event");
- DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
- thd, et, et->dbname.str, et->name.str, &LOCK_event_queue));
+ DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
- res= db_repository->
- load_named_event_timed(thd, new_schema?*new_schema:et->dbname,
- new_name? *new_name:et->name, &et_new);
+ element_new= new Event_queue_element();
- if (res && res != OP_DISABLED_EVENT)
+ res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
+ new_name? *new_name:name, element_new);
+ if (res)
+ {
+ delete element_new;
goto end;
+ }
+ else if (element_new->status == Event_queue_element::DISABLED)
+ {
+ DBUG_PRINT("info", ("The event is disabled."));
+ /*
+ Destroy the object but don't skip to end: because we may have to remove
+ object from the cache.
+ */
+ delete element_new;
+ element_new= NULL;
+ }
+ else
+ element_new->compute_next_execution_time();
LOCK_QUEUE_DATA();
- if (!(et_old= find_event(et->dbname, et->name, TRUE)))
+ if (!(element_old= find_event(dbname, name, TRUE)))
{
- DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
- et->dbname.str, et->name.str));
+ DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
+ dbname.str, name.str));
}
-
- if (!res)
+ /* If not disabled event */
+ if (element_new)
{
- DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
- queue_insert_safe(&queue, (byte *) et_new);
+ DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
+ element_new, element_old));
+ queue_insert_safe(&queue, (byte *) element_new);
}
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
UNLOCK_QUEUE_DATA();
notify_observers();
- if (et_old)
- delete et_old;
+ if (element_old)
+ delete element_old;
end:
DBUG_PRINT("info", ("res=%d", res));
DBUG_RETURN(res);
@@ -181,35 +312,34 @@ end:
SYNOPSIS
Event_queue::drop_event()
- thd Thread
- name The event to drop
-
- RETURN VALUE
- FALSE OK (replaced or scheduler not working)
- TRUE Failure
+ thd Thread
+ dbname Schema of the event to drop
+ name Name of the event to drop
*/
-bool
-Event_queue::drop_event(THD *thd, sp_name *name)
+void
+Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
{
int res;
- Event_timed *et_old;
+ Event_queue_element *element;
DBUG_ENTER("Event_queue::drop_event");
- DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name,
- &LOCK_event_queue));
+ DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
LOCK_QUEUE_DATA();
- if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
- DBUG_PRINT("info", ("No such event found, probably DISABLED"));
+ element= find_event(dbname, name, TRUE);
UNLOCK_QUEUE_DATA();
- if (et_old)
- delete et_old;
+
+ if (element)
+ delete element;
+ else
+ DBUG_PRINT("info", ("No such event found, probably DISABLED"));
+
/*
We don't signal here because the scheduler will catch the change
next time it wakes up.
*/
- DBUG_RETURN(FALSE);
+ DBUG_VOID_RETURN;
}
@@ -232,7 +362,7 @@ Event_queue::drop_event(THD *thd, sp_name *name)
(signalling COND_new_work for instance).
*/
-Event_timed *
+Event_queue_element *
Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
{
uint i;
@@ -240,10 +370,10 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
for (i= 0; i < queue.elements; ++i)
{
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
et->dbname.str, et->name.str));
- if (event_timed_identifier_equal(db, name, et))
+ if (event_basic_identifier_equal(db, name, et))
{
if (remove_from_q)
queue_remove(&queue, i);
@@ -274,7 +404,7 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
- bool (*comparator)(Event_timed *,LEX_STRING *))
+ bool (*comparator)(LEX_STRING *, Event_basic *))
{
DBUG_ENTER("Event_queue::drop_matching_events");
DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
@@ -282,9 +412,9 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
uint i= 0;
while (i < queue.elements)
{
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
- if (comparator(et, &pattern))
+ if (comparator(&pattern, et))
{
/*
The queue is ordered. If we remove an element, then all elements after
@@ -331,67 +461,20 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
{
DBUG_ENTER("Event_queue::drop_schema_events");
LOCK_QUEUE_DATA();
- drop_matching_events(thd, schema, event_timed_db_equal);
+ drop_matching_events(thd, schema, event_basic_db_equal);
UNLOCK_QUEUE_DATA();
DBUG_VOID_RETURN;
}
/*
- Wrapper for pthread_mutex_lock
-
- SYNOPSIS
- Event_queue::lock_data()
- mutex Mutex to lock
- line The line number on which the lock is done
-
- RETURN VALUE
- Error code of pthread_mutex_lock()
-*/
-
-void
-Event_queue::lock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_queue::lock_mutex");
- DBUG_PRINT("enter", ("func=%s line=%u", func, line));
- pthread_mutex_lock(&LOCK_event_queue);
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_queue_data_locked= TRUE;
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_mutex_unlock
-
- SYNOPSIS
- Event_queue::unlock_data()
- mutex Mutex to unlock
- line The line number on which the unlock is done
-*/
-
-void
-Event_queue::unlock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_queue::unlock_mutex");
- DBUG_PRINT("enter", ("func=%s line=%u", func, line));
- mutex_last_unlocked_at_line= line;
- mutex_queue_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
- pthread_mutex_unlock(&LOCK_event_queue);
- DBUG_VOID_RETURN;
-}
-
-
-/*
Returns the number of elements in the queue
SYNOPSIS
Event_queue::events_count()
RETURN VALUE
- 0 Number of Event_timed objects in the queue
+ Number of Event_queue_element objects in the queue
*/
uint
@@ -408,31 +491,9 @@ Event_queue::events_count()
/*
- Returns the number of elements in the queue
-
- SYNOPSIS
- Event_queue::events_count_no_lock()
-
- RETURN VALUE
- 0 Number of Event_timed objects in the queue
-*/
-
-uint
-Event_queue::events_count_no_lock()
-{
- uint n;
- DBUG_ENTER("Event_queue::events_count_no_lock");
-
- n= queue.elements;
-
- DBUG_RETURN(n);
-}
-
-
-/*
Loads all ENABLED events from mysql.event into the prioritized
queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_timed instances for every ENABLED event
+ the events. Creates Event_queue_element instances for every ENABLED event
from mysql.event.
SYNOPSIS
@@ -461,7 +522,7 @@ Event_queue::load_events_from_db(THD *thd)
MEM_ROOT boot_root;
DBUG_ENTER("Event_queue::load_events_from_db");
- DBUG_PRINT("enter", ("thd=%p", thd));
+ DBUG_PRINT("enter", ("thd=0x%lx", thd));
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
{
@@ -469,12 +530,11 @@ Event_queue::load_events_from_db(THD *thd)
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
}
- init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
init_read_record(&read_record_info, thd, table ,NULL,1,0);
while (!(read_record_info.read_record(&read_record_info)))
{
- Event_timed *et;
- if (!(et= new Event_timed))
+ Event_queue_element *et;
+ if (!(et= new Event_queue_element))
{
DBUG_PRINT("info", ("Out of memory"));
clean_the_queue= TRUE;
@@ -489,13 +549,14 @@ Event_queue::load_events_from_db(THD *thd)
"Table probably corrupted");
break;
}
- if (et->status != Event_timed::ENABLED)
+ if (et->status != Event_queue_element::ENABLED)
{
DBUG_PRINT("info",("%s is disabled",et->name.str));
delete et;
continue;
}
-
+#if 0
+ init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
/* We load only on scheduler root just to check whether the body compiles */
@@ -514,6 +575,7 @@ Event_queue::load_events_from_db(THD *thd)
et->free_sp();
break;
}
+ free_root(&boot_root, MYF(0));
/* let's find when to be executed */
if (et->compute_next_execution_time())
@@ -522,19 +584,17 @@ Event_queue::load_events_from_db(THD *thd)
" Skipping", et->dbname.str, et->name.str);
continue;
}
-
- DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list."));
+#endif
+ DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
queue_insert_safe(&queue, (byte *) et);
count++;
}
end:
end_read_record(&read_record_info);
- free_root(&boot_root, MYF(0));
if (clean_the_queue)
{
- for (count= 0; count < queue.elements; ++count)
- queue_remove(&queue, 0);
+ empty_queue();
ret= -1;
}
else
@@ -571,7 +631,7 @@ Event_queue::check_system_tables(THD *thd)
bool ret;
DBUG_ENTER("Event_queue::check_system_tables");
- DBUG_PRINT("enter", ("thd=%p", thd));
+ DBUG_PRINT("enter", ("thd=0x%lx", thd));
thd->reset_n_backup_open_tables_state(&backup);
@@ -618,123 +678,48 @@ Event_queue::check_system_tables(THD *thd)
/*
- Inits mutexes.
-
- SYNOPSIS
- Event_queue::init_mutexes()
-*/
-
-void
-Event_queue::init_mutexes()
-{
- pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
-}
-
-
-/*
- Destroys mutexes.
+ Recalculates activation times in the queue. There is one reason for
+ that. Because the values (execute_at) by which the queue is ordered are
+ changed by calls to compute_next_execution_time() on a request from the
+ scheduler thread, if it is not running then the values won't be updated.
+ Once the scheduler is started again the values has to be recalculated
+ so they are right for the current time.
SYNOPSIS
- Event_queue::deinit_mutexes()
+ Event_queue::recalculate_activation_times()
+ thd Thread
*/
void
-Event_queue::deinit_mutexes()
+Event_queue::recalculate_activation_times(THD *thd)
{
- pthread_mutex_destroy(&LOCK_event_queue);
-}
-
-
-/*
- Signals the main scheduler thread that the queue has changed
- its state.
-
- SYNOPSIS
- Event_queue::notify_observers()
-*/
-
-void
-Event_queue::notify_observers()
-{
- DBUG_ENTER("Event_queue::notify_observers");
- DBUG_PRINT("info", ("Signalling change of the queue"));
- scheduler->queue_changed();
- DBUG_VOID_RETURN;
-}
-
-
-/*
- The implementation of full-fledged initialization.
-
- SYNOPSIS
- Event_queue::init()
-
- RETURN VALUE
- FALSE OK
- TRUE Error
-*/
-
-bool
-Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
-{
- int i= 0;
- bool ret= FALSE;
- DBUG_ENTER("Event_queue::init_queue");
- DBUG_PRINT("enter", ("this=%p", this));
+ uint i;
+ DBUG_ENTER("Event_queue::recalculate_activation_times");
LOCK_QUEUE_DATA();
- db_repository= db_repo;
- scheduler= sched;
-
- if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
- event_timed_compare_q, NULL, 30 /*auto_extent*/))
- {
- sql_print_error("SCHEDULER: Can't initialize the execution queue");
- ret= TRUE;
- goto end;
- }
-
- if (sizeof(my_time_t) != sizeof(time_t))
+ DBUG_PRINT("info", ("%u loaded events to be recalculated", queue.elements));
+ for (i= 0; i < queue.elements; i++)
{
- sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
- "The scheduler may not work correctly. Stopping.");
- DBUG_ASSERT(0);
- ret= TRUE;
- goto end;
+ ((Event_queue_element*)queue_element(&queue, i))->compute_next_execution_time();
+ ((Event_queue_element*)queue_element(&queue, i))->update_timing_fields(thd);
}
-
-end:
- UNLOCK_QUEUE_DATA();
- DBUG_RETURN(ret);
-}
-
-
-void
-Event_queue::deinit_queue()
-{
- DBUG_ENTER("Event_queue::deinit_queue");
-
- LOCK_QUEUE_DATA();
- empty_queue();
- delete_queue(&queue);
+ queue_fix(&queue);
UNLOCK_QUEUE_DATA();
DBUG_VOID_RETURN;
}
-void
-Event_queue::recalculate_queue(THD *thd)
-{
- uint i;
- for (i= 0; i < queue.elements; i++)
- {
- ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
- ((Event_timed*)queue_element(&queue, i))->update_fields(thd);
- }
- queue_fix(&queue);
-}
+/*
+ Empties the queue and destroys the Event_queue_element objects in the
+ queue.
+ SYNOPSIS
+ Event_queue::empty_queue()
+
+ NOTE
+ Should be called with LOCK_event_queue locked
+*/
void
Event_queue::empty_queue()
@@ -743,9 +728,9 @@ Event_queue::empty_queue()
DBUG_ENTER("Event_queue::empty_queue");
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
/* empty the queue */
- for (i= 0; i < events_count_no_lock(); ++i)
+ for (i= 0; i < queue.elements; ++i)
{
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
delete et;
}
resize_queue(&queue, 0);
@@ -753,38 +738,17 @@ Event_queue::empty_queue()
}
-Event_timed*
-Event_queue::get_top()
-{
- return (Event_timed *)queue_top(&queue);
-}
-
-
-void
-Event_queue::remove_top()
-{
- queue_remove(&queue, 0);// 0 is top, internally 1
-}
-
-
-void
-Event_queue::top_changed()
-{
- queue_replaced(&queue);
-}
-
-
inline void
Event_queue::dbug_dump_queue(time_t now)
{
#ifndef DBUG_OFF
- Event_timed *et;
+ Event_queue_element *et;
uint i;
DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
for (i = 0; i < queue.elements; i++)
{
- et= ((Event_timed*)queue_element(&queue, i));
- DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
+ et= ((Event_queue_element*)queue_element(&queue, i));
+ DBUG_PRINT("info",("et=0x%lx db=%s name=%s",et, et->dbname.str, et->name.str));
DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
" expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
TIME_to_ulonglong_datetime(&et->execute_at),
@@ -797,14 +761,14 @@ Event_queue::dbug_dump_queue(time_t now)
#endif
}
-Event_timed *
+Event_job_data *
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
struct timespec *abstime)
{
struct timespec top_time;
- Event_timed *et_new= NULL;
+ Event_job_data *et_new= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
- DBUG_PRINT("enter", ("thd=%p now=%d", thd, now));
+ DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
abstime->tv_nsec= 0;
LOCK_QUEUE_DATA();
do {
@@ -816,28 +780,31 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
}
dbug_dump_queue(now);
- Event_timed *et= ((Event_timed*)queue_element(&queue, 0));
+ Event_queue_element *et= ((Event_queue_element*) queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
if (top_time.tv_sec <= now)
{
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
- if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name,
- &et_new)))
+ et_new= new Event_job_data();
+ if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
+ et_new)))
{
+ delete et_new;
+ et_new= NULL;
DBUG_ASSERT(0);
break;
}
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
- et->status= Event_timed::DISABLED;
+ et->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event's status is %d", et->status));
- et->update_fields(thd);
+ et->update_timing_fields(thd);
if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
- (et->status == Event_timed::DISABLED))
+ (et->status == Event_queue_element::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (et->dropped)
@@ -857,12 +824,122 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
} while (0);
UNLOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new,
+ DBUG_PRINT("info", ("returning. et_new=0x%lx abstime.tv_sec=%d ", et_new,
abstime->tv_sec));
if (et_new)
- DBUG_PRINT("info", ("db=%s name=%s definer=%s "
- "et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str,
- et_new->definer.str,
- TIME_to_ulonglong_datetime(&et_new->execute_at)));
+ DBUG_PRINT("info", ("db=%s name=%s definer=%s",
+ et_new->dbname.str, et_new->name.str, et_new->definer.str));
DBUG_RETURN(et_new);
}
+
+
+/*
+ Auxiliary function for locking LOCK_event_queue. Used by the
+ LOCK_QUEUE_DATA macro
+
+ SYNOPSIS
+ Event_queue::lock_data()
+ func Which function is requesting mutex lock
+ line On which line mutex lock is requested
+*/
+
+void
+Event_queue::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_queue::lock_data");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
+ pthread_mutex_lock(&LOCK_event_queue);
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_queue_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Auxiliary function for unlocking LOCK_event_queue. Used by the
+ UNLOCK_QUEUE_DATA macro
+
+ SYNOPSIS
+ Event_queue::unlock_data()
+ func Which function is requesting mutex unlock
+ line On which line mutex unlock is requested
+*/
+
+void
+Event_queue::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_queue::unlock_data");
+ DBUG_PRINT("enter", ("func=%s line=%u", func, line));
+ mutex_last_unlocked_at_line= line;
+ mutex_queue_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+ pthread_mutex_unlock(&LOCK_event_queue);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Dumps the internal status of the queue
+
+ SYNOPSIS
+ Event_queue::dump_internal_status()
+ thd Thread
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error
+*/
+
+bool
+Event_queue::dump_internal_status(THD *thd)
+{
+ DBUG_ENTER("Event_queue::dump_internal_status");
+#ifndef DBUG_OFF
+ CHARSET_INFO *scs= system_charset_info;
+ Protocol *protocol= thd->protocol;
+ List<Item> field_list;
+ int ret;
+ char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
+ char int_buff[STRING_BUFFER_USUAL_SIZE];
+ String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
+ String int_string(int_buff, sizeof(int_buff), scs);
+ tmp_string.length(0);
+ int_string.length(0);
+
+ /* workers_count */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue element count"), scs);
+ int_string.set((longlong) queue.elements, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+
+ /* queue_data_locked */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue data locked"), scs);
+ int_string.set((longlong) mutex_queue_data_locked, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+
+ /* last locked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue last locked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_locked_in_func,
+ mutex_last_locked_at_line));
+ protocol->store(&tmp_string);
+ ret= protocol->write();
+
+ /* last unlocked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("queue last unlocked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_unlocked_in_func,
+ mutex_last_unlocked_at_line));
+ protocol->store(&tmp_string);
+ ret= protocol->write();
+#endif
+ DBUG_RETURN(FALSE);
+}