summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/event_data_objects.cc20
-rw-r--r--sql/event_db_repository.cc1
-rw-r--r--sql/event_db_repository.h3
-rw-r--r--sql/event_queue.cc841
-rw-r--r--sql/event_queue.h104
-rw-r--r--sql/event_scheduler.cc1006
-rw-r--r--sql/event_scheduler.h154
-rw-r--r--sql/events.cc1
8 files changed, 1182 insertions, 948 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index a47a3e9e936..f4147d72c3d 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -25,6 +25,19 @@
#define EVEX_MAX_INTERVAL_VALUE 2147483647L
+/*
+ Returns a new instance
+
+ SYNOPSIS
+ Event_parse_data::new_instance()
+
+ RETURN VALUE
+ Address or NULL in case of error
+
+ NOTE
+ Created on THD's mem_root
+*/
+
Event_parse_data *
Event_parse_data::new_instance(THD *thd)
{
@@ -32,6 +45,13 @@ Event_parse_data::new_instance(THD *thd)
}
+/*
+ Constructor
+
+ SYNOPSIS
+ Event_parse_data::Event_parse_data()
+*/
+
Event_parse_data::Event_parse_data()
{
item_execute_at= item_expression= item_starts= item_ends= NULL;
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc
index 68d8234cc90..8886992c839 100644
--- a/sql/event_db_repository.cc
+++ b/sql/event_db_repository.cc
@@ -1297,3 +1297,4 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK);
}
+
diff --git a/sql/event_db_repository.h b/sql/event_db_repository.h
index 625d5bfb993..e1c64c8aded 100644
--- a/sql/event_db_repository.h
+++ b/sql/event_db_repository.h
@@ -113,6 +113,9 @@ private:
int
table_scan_all_for_i_s(THD *thd, TABLE *schema_table, TABLE *event_table);
+ static bool
+ check_system_tables(THD *thd);
+
MEM_ROOT repo_root;
/* Prevent use of these */
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 46f965678c6..32c5a076a62 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -15,5 +15,846 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
+#include "events.h"
+#include "event_scheduler.h"
#include "event_queue.h"
#include "event_data_objects.h"
+#include "event_db_repository.h"
+#include "sp_head.h"
+
+
+#ifdef __GNUC__
+#if __GNUC__ >= 2
+#define SCHED_FUNC __FUNCTION__
+#endif
+#else
+#define SCHED_FUNC "<unknown>"
+#endif
+
+#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
+#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
+
+
+Event_scheduler*
+Event_queue::singleton= NULL;
+
+
+/*
+ Compares the execute_at members of 2 Event_timed instances.
+ Used as callback for the prioritized queue when shifting
+ elements inside.
+
+ SYNOPSIS
+ event_timed_compare_q()
+
+ vptr - not used (set it to NULL)
+ a - first Event_timed object
+ b - second Event_timed object
+
+ RETURN VALUE
+ -1 - a->execute_at < b->execute_at
+ 0 - a->execute_at == b->execute_at
+ 1 - a->execute_at > b->execute_at
+
+ NOTES
+ execute_at.second_part is not considered during comparison
+*/
+
+static int
+event_timed_compare_q(void *vptr, byte* a, byte *b)
+{
+ return my_time_compare(&((Event_timed *)a)->execute_at,
+ &((Event_timed *)b)->execute_at);
+}
+
+
+
+/*
+ Constructor of class Event_queue.
+
+ SYNOPSIS
+ Event_queue::Event_queue()
+*/
+
+Event_queue::Event_queue()
+{
+ mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
+ mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
+ mutex_queue_data_locked= FALSE;
+}
+
+/*
+ Creates an event in the scheduler queue
+
+ SYNOPSIS
+ Event_queue::create_event()
+ et The event to add
+ check_existence Whether to check if already loaded.
+
+ RETURN VALUE
+ OP_OK OK or scheduler not working
+ OP_LOAD_ERROR Error during loading from disk
+*/
+
+int
+Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
+{
+ int res;
+ Event_timed *et_new;
+ DBUG_ENTER("Event_queue::create_event");
+ DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue));
+
+ LOCK_QUEUE_DATA();
+ if (check_existence && find_event(et->dbname, et->name, FALSE))
+ {
+ res= OP_ALREADY_EXISTS;
+ goto end;
+ }
+
+ /* We need to load the event on scheduler_root */
+ if (!(res= db_repository->
+ load_named_event(thd, et->dbname, et->name, &et_new)))
+ {
+ queue_insert_safe(&queue, (byte *) et_new);
+ on_queue_change();
+ }
+ else if (res == OP_DISABLED_EVENT)
+ res= OP_OK;
+end:
+ UNLOCK_QUEUE_DATA();
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Updates an event from the scheduler queue
+
+ SYNOPSIS
+ Event_scheduler::update_event()
+ thd Thread
+ et The event to replace(add) into the queue
+ new_schema New schema
+ new_name New name
+
+ RETURN VALUE
+ OP_OK OK or scheduler not working
+ OP_LOAD_ERROR Error during loading from disk
+ OP_ALREADY_EXISTS Event already in the queue
+*/
+
+int
+Event_queue::update_event(THD *thd, Event_parse_data *et,
+ LEX_STRING *new_schema,
+ LEX_STRING *new_name)
+{
+ int res= OP_OK;
+ Event_timed *et_old, *et_new= NULL;
+ LEX_STRING old_schema, old_name;
+
+ LINT_INIT(old_schema.str);
+ LINT_INIT(old_schema.length);
+ LINT_INIT(old_name.str);
+ LINT_INIT(old_name.length);
+
+ DBUG_ENTER("Event_queue::update_event");
+ DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
+ thd, et, et->dbname.str, et->name.str, &LOCK_event_queue));
+
+ LOCK_QUEUE_DATA();
+ if (!(et_old= find_event(et->dbname, et->name, TRUE)))
+ DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
+ et->dbname.str, et->name.str));
+
+ if (new_schema && new_name)
+ {
+ old_schema= et->dbname;
+ old_name= et->name;
+ et->dbname= *new_schema;
+ et->name= *new_name;
+ }
+ /*
+ We need to load the event (it's strings but on the object itself)
+ on scheduler_root. et_new could be NULL :
+ 1. Error occured
+ 2. If the replace is DISABLED, we don't load it into the queue.
+ */
+ if (!(res= db_repository->
+ load_named_event(thd, et->dbname, et->name, &et_new)))
+ {
+ queue_insert_safe(&queue, (byte *) et_new);
+ on_queue_change();
+ }
+ else if (res == OP_DISABLED_EVENT)
+ res= OP_OK;
+
+ if (new_schema && new_name)
+ {
+ et->dbname= old_schema;
+ et->name= old_name;
+ }
+ DBUG_PRINT("info", ("res=%d", res));
+ UNLOCK_QUEUE_DATA();
+ /*
+ Andrey: Is this comment still truthful ???
+
+ We don't move this code above because a potential kill_thread will call
+ THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
+ which is LOCK_event_queue on which the COND_new_work in ::run() locks.
+ Hence, we try to acquire a lock which we have already acquired and we run
+ into an assert. Holding LOCK_event_queue however is not needed because
+ we don't touch any invariant of the scheduler anymore. ::drop_event() does
+ the same.
+ */
+ if (et_old)
+ {
+ switch (et_old->kill_thread(thd)) {
+ case EVEX_CANT_KILL:
+ /* Don't delete but continue */
+ et_old->flags |= EVENT_FREE_WHEN_FINISHED;
+ break;
+ case 0:
+ /*
+ kill_thread() waits till the spawned thread finishes after it's
+ killed. Hence, we delete here memory which is no more referenced from
+ a running thread.
+ */
+ delete et_old;
+ /*
+ We don't signal COND_new_work here because:
+ 1. Even if the dropped event is on top of the queue this will not
+ move another one to be executed before the time the one on the
+ top (but could be at the same second as the dropped one)
+ 2. If this was the last event on the queue, then pthread_cond_timedwait
+ in ::run() will finish and then see that the queue is empty and
+ call cond_wait(). Hence, no need to interrupt the blocked
+ ::run() thread.
+ */
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ }
+
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Drops an event from the scheduler queue
+
+ SYNOPSIS
+ Event_queue::drop_event()
+ thd Thread
+ name The event to drop
+
+ RETURN VALUE
+ FALSE OK (replaced or scheduler not working)
+ TRUE Failure
+*/
+
+bool
+Event_queue::drop_event(THD *thd, sp_name *name)
+{
+ int res;
+ Event_timed *et_old;
+ DBUG_ENTER("Event_queue::drop_event");
+ DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name,
+ &LOCK_event_queue));
+
+ LOCK_QUEUE_DATA();
+ if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
+ DBUG_PRINT("info", ("No such event found, probably DISABLED"));
+
+ UNLOCK_QUEUE_DATA();
+
+ /* See comments in ::replace_event() why this is split in two parts. */
+ if (et_old)
+ {
+ switch ((res= et_old->kill_thread(thd))) {
+ case EVEX_CANT_KILL:
+ /* Don't delete but continue */
+ et_old->flags |= EVENT_FREE_WHEN_FINISHED;
+ break;
+ case 0:
+ /*
+ kill_thread() waits till the spawned thread finishes after it's
+ killed. Hence, we delete here memory which is no more referenced from
+ a running thread.
+ */
+ delete et_old;
+ /*
+ We don't signal COND_new_work here because:
+ 1. Even if the dropped event is on top of the queue this will not
+ move another one to be executed before the time the one on the
+ top (but could be at the same second as the dropped one)
+ 2. If this was the last event on the queue, then pthread_cond_timedwait
+ in ::run() will finish and then see that the queue is empty and
+ call cond_wait(). Hence, no need to interrupt the blocked
+ ::run() thread.
+ */
+ break;
+ default:
+ sql_print_error("SCHEDULER: Got unexpected error %d", res);
+ DBUG_ASSERT(0);
+ }
+ }
+
+ DBUG_RETURN(FALSE);
+}
+
+
+
+
+/*
+ Searches for an event in the scheduler queue
+
+ SYNOPSIS
+ Event_queue::find_event()
+ db The schema of the event to find
+ name The event to find
+ remove_from_q If found whether to remove from the Q
+
+ RETURN VALUE
+ NULL Not found
+ otherwise Address
+
+ NOTE
+ The caller should do the locking also the caller is responsible for
+ actual signalling in case an event is removed from the queue
+ (signalling COND_new_work for instance).
+*/
+
+Event_timed *
+Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
+{
+ uint i;
+ DBUG_ENTER("Event_queue::find_event");
+
+ for (i= 0; i < queue.elements; ++i)
+ {
+ Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
+ et->dbname.str, et->name.str));
+ if (event_timed_identifier_equal(db, name, et))
+ {
+ if (remove_from_q)
+ queue_remove(&queue, i);
+ DBUG_RETURN(et);
+ }
+ }
+
+ DBUG_RETURN(NULL);
+}
+
+
+/*
+ Drops all events from the in-memory queue and disk that match
+ certain pattern evaluated by a comparator function
+
+ SYNOPSIS
+ Event_queue::drop_matching_events()
+ thd THD
+ pattern A pattern string
+ comparator The function to use for comparing
+
+ RETURN VALUE
+ -1 Scheduler not working
+ >=0 Number of dropped events
+
+ NOTE
+ Expected is the caller to acquire lock on LOCK_event_queue
+*/
+
+void
+Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
+ bool (*comparator)(Event_timed *,LEX_STRING *))
+{
+ DBUG_ENTER("Event_queue::drop_matching_events");
+ DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));
+
+ uint i= 0, dropped= 0;
+ while (i < queue.elements)
+ {
+ Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
+ if (comparator(et, &pattern))
+ {
+ /*
+ The queue is ordered. If we remove an element, then all elements after
+ it will shift one position to the left, if we imagine it as an array
+ from left to the right. In this case we should not increment the
+ counter and the (i < queue.elements) condition is ok.
+ */
+ queue_remove(&queue, i);
+
+ /* See replace_event() */
+ switch (et->kill_thread(thd)) {
+ case EVEX_CANT_KILL:
+ /* Don't delete but continue */
+ et->flags |= EVENT_FREE_WHEN_FINISHED;
+ ++dropped;
+ break;
+ case 0:
+ delete et;
+ ++dropped;
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ }
+ else
+ i++;
+ }
+ DBUG_PRINT("info", ("Dropped %lu", dropped));
+ /*
+ Don't send COND_new_work because no need to wake up the scheduler thread.
+ When it wakes next time up it will recalculate how much more it should
+ sleep if the top of the queue has been changed by this method.
+ */
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Drops all events from the in-memory queue and disk that are from
+ certain schema.
+
+ SYNOPSIS
+ Event_queue::drop_schema_events()
+ thd THD
+ db The schema name
+
+ RETURN VALUE
+ -1 Scheduler not working
+ >=0 Number of dropped events
+*/
+
+int
+Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
+{
+ int ret;
+ DBUG_ENTER("Event_queue::drop_schema_events");
+ LOCK_QUEUE_DATA();
+ drop_matching_events(thd, schema, event_timed_db_equal);
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Wrapper for pthread_mutex_lock
+
+ SYNOPSIS
+ Event_queue::lock_data()
+ mutex Mutex to lock
+ line The line number on which the lock is done
+
+ RETURN VALUE
+ Error code of pthread_mutex_lock()
+*/
+
+void
+Event_queue::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_queue::lock_mutex");
+ DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
+ &LOCK_event_queue, func, line));
+ pthread_mutex_lock(&LOCK_event_queue);
+ mutex_last_locked_in_func= func;
+ mutex_last_locked_at_line= line;
+ mutex_queue_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_mutex_unlock
+
+ SYNOPSIS
+ Event_queue::unlock_data()
+ mutex Mutex to unlock
+ line The line number on which the unlock is done
+*/
+
+void
+Event_queue::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_queue::UNLOCK_mutex");
+ DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
+ &LOCK_event_queue, func, line));
+ mutex_last_unlocked_at_line= line;
+ mutex_queue_data_locked= FALSE;
+ mutex_last_unlocked_in_func= func;
+ pthread_mutex_unlock(&LOCK_event_queue);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Returns the number of elements in the queue
+
+ SYNOPSIS
+ Event_queue::events_count()
+
+ RETURN VALUE
+ 0 Number of Event_timed objects in the queue
+*/
+
+uint
+Event_queue::events_count()
+{
+ uint n;
+ DBUG_ENTER("Event_scheduler::events_count");
+ LOCK_QUEUE_DATA();
+ n= queue.elements;
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_RETURN(n);
+}
+
+
+/*
+ Returns the number of elements in the queue
+
+ SYNOPSIS
+ Event_queue::events_count_no_lock()
+
+ RETURN VALUE
+ 0 Number of Event_timed objects in the queue
+*/
+
+uint
+Event_queue::events_count_no_lock()
+{
+ uint n;
+ DBUG_ENTER("Event_scheduler::events_count_no_lock");
+
+ n= queue.elements;
+
+ DBUG_RETURN(n);
+}
+
+
+/*
+ Loads all ENABLED events from mysql.event into the prioritized
+ queue. Called during scheduler main thread initialization. Compiles
+ the events. Creates Event_timed instances for every ENABLED event
+ from mysql.event.
+
+ SYNOPSIS
+ Event_queue::load_events_from_db()
+ thd - Thread context. Used for memory allocation in some cases.
+
+ RETURN VALUE
+ 0 OK
+ !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
+ EVEX_COMPILE_ERROR) - in all these cases mysql.event was
+ tampered.
+
+ NOTES
+ Reports the error to the console
+*/
+
+int
+Event_queue::load_events_from_db(THD *thd)
+{
+ TABLE *table;
+ READ_RECORD read_record_info;
+ int ret= -1;
+ uint count= 0;
+ bool clean_the_queue= FALSE;
+ /* Compile the events on this root but only for syntax check, then discard */
+ MEM_ROOT boot_root;
+
+ DBUG_ENTER("Event_queue::load_events_from_db");
+ DBUG_PRINT("enter", ("thd=%p", thd));
+
+ if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
+ {
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
+ DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
+ }
+
+ init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
+ init_read_record(&read_record_info, thd, table ,NULL,1,0);
+ while (!(read_record_info.read_record(&read_record_info)))
+ {
+ Event_timed *et;
+ if (!(et= new Event_timed))
+ {
+ DBUG_PRINT("info", ("Out of memory"));
+ clean_the_queue= TRUE;
+ break;
+ }
+ DBUG_PRINT("info", ("Loading event from row."));
+
+ if ((ret= et->load_from_row(&scheduler_root, table)))
+ {
+ clean_the_queue= TRUE;
+ sql_print_error("SCHEDULER: Error while loading from mysql.event. "
+ "Table probably corrupted");
+ break;
+ }
+ if (et->status != Event_timed::ENABLED)
+ {
+ DBUG_PRINT("info",("%s is disabled",et->name.str));
+ delete et;
+ continue;
+ }
+
+ DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
+
+ /* We load only on scheduler root just to check whether the body compiles */
+ switch (ret= et->compile(thd, &boot_root)) {
+ case EVEX_MICROSECOND_UNSUP:
+ et->free_sp();
+ sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
+ "supported but found in mysql.event");
+ goto end;
+ case EVEX_COMPILE_ERROR:
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
+ et->dbname.str, et->name.str);
+ goto end;
+ default:
+ /* Free it, it will be compiled again on the worker thread */
+ et->free_sp();
+ break;
+ }
+
+ /* let's find when to be executed */
+ if (et->compute_next_execution_time())
+ {
+ sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
+ " Skipping", et->dbname.str, et->name.str);
+ continue;
+ }
+
+ DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list."));
+ queue_insert_safe(&queue, (byte *) et);
+ count++;
+ }
+end:
+ end_read_record(&read_record_info);
+ free_root(&boot_root, MYF(0));
+
+ if (clean_the_queue)
+ {
+ for (count= 0; count < queue.elements; ++count)
+ queue_remove(&queue, 0);
+ ret= -1;
+ }
+ else
+ {
+ ret= 0;
+ sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
+ }
+
+ /* Force close to free memory */
+ thd->version--;
+
+ close_thread_tables(thd);
+
+ DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Opens mysql.db and mysql.user and checks whether:
+ 1. mysql.db has column Event_priv at column 20 (0 based);
+ 2. mysql.user has column Event_priv at column 29 (0 based);
+
+ SYNOPSIS
+ Event_queue::check_system_tables()
+*/
+
+bool
+Event_queue::check_system_tables(THD *thd)
+{
+ TABLE_LIST tables;
+ bool not_used;
+ Open_tables_state backup;
+ bool ret;
+
+ DBUG_ENTER("Event_queue::check_system_tables");
+ DBUG_PRINT("enter", ("thd=%p", thd));
+
+ thd->reset_n_backup_open_tables_state(&backup);
+
+ bzero((char*) &tables, sizeof(tables));
+ tables.db= (char*) "mysql";
+ tables.table_name= tables.alias= (char*) "db";
+ tables.lock_type= TL_READ;
+
+ if ((ret= simple_open_n_lock_tables(thd, &tables)))
+ sql_print_error("Cannot open mysql.db");
+ else
+ {
+ ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
+ mysql_db_table_fields, &mysql_db_table_last_check,
+ ER_CANNOT_LOAD_FROM_TABLE);
+ close_thread_tables(thd);
+ }
+ if (ret)
+ DBUG_RETURN(TRUE);
+
+ bzero((char*) &tables, sizeof(tables));
+ tables.db= (char*) "mysql";
+ tables.table_name= tables.alias= (char*) "user";
+ tables.lock_type= TL_READ;
+
+ if ((ret= simple_open_n_lock_tables(thd, &tables)))
+ sql_print_error("Cannot open mysql.db");
+ else
+ {
+ if (tables.table->s->fields < 29 ||
+ strncmp(tables.table->field[29]->field_name,
+ STRING_WITH_LEN("Event_priv")))
+ {
+ sql_print_error("mysql.user has no `Event_priv` column at position 29");
+ ret= TRUE;
+ }
+ close_thread_tables(thd);
+ }
+
+ thd->restore_backup_open_tables_state(&backup);
+
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Inits mutexes.
+
+ SYNOPSIS
+ Event_queue::init_mutexes()
+*/
+
+void
+Event_queue::init_mutexes()
+{
+ pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST);
+}
+
+
+/*
+ Destroys mutexes.
+
+ SYNOPSIS
+ Event_queue::destroy_mutexes()
+*/
+
+void
+Event_queue::destroy_mutexes()
+{
+ pthread_mutex_destroy(&singleton->LOCK_event_queue);
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_queue::on_queue_change()
+*/
+
+void
+Event_queue::on_queue_change()
+{
+ DBUG_ENTER("Event_queue::on_queue_change");
+ DBUG_PRINT("info", ("Sending COND_new_work"));
+ singleton->queue_changed();
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ The implementation of full-fledged initialization.
+
+ SYNOPSIS
+ Event_scheduler::init()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error
+*/
+
+bool
+Event_queue::init(Event_db_repository *db_repo)
+{
+ int i= 0;
+ bool ret= FALSE;
+ DBUG_ENTER("Event_scheduler::init");
+ DBUG_PRINT("enter", ("this=%p", this));
+
+ LOCK_QUEUE_DATA();
+ db_repository= db_repo;
+ /* init memory root */
+ init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
+
+ if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
+ event_timed_compare_q, NULL, 30 /*auto_extent*/))
+ {
+ sql_print_error("SCHEDULER: Can't initialize the execution queue");
+ ret= TRUE;
+ goto end;
+ }
+
+ if (sizeof(my_time_t) != sizeof(time_t))
+ {
+ sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
+ "The scheduler may not work correctly. Stopping.");
+ DBUG_ASSERT(0);
+ ret= TRUE;
+ goto end;
+ }
+
+end:
+ UNLOCK_QUEUE_DATA();
+ DBUG_RETURN(ret);
+}
+
+
+void
+Event_queue::deinit()
+{
+ DBUG_ENTER("Event_queue::deinit");
+
+ LOCK_QUEUE_DATA();
+ delete_queue(&queue);
+ free_root(&scheduler_root, MYF(0));
+ UNLOCK_QUEUE_DATA();
+
+ DBUG_VOID_RETURN;
+}
+
+
+void
+Event_queue::recalculate_queue(THD *thd)
+{
+ int i;
+ for (i= 0; i < queue.elements; i++)
+ {
+ ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
+ ((Event_timed*)queue_element(&queue, i))->update_fields(thd);
+ }
+ queue_fix(&queue);
+}
+
+
+void
+Event_queue::empty_queue()
+{
+ int i;
+ /* empty the queue */
+ for (i= 0; i < events_count_no_lock(); ++i)
+ {
+ Event_timed *et= (Event_timed *) queue_element(&queue, i);
+ et->free_sp();
+ delete et;
+ }
+ resize_queue(&queue, 0);
+}
diff --git a/sql/event_queue.h b/sql/event_queue.h
index b3aa6133840..8c11d7a2042 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -16,5 +16,107 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
+class sp_name;
+class Event_timed;
+class Event_db_repository;
+
+class THD;
+typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
+
+class Event_scheduler;
+
+class Event_queue
+{
+public:
+ Event_queue();
+
+ static void
+ init_mutexes();
+
+ static void
+ destroy_mutexes();
+
+ bool
+ init(Event_db_repository *db_repo);
+
+ void
+ deinit();
+
+ /* Methods for queue management follow */
+
+ int
+ create_event(THD *thd, Event_parse_data *et, bool check_existence);
+
+ int
+ update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
+ LEX_STRING *new_name);
+
+ bool
+ drop_event(THD *thd, sp_name *name);
+
+ int
+ drop_schema_events(THD *thd, LEX_STRING schema);
+
+ int
+ drop_user_events(THD *thd, LEX_STRING *definer)
+ { DBUG_ASSERT(0); return 0;}
+
+ uint
+ events_count();
+
+ uint
+ events_count_no_lock();
+
+ static bool
+ check_system_tables(THD *thd);
+
+ void
+ recalculate_queue(THD *thd);
+
+ void
+ empty_queue();
+
+///////////////protected
+ Event_timed *
+ find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
+
+ int
+ load_events_from_db(THD *thd);
+
+ void
+ drop_matching_events(THD *thd, LEX_STRING pattern,
+ bool (*)(Event_timed *,LEX_STRING *));
+
+ /* LOCK_event_queue is the mutex which protects the access to the queue. */
+ pthread_mutex_t LOCK_event_queue;
+
+ Event_db_repository *db_repository;
+
+ /* The MEM_ROOT of the object */
+ MEM_ROOT scheduler_root;
+
+ /* The sorted queue with the Event_timed objects */
+ QUEUE queue;
+
+ uint mutex_last_locked_at_line;
+ uint mutex_last_unlocked_at_line;
+ const char* mutex_last_locked_in_func;
+ const char* mutex_last_unlocked_in_func;
+ bool mutex_queue_data_locked;
+
+ /* helper functions for working with mutexes & conditionals */
+ void
+ lock_data(const char *func, uint line);
+
+ void
+ unlock_data(const char *func, uint line);
+
+ static void
+ on_queue_change();
+protected:
+ /* Singleton instance */
+ static Event_scheduler *singleton;
+
+};
+
#endif /* _EVENT_QUEUE_H_ */
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index 35c30e5fc5a..fb60ce8ae6d 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -264,11 +264,6 @@ LEX_STRING states_names[] =
};
#endif
-
-Event_scheduler
-Event_scheduler::singleton;
-
-
const char * const
Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] =
{
@@ -278,6 +273,13 @@ Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] =
};
+/*
+Event_scheduler*
+Event_scheduler::singleton= NULL;
+*/
+
+
+
class Worker_thread_param
{
public:
@@ -301,35 +303,6 @@ public:
/*
- Compares the execute_at members of 2 Event_timed instances.
- Used as callback for the prioritized queue when shifting
- elements inside.
-
- SYNOPSIS
- event_timed_compare_q()
-
- vptr - not used (set it to NULL)
- a - first Event_timed object
- b - second Event_timed object
-
- RETURN VALUE
- -1 - a->execute_at < b->execute_at
- 0 - a->execute_at == b->execute_at
- 1 - a->execute_at > b->execute_at
-
- NOTES
- execute_at.second_part is not considered during comparison
-*/
-
-static int
-event_timed_compare_q(void *vptr, byte* a, byte *b)
-{
- return my_time_compare(&((Event_timed *)a)->execute_at,
- &((Event_timed *)b)->execute_at);
-}
-
-
-/*
Prints the stack of infos, warnings, errors from thd to
the console so it can be fetched by the logs-into-tables and
checked later.
@@ -640,6 +613,8 @@ event_worker_thread(void *arg)
}
+
+
/*
Constructor of class Event_scheduler.
@@ -648,15 +623,35 @@ event_worker_thread(void *arg)
*/
Event_scheduler::Event_scheduler()
- :state(UNINITIALIZED), start_scheduler_suspended(FALSE),
- thread_id(0), mutex_last_locked_at_line(0),
- mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""),
- mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE),
- mutex_scheduler_data_locked(FALSE)
{
+ thread_id= 0;
+ mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0;
+ mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= "";
+ cond_waiting_on= COND_NONE;
+ mutex_scheduler_data_locked= FALSE;
+ state= UNINITIALIZED;
+ start_scheduler_suspended= FALSE;
+ LOCK_scheduler_data= &LOCK_event_queue;
}
+
+/*
+ Returns the singleton instance of the class.
+
+ SYNOPSIS
+ Event_scheduler::create_instance()
+
+ RETURN VALUE
+ address
+*/
+
+void
+Event_scheduler::create_instance()
+{
+ singleton= new Event_scheduler();
+}
+
/*
Returns the singleton instance of the class.
@@ -671,7 +666,7 @@ Event_scheduler*
Event_scheduler::get_instance()
{
DBUG_ENTER("Event_scheduler::get_instance");
- DBUG_RETURN(&singleton);
+ DBUG_RETURN(singleton);
}
@@ -693,9 +688,9 @@ Event_scheduler::init(Event_db_repository *db_repo)
bool ret= FALSE;
DBUG_ENTER("Event_scheduler::init");
DBUG_PRINT("enter", ("this=%p", this));
-
+
+ Event_queue::init(db_repo);
LOCK_SCHEDULER_DATA();
- db_repository= db_repo;
for (;i < COND_LAST; i++)
if (pthread_cond_init(&cond_vars[i], NULL))
{
@@ -703,27 +698,6 @@ Event_scheduler::init(Event_db_repository *db_repo)
ret= TRUE;
goto end;
}
-
- /* init memory root */
- init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
-
- if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
- event_timed_compare_q, NULL, 30 /*auto_extent*/))
- {
- sql_print_error("SCHEDULER: Can't initialize the execution queue");
- ret= TRUE;
- goto end;
- }
-
- if (sizeof(my_time_t) != sizeof(time_t))
- {
- sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
- "The scheduler may not work correctly. Stopping.");
- DBUG_ASSERT(0);
- ret= TRUE;
- goto end;
- }
-
state= INITIALIZED;
end:
UNLOCK_SCHEDULER_DATA();
@@ -746,14 +720,12 @@ void
Event_scheduler::destroy()
{
DBUG_ENTER("Event_scheduler");
-
+ Event_queue::deinit();
LOCK_SCHEDULER_DATA();
switch (state) {
case UNINITIALIZED:
break;
case INITIALIZED:
- delete_queue(&queue);
- free_root(&scheduler_root, MYF(0));
int i;
for (i= 0; i < COND_LAST; i++)
pthread_cond_destroy(&cond_vars[i]);
@@ -771,389 +743,6 @@ Event_scheduler::destroy()
}
-/*
- Creates an event in the scheduler queue
-
- SYNOPSIS
- Event_scheduler::create_event()
- et The event to add
- check_existence Whether to check if already loaded.
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
-*/
-
-int
-Event_scheduler::create_event(THD *thd, Event_parse_data *et, bool check_existence)
-{
- int res;
- Event_timed *et_new;
- DBUG_ENTER("Event_scheduler::create_event");
- DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
- if (check_existence && find_event(et->dbname, et->name, FALSE))
- {
- res= OP_ALREADY_EXISTS;
- goto end;
- }
-
- /* We need to load the event on scheduler_root */
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
- {
- queue_insert_safe(&queue, (byte *) et_new);
- DBUG_PRINT("info", ("Sending COND_new_work"));
- pthread_cond_signal(&cond_vars[COND_new_work]);
- }
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(res);
-}
-
-
-/*
- Drops an event from the scheduler queue
-
- SYNOPSIS
- Event_scheduler::drop_event()
- etn The event to drop
- state Wait the event or kill&drop
-
- RETURN VALUE
- FALSE OK (replaced or scheduler not working)
- TRUE Failure
-*/
-
-bool
-Event_scheduler::drop_event(THD *thd, sp_name *name)
-{
- int res;
- Event_timed *et_old;
- DBUG_ENTER("Event_scheduler::drop_event");
- DBUG_PRINT("enter", ("thd=%p name=%p lock=%p", thd, name,
- &LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
-
- if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
- DBUG_PRINT("info", ("No such event found, probably DISABLED"));
-
- UNLOCK_SCHEDULER_DATA();
-
- /* See comments in ::replace_event() why this is split in two parts. */
- if (et_old)
- {
- switch ((res= et_old->kill_thread(thd))) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- sql_print_error("SCHEDULER: Got unexpected error %d", res);
- DBUG_ASSERT(0);
- }
- }
-
- DBUG_RETURN(FALSE);
-}
-
-
-/*
- Updates an event from the scheduler queue
-
- SYNOPSIS
- Event_scheduler::replace_event()
- et The event to replace(add) into the queue
- state Async or sync stopping
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
- OP_ALREADY_EXISTS Event already in the queue
-*/
-
-int
-Event_scheduler::update_event(THD *thd, Event_parse_data *et,
- LEX_STRING *new_schema,
- LEX_STRING *new_name)
-{
- int res= OP_OK;
- Event_timed *et_old, *et_new= NULL;
- LEX_STRING old_schema, old_name;
-
- LINT_INIT(old_schema.str);
- LINT_INIT(old_schema.length);
- LINT_INIT(old_name.str);
- LINT_INIT(old_name.length);
-
- DBUG_ENTER("Event_scheduler::update_event");
- DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
- thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data));
-
- LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
- {
- DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state));
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(OP_OK);
- }
-
- if (!(et_old= find_event(et->dbname, et->name, TRUE)))
- DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
- et->dbname.str, et->name.str));
-
- if (new_schema && new_name)
- {
- old_schema= et->dbname;
- old_name= et->name;
- et->dbname= *new_schema;
- et->name= *new_name;
- }
- /*
- We need to load the event (it's strings but on the object itself)
- on scheduler_root. et_new could be NULL :
- 1. Error occured
- 2. If the replace is DISABLED, we don't load it into the queue.
- */
- if (!(res= db_repository->
- load_named_event(thd, et->dbname, et->name, &et_new)))
- {
- queue_insert_safe(&queue, (byte *) et_new);
- DBUG_PRINT("info", ("Sending COND_new_work"));
- pthread_cond_signal(&cond_vars[COND_new_work]);
- }
- else if (res == OP_DISABLED_EVENT)
- res= OP_OK;
-
- if (new_schema && new_name)
- {
- et->dbname= old_schema;
- et->name= old_name;
- }
- DBUG_PRINT("info", ("res=%d", res));
- UNLOCK_SCHEDULER_DATA();
- /*
- Andrey: Is this comment still truthful ???
-
- We don't move this code above because a potential kill_thread will call
- THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
- which is LOCK_scheduler_data on which the COND_new_work in ::run() locks.
- Hence, we try to acquire a lock which we have already acquired and we run
- into an assert. Holding LOCK_scheduler_data however is not needed because
- we don't touch any invariant of the scheduler anymore. ::drop_event() does
- the same.
- */
- if (et_old)
- {
- switch (et_old->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et_old->flags |= EVENT_FREE_WHEN_FINISHED;
- break;
- case 0:
- /*
- kill_thread() waits till the spawned thread finishes after it's
- killed. Hence, we delete here memory which is no more referenced from
- a running thread.
- */
- delete et_old;
- /*
- We don't signal COND_new_work here because:
- 1. Even if the dropped event is on top of the queue this will not
- move another one to be executed before the time the one on the
- top (but could be at the same second as the dropped one)
- 2. If this was the last event on the queue, then pthread_cond_timedwait
- in ::run() will finish and then see that the queue is empty and
- call cond_wait(). Hence, no need to interrupt the blocked
- ::run() thread.
- */
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
-
- DBUG_RETURN(res);
-}
-
-
-/*
- Searches for an event in the scheduler queue
-
- SYNOPSIS
- Event_scheduler::find_event()
- db The schema of the event to find
- name The event to find
- remove_from_q If found whether to remove from the Q
-
- RETURN VALUE
- NULL Not found
- otherwise Address
-
- NOTE
- The caller should do the locking also the caller is responsible for
- actual signalling in case an event is removed from the queue
- (signalling COND_new_work for instance).
-*/
-
-Event_timed *
-Event_scheduler::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
-{
- uint i;
- DBUG_ENTER("Event_scheduler::find_event");
-
- for (i= 0; i < queue.elements; ++i)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
- et->dbname.str, et->name.str));
- if (event_timed_identifier_equal(db, name, et))
- {
- if (remove_from_q)
- queue_remove(&queue, i);
- DBUG_RETURN(et);
- }
- }
-
- DBUG_RETURN(NULL);
-}
-
-
-/*
- Drops all events from the in-memory queue and disk that match
- certain pattern evaluated by a comparator function
-
- SYNOPSIS
- Event_scheduler::drop_matching_events()
- thd THD
- pattern A pattern string
- comparator The function to use for comparing
-
- RETURN VALUE
- -1 Scheduler not working
- >=0 Number of dropped events
-
- NOTE
- Expected is the caller to acquire lock on LOCK_scheduler_data
-*/
-
-void
-Event_scheduler::drop_matching_events(THD *thd, LEX_STRING pattern,
- bool (*comparator)(Event_timed *,LEX_STRING *))
-{
- DBUG_ENTER("Event_scheduler::drop_matching_events");
- DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str,
- state));
- if (is_running_or_suspended())
- {
- uint i= 0, dropped= 0;
- while (i < queue.elements)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
- if (comparator(et, &pattern))
- {
- /*
- The queue is ordered. If we remove an element, then all elements after
- it will shift one position to the left, if we imagine it as an array
- from left to the right. In this case we should not increment the
- counter and the (i < queue.elements) condition is ok.
- */
- queue_remove(&queue, i);
-
- /* See replace_event() */
- switch (et->kill_thread(thd)) {
- case EVEX_CANT_KILL:
- /* Don't delete but continue */
- et->flags |= EVENT_FREE_WHEN_FINISHED;
- ++dropped;
- break;
- case 0:
- delete et;
- ++dropped;
- break;
- default:
- DBUG_ASSERT(0);
- }
- }
- else
- i++;
- }
- DBUG_PRINT("info", ("Dropped %lu", dropped));
- }
- /*
- Don't send COND_new_work because no need to wake up the scheduler thread.
- When it wakes next time up it will recalculate how much more it should
- sleep if the top of the queue has been changed by this method.
- */
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Drops all events from the in-memory queue and disk that are from
- certain schema.
-
- SYNOPSIS
- Event_scheduler::drop_schema_events()
- thd THD
- db The schema name
-
- RETURN VALUE
- -1 Scheduler not working
- >=0 Number of dropped events
-*/
-
-int
-Event_scheduler::drop_schema_events(THD *thd, LEX_STRING schema)
-{
- int ret;
- DBUG_ENTER("Event_scheduler::drop_schema_events");
- LOCK_SCHEDULER_DATA();
- if (is_running_or_suspended())
- drop_matching_events(thd, schema, event_timed_db_equal);
-
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_RETURN(ret);
-}
-
-
extern pthread_attr_t connection_attrib;
@@ -1205,8 +794,8 @@ Event_scheduler::start()
}
/* Wait till the child thread has booted (w/ or wo success) */
- while (!is_running_or_suspended() && state != CANTSTART)
- cond_wait(COND_started_or_stopped, &LOCK_scheduler_data);
+ while (!(state == SUSPENDED || state == RUNNING) && state != CANTSTART)
+ cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
/*
If we cannot start for some reason then don't prohibit further attempts.
@@ -1314,7 +903,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Manager thread started with id %lu",
thd->thread_id);
abstime.tv_nsec= 0;
- while (is_running_or_suspended())
+ while ((state == SUSPENDED || state == RUNNING))
{
Event_timed *et;
@@ -1374,9 +963,9 @@ Event_scheduler::run(THD *thd)
pthread_cond_timedwait() will wait till `abstime`.
"Sleeping until next time"
*/
- thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping");
+ thd->enter_cond(&cond_vars[COND_new_work],LOCK_scheduler_data,"Sleeping");
- pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data,
+ pthread_cond_timedwait(&cond_vars[COND_new_work], LOCK_scheduler_data,
&abstime);
DBUG_PRINT("info", ("Manager woke up. state is %d", state));
@@ -1397,7 +986,7 @@ Event_scheduler::run(THD *thd)
In this case stop the manager.
We should enter ::execute_top() with locked LOCK_scheduler_data.
*/
- int ret= execute_top(thd);
+ int ret= execute_top(thd, et);
UNLOCK_SCHEDULER_DATA();
if (ret)
break;
@@ -1421,7 +1010,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Shutting down");
thd->proc_info= (char *)"Cleaning queue";
- clean_queue(thd);
+ clean_memory(thd);
THD_CHECK_SENTRY(thd);
/* free mamager_root memory but don't destroy the root */
@@ -1474,15 +1063,13 @@ Event_scheduler::run(THD *thd)
*/
bool
-Event_scheduler::execute_top(THD *thd)
+Event_scheduler::execute_top(THD *thd, Event_timed *et)
{
int spawn_ret_code;
bool ret= FALSE;
DBUG_ENTER("Event_scheduler::execute_top");
DBUG_PRINT("enter", ("thd=%p", thd));
- Event_timed *et= (Event_timed *)queue_top(&queue);
-
/* Is it good idea to pass a stack address ?*/
Worker_thread_param param(et);
@@ -1552,7 +1139,7 @@ Event_scheduler::execute_top(THD *thd)
*/
void
-Event_scheduler::clean_queue(THD *thd)
+Event_scheduler::clean_memory(THD *thd)
{
CHARSET_INFO *scs= system_charset_info;
uint i;
@@ -1565,14 +1152,7 @@ Event_scheduler::clean_queue(THD *thd)
sql_print_information("SCHEDULER: Emptying the queue");
- /* empty the queue */
- for (i= 0; i < queue.elements; ++i)
- {
- Event_timed *et= (Event_timed *) queue_element(&queue, i);
- et->free_sp();
- delete et;
- }
- resize_queue(&queue, 0);
+ empty_queue();
DBUG_VOID_RETURN;
}
@@ -1681,7 +1261,7 @@ Event_scheduler::stop()
DBUG_PRINT("enter", ("thd=%p", current_thd));
LOCK_SCHEDULER_DATA();
- if (!is_running_or_suspended())
+ if (!(state == SUSPENDED || state == RUNNING))
{
/*
One situation to be here is if there was a start that forked a new
@@ -1717,7 +1297,7 @@ Event_scheduler::stop()
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %d . "
"workers count=%d", state, workers_count()));
- cond_wait(COND_started_or_stopped, &LOCK_scheduler_data);
+ cond_wait(COND_started_or_stopped, LOCK_scheduler_data);
}
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
UNLOCK_SCHEDULER_DATA();
@@ -1768,7 +1348,7 @@ Event_scheduler::suspend_or_resume(
pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
}
DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume"));
- cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data);
+ cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
DBUG_PRINT("info", ("Got response"));
}
UNLOCK_SCHEDULER_DATA();
@@ -1833,7 +1413,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
if (state == SUSPENDED)
{
- thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data,
+ thd->enter_cond(&cond_vars[COND_suspend_or_resume], LOCK_scheduler_data,
"Suspended");
/* Send back signal to the thread that asked us to suspend operations */
pthread_cond_signal(&cond_vars[COND_suspend_or_resume]);
@@ -1842,7 +1422,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
while (state == SUSPENDED)
{
- cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data);
+ cond_wait(COND_suspend_or_resume, LOCK_scheduler_data);
DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume"));
if (state != SUSPENDED)
{
@@ -1852,18 +1432,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
}
if (was_suspended)
{
- if (queue.elements)
- {
- uint i;
- DBUG_PRINT("info", ("We have to recompute the execution times"));
-
- for (i= 0; i < queue.elements; i++)
- {
- ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
- ((Event_timed*)queue_element(&queue, i))->update_fields(thd);
- }
- queue_fix(&queue);
- }
+ recalculate_queue(thd);
/* This will implicitly unlock LOCK_scheduler_data */
thd->exit_cond("");
}
@@ -1892,18 +1461,18 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
bool slept= FALSE;
DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue");
DBUG_PRINT("enter", ("q.elements=%lu state=%s",
- queue.elements, states_names[state]));
+ events_count_no_lock(), states_names[state]));
- if (!queue.elements)
- thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data,
+ if (!events_count_no_lock())
+ thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data,
"Empty queue, sleeping");
/* Wait in a loop protecting against catching spurious signals */
- while (!queue.elements && state == RUNNING)
+ while (!events_count_no_lock() && state == RUNNING)
{
slept= TRUE;
DBUG_PRINT("info", ("Entering condition because of empty queue"));
- cond_wait(COND_new_work, &LOCK_scheduler_data);
+ cond_wait(COND_new_work, LOCK_scheduler_data);
DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d",
state));
/*
@@ -1916,105 +1485,13 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
thd->exit_cond("");
DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d",
- queue.elements, states_names[state], thd->killed));
+ events_count_no_lock(), states_names[state], thd->killed));
DBUG_RETURN(slept);
}
/*
- Wrapper for pthread_mutex_lock
-
- SYNOPSIS
- Event_scheduler::lock_data()
- mutex Mutex to lock
- line The line number on which the lock is done
-
- RETURN VALUE
- Error code of pthread_mutex_lock()
-*/
-
-inline void
-Event_scheduler::lock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::lock_mutex");
- DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
- &LOCK_scheduler_data, func, line));
- pthread_mutex_lock(&LOCK_scheduler_data);
- mutex_last_locked_in_func= func;
- mutex_last_locked_at_line= line;
- mutex_scheduler_data_locked= TRUE;
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_mutex_unlock
-
- SYNOPSIS
- Event_scheduler::unlock_data()
- mutex Mutex to unlock
- line The line number on which the unlock is done
-*/
-
-inline void
-Event_scheduler::unlock_data(const char *func, uint line)
-{
- DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
- DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
- &LOCK_scheduler_data, func, line));
- mutex_last_unlocked_at_line= line;
- mutex_scheduler_data_locked= FALSE;
- mutex_last_unlocked_in_func= func;
- pthread_mutex_unlock(&LOCK_scheduler_data);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Wrapper for pthread_cond_wait
-
- SYNOPSIS
- Event_scheduler::cond_wait()
- cond Conditional to wait for
- mutex Mutex of the conditional
-
- RETURN VALUE
- Error code of pthread_cond_wait()
-*/
-
-inline int
-Event_scheduler::cond_wait(enum Event_scheduler::enum_cond_vars cond,
- pthread_mutex_t *mutex)
-{
- int ret;
- DBUG_ENTER("Event_scheduler::cond_wait");
- DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex));
- ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex);
- cond_waiting_on= COND_NONE;
- DBUG_RETURN(ret);
-}
-
-
-/*
- Checks whether the scheduler is in a running or suspended state.
-
- SYNOPSIS
- Event_scheduler::is_running_or_suspended()
-
- RETURN VALUE
- TRUE Either running or suspended
- FALSE IN_SHUTDOWN, not started, etc.
-*/
-
-inline bool
-Event_scheduler::is_running_or_suspended()
-{
- return (state == SUSPENDED || state == RUNNING);
-}
-
-
-/*
Returns the current state of the scheduler
SYNOPSIS
@@ -2027,9 +1504,9 @@ Event_scheduler::get_state()
enum Event_scheduler::enum_state ret;
DBUG_ENTER("Event_scheduler::get_state");
/* lock_data & unlock_data are not static */
- pthread_mutex_lock(&singleton.LOCK_scheduler_data);
- ret= singleton.state;
- pthread_mutex_unlock(&singleton.LOCK_scheduler_data);
+ pthread_mutex_lock(singleton->LOCK_scheduler_data);
+ ret= singleton->state;
+ pthread_mutex_unlock(singleton->LOCK_scheduler_data);
DBUG_RETURN(ret);
}
@@ -2053,252 +1530,6 @@ Event_scheduler::initialized()
}
-/*
- Returns the number of elements in the queue
-
- SYNOPSIS
- Event_scheduler::events_count()
-
- RETURN VALUE
- 0 Number of Event_timed objects in the queue
-*/
-
-uint
-Event_scheduler::events_count()
-{
- uint n;
- DBUG_ENTER("Event_scheduler::events_count");
- LOCK_SCHEDULER_DATA();
- n= queue.elements;
- UNLOCK_SCHEDULER_DATA();
-
- DBUG_RETURN(n);
-}
-
-
-
-
-/*
- Loads all ENABLED events from mysql.event into the prioritized
- queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_timed instances for every ENABLED event
- from mysql.event.
-
- SYNOPSIS
- Event_scheduler::load_events_from_db()
- thd - Thread context. Used for memory allocation in some cases.
-
- RETURN VALUE
- 0 OK
- !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
- EVEX_COMPILE_ERROR) - in all these cases mysql.event was
- tampered.
-
- NOTES
- Reports the error to the console
-*/
-
-int
-Event_scheduler::load_events_from_db(THD *thd)
-{
- TABLE *table;
- READ_RECORD read_record_info;
- int ret= -1;
- uint count= 0;
- bool clean_the_queue= FALSE;
- /* Compile the events on this root but only for syntax check, then discard */
- MEM_ROOT boot_root;
-
- DBUG_ENTER("Event_scheduler::load_events_from_db");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- if (state > COMMENCING)
- {
- DBUG_ASSERT(0);
- sql_print_error("SCHEDULER: Trying to load events while already running.");
- DBUG_RETURN(EVEX_GENERAL_ERROR);
- }
-
- if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
- {
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
- DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
- }
-
- init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
- init_read_record(&read_record_info, thd, table ,NULL,1,0);
- while (!(read_record_info.read_record(&read_record_info)))
- {
- Event_timed *et;
- if (!(et= new Event_timed))
- {
- DBUG_PRINT("info", ("Out of memory"));
- clean_the_queue= TRUE;
- break;
- }
- DBUG_PRINT("info", ("Loading event from row."));
-
- if ((ret= et->load_from_row(&scheduler_root, table)))
- {
- clean_the_queue= TRUE;
- sql_print_error("SCHEDULER: Error while loading from mysql.event. "
- "Table probably corrupted");
- break;
- }
- if (et->status != Event_timed::ENABLED)
- {
- DBUG_PRINT("info",("%s is disabled",et->name.str));
- delete et;
- continue;
- }
-
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- /* We load only on scheduler root just to check whether the body compiles */
- switch (ret= et->compile(thd, &boot_root)) {
- case EVEX_MICROSECOND_UNSUP:
- et->free_sp();
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- goto end;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
- et->dbname.str, et->name.str);
- goto end;
- default:
- /* Free it, it will be compiled again on the worker thread */
- et->free_sp();
- break;
- }
-
- /* let's find when to be executed */
- if (et->compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
- continue;
- }
-
- DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list."));
- queue_insert_safe(&queue, (byte *) et);
- count++;
- }
-end:
- end_read_record(&read_record_info);
- free_root(&boot_root, MYF(0));
-
- if (clean_the_queue)
- {
- for (count= 0; count < queue.elements; ++count)
- queue_remove(&queue, 0);
- ret= -1;
- }
- else
- {
- ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
- }
-
- /* Force close to free memory */
- thd->version--;
-
- close_thread_tables(thd);
-
- DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
- DBUG_RETURN(ret);
-}
-
-
-/*
- Opens mysql.db and mysql.user and checks whether:
- 1. mysql.db has column Event_priv at column 20 (0 based);
- 2. mysql.user has column Event_priv at column 29 (0 based);
-
- SYNOPSIS
- Event_scheduler::check_system_tables()
-*/
-
-bool
-Event_scheduler::check_system_tables(THD *thd)
-{
- TABLE_LIST tables;
- bool not_used;
- Open_tables_state backup;
- bool ret;
-
- DBUG_ENTER("Event_scheduler::check_system_tables");
- DBUG_PRINT("enter", ("thd=%p", thd));
-
- thd->reset_n_backup_open_tables_state(&backup);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "db";
- tables.lock_type= TL_READ;
-
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
- sql_print_error("Cannot open mysql.db");
- else
- {
- ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
- mysql_db_table_fields, &mysql_db_table_last_check,
- ER_CANNOT_LOAD_FROM_TABLE);
- close_thread_tables(thd);
- }
- if (ret)
- DBUG_RETURN(TRUE);
-
- bzero((char*) &tables, sizeof(tables));
- tables.db= (char*) "mysql";
- tables.table_name= tables.alias= (char*) "user";
- tables.lock_type= TL_READ;
-
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
- sql_print_error("Cannot open mysql.db");
- else
- {
- if (tables.table->s->fields < 29 ||
- strncmp(tables.table->field[29]->field_name,
- STRING_WITH_LEN("Event_priv")))
- {
- sql_print_error("mysql.user has no `Event_priv` column at position 29");
- ret= TRUE;
- }
- close_thread_tables(thd);
- }
-
- thd->restore_backup_open_tables_state(&backup);
-
- DBUG_RETURN(ret);
-}
-
-
-/*
- Inits mutexes.
-
- SYNOPSIS
- Event_scheduler::init_mutexes()
-*/
-
-void
-Event_scheduler::init_mutexes()
-{
- pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST);
-}
-
-
-/*
- Destroys mutexes.
-
- SYNOPSIS
- Event_scheduler::destroy_mutexes()
-*/
-
-void
-Event_scheduler::destroy_mutexes()
-{
- pthread_mutex_destroy(&singleton.LOCK_scheduler_data);
-}
/*
@@ -2337,8 +1568,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("state"), scs);
- protocol->store(states_names[singleton.state].str,
- states_names[singleton.state].length,
+ protocol->store(states_names[singleton->state].str,
+ states_names[singleton->state].length,
scs);
ret= protocol->write();
@@ -2346,7 +1577,7 @@ Event_scheduler::dump_internal_status(THD *thd)
If not initialized - don't show anything else. get_instance()
will otherwise implicitly initialize it. We don't want that.
*/
- if (singleton.state >= INITIALIZED)
+ if (singleton->state >= INITIALIZED)
{
/* last locked at*/
/*
@@ -2357,8 +1588,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->store(STRING_WITH_LEN("last locked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
- singleton.mutex_last_locked_in_func,
- singleton.mutex_last_locked_at_line));
+ singleton->mutex_last_locked_in_func,
+ singleton->mutex_last_locked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2367,8 +1598,8 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol->store(STRING_WITH_LEN("last unlocked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
- singleton.mutex_last_unlocked_in_func,
- singleton.mutex_last_unlocked_at_line));
+ singleton->mutex_last_unlocked_in_func,
+ singleton->mutex_last_unlocked_at_line));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2378,8 +1609,8 @@ Event_scheduler::dump_internal_status(THD *thd)
tmp_string.length(scs->cset->
snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s",
- (singleton.cond_waiting_on != COND_NONE) ?
- cond_vars_names[singleton.cond_waiting_on]:
+ (singleton->cond_waiting_on != COND_NONE) ?
+ cond_vars_names[singleton->cond_waiting_on]:
"NONE"));
protocol->store(&tmp_string);
ret= protocol->write();
@@ -2396,7 +1627,7 @@ Event_scheduler::dump_internal_status(THD *thd)
/* queue.elements */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue.elements"), scs);
- int_string.set((longlong) scheduler->queue.elements, scs);
+ int_string.set((longlong) scheduler->events_count_no_lock(), scs);
protocol->store(&int_string);
ret= protocol->write();
@@ -2411,3 +1642,94 @@ Event_scheduler::dump_internal_status(THD *thd)
#endif
DBUG_RETURN(0);
}
+
+
+/*
+ Wrapper for pthread_mutex_lock
+
+ SYNOPSIS
+ Event_scheduler::lock_data()
+ mutex Mutex to lock
+ line The line number on which the lock is done
+
+ RETURN VALUE
+ Error code of pthread_mutex_lock()
+*/
+
+void
+Event_scheduler::lock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::lock_mutex");
+ DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
+ &LOCK_scheduler_data, func, line));
+ pthread_mutex_lock(LOCK_scheduler_data);
+ mutex_last_locked_in_func_name= func;
+ mutex_last_locked_at_line_nr= line;
+ mutex_scheduler_data_locked= TRUE;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_mutex_unlock
+
+ SYNOPSIS
+ Event_scheduler::unlock_data()
+ mutex Mutex to unlock
+ line The line number on which the unlock is done
+*/
+
+void
+Event_scheduler::unlock_data(const char *func, uint line)
+{
+ DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
+ DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
+ LOCK_scheduler_data, func, line));
+ mutex_last_unlocked_at_line_nr= line;
+ mutex_scheduler_data_locked= FALSE;
+ mutex_last_unlocked_in_func_name= func;
+ pthread_mutex_unlock(LOCK_scheduler_data);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wrapper for pthread_cond_wait
+
+ SYNOPSIS
+ Event_scheduler::cond_wait()
+ cond Conditional to wait for
+ mutex Mutex of the conditional
+
+ RETURN VALUE
+ Error code of pthread_cond_wait()
+*/
+
+int
+Event_scheduler::cond_wait(int cond, pthread_mutex_t *mutex)
+{
+ int ret;
+ DBUG_ENTER("Event_scheduler::cond_wait");
+ DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex));
+ ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex);
+ cond_waiting_on= COND_NONE;
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Signals the main scheduler thread that the queue has changed
+ its state.
+
+ SYNOPSIS
+ Event_scheduler::queue_changed()
+*/
+
+void
+Event_scheduler::queue_changed()
+{
+ DBUG_ENTER("Event_scheduler::queue_changed");
+ DBUG_PRINT("info", ("Sending COND_new_work"));
+ pthread_cond_signal(&cond_vars[COND_new_work]);
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index a274636b38b..b4007d88976 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -21,7 +21,6 @@ class Event_timed;
class Event_db_repository;
class THD;
-typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
int
events_init();
@@ -29,10 +28,12 @@ events_init();
void
events_shutdown();
-class Event_scheduler
+#include "event_queue.h"
+#include "event_scheduler.h"
+
+class Event_scheduler : public Event_queue
{
public:
-
enum enum_state
{
UNINITIALIZED= 0,
@@ -50,32 +51,22 @@ public:
RESUME= 2
};
- /* Singleton access */
- static Event_scheduler*
- get_instance();
-
- /* Methods for queue management follow */
+ /* This is the current status of the life-cycle of the scheduler. */
+ enum enum_state state;
- int
- create_event(THD *thd, Event_parse_data *et, bool check_existence);
-
- int
- update_event(THD *thd, Event_parse_data *et, LEX_STRING *new_schema,
- LEX_STRING *new_name);
-
- bool
- drop_event(THD *thd, sp_name *name);
+ static void
+ create_instance();
- int
- drop_schema_events(THD *thd, LEX_STRING schema);
+ /* Singleton access */
+ static Event_scheduler*
+ get_instance();
- int
- drop_user_events(THD *thd, LEX_STRING *definer)
- { DBUG_ASSERT(0); return 0;}
+ bool
+ init(Event_db_repository *db_repo);
- uint
- events_count();
+ void
+ destroy();
/* State changing methods follow */
@@ -97,19 +88,13 @@ public:
int
suspend_or_resume(enum enum_suspend_or_resume action);
-
- bool
- init(Event_db_repository *db_repo);
-
- void
- destroy();
-
+/*
static void
init_mutexes();
static void
destroy_mutexes();
-
+*/
void
report_error_during_start();
@@ -124,35 +109,34 @@ public:
static int
dump_internal_status(THD *thd);
- static bool
- check_system_tables(THD *thd);
+ /* helper functions for working with mutexes & conditionals */
+ void
+ lock_data(const char *func, uint line);
-private:
- Event_timed *
- find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
+ void
+ unlock_data(const char *func, uint line);
+
+ int
+ cond_wait(int cond, pthread_mutex_t *mutex);
+
+ void
+ queue_changed();
+
+protected:
uint
workers_count();
- bool
- is_running_or_suspended();
-
/* helper functions */
bool
- execute_top(THD *thd);
+ execute_top(THD *thd, Event_timed *et);
void
- clean_queue(THD *thd);
+ clean_memory(THD *thd);
void
stop_all_running_events(THD *thd);
- int
- load_events_from_db(THD *thd);
-
- void
- drop_matching_events(THD *thd, LEX_STRING pattern,
- bool (*)(Event_timed *,LEX_STRING *));
bool
check_n_suspend_if_needed(THD *thd);
@@ -163,48 +147,14 @@ private:
/* Singleton DP is used */
Event_scheduler();
- enum enum_cond_vars
- {
- COND_NONE= -1,
- /*
- COND_new_work is a conditional used to signal that there is a change
- of the queue that should inform the executor thread that new event should
- be executed sooner than previously expected, because of add/replace event.
- */
- COND_new_work= 0,
- /*
- COND_started is a conditional used to synchronize the thread in which
- ::start() was called and the spawned thread. ::start() spawns a new thread
- and then waits on COND_started but also checks when awaken that `state` is
- either RUNNING or CANTSTART. Then it returns back.
- */
- COND_started_or_stopped,
- /*
- Conditional used for signalling from the scheduler thread back to the
- thread that calls ::suspend() or ::resume. Synchronizing the calls.
- */
- COND_suspend_or_resume,
- /* Must be always last */
- COND_LAST
- };
- /* Singleton instance */
- static Event_scheduler singleton;
+ pthread_mutex_t *LOCK_scheduler_data;
- /* This is the current status of the life-cycle of the manager. */
- enum enum_state state;
/* Set to start the scheduler in suspended state */
bool start_scheduler_suspended;
/*
- LOCK_scheduler_data is the mutex which protects the access to the
- manager's queue as well as used when signalling COND_new_work,
- COND_started and COND_shutdown.
- */
- pthread_mutex_t LOCK_scheduler_data;
-
- /*
Holds the thread id of the executor thread or 0 if the executor is not
running. It is used by ::shutdown() to know which thread to kill with
kill_one_thread(). The latter wake ups a thread if it is waiting on a
@@ -212,33 +162,27 @@ private:
*/
ulong thread_id;
- pthread_cond_t cond_vars[COND_LAST];
- static const char * const cond_vars_names[COND_LAST];
-
- /* The MEM_ROOT of the object */
- MEM_ROOT scheduler_root;
-
- Event_db_repository *db_repository;
+ enum enum_cond_vars
+ {
+ COND_NONE= -1,
+ COND_new_work= 0,
+ COND_started_or_stopped,
+ COND_suspend_or_resume,
+ /* Must be always last */
+ COND_LAST
+ };
- /* The sorted queue with the Event_timed objects */
- QUEUE queue;
-
- uint mutex_last_locked_at_line;
- uint mutex_last_unlocked_at_line;
- const char* mutex_last_locked_in_func;
- const char* mutex_last_unlocked_in_func;
- enum enum_cond_vars cond_waiting_on;
+ uint mutex_last_locked_at_line_nr;
+ uint mutex_last_unlocked_at_line_nr;
+ const char* mutex_last_locked_in_func_name;
+ const char* mutex_last_unlocked_in_func_name;
+ int cond_waiting_on;
bool mutex_scheduler_data_locked;
- /* helper functions for working with mutexes & conditionals */
- void
- lock_data(const char *func, uint line);
- void
- unlock_data(const char *func, uint line);
+ static const char * const cond_vars_names[COND_LAST];
- int
- cond_wait(enum enum_cond_vars, pthread_mutex_t *mutex);
+ pthread_cond_t cond_vars[COND_LAST];
private:
/* Prevent use of these */
diff --git a/sql/events.cc b/sql/events.cc
index 28c57d6b493..09d5ee21a4f 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -559,6 +559,7 @@ void
Events::init_mutexes()
{
db_repository= new Event_db_repository;
+ Event_scheduler::create_instance();
Event_scheduler::init_mutexes();
}