summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc83
1 files changed, 23 insertions, 60 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index c959d512764..ea5a2deaeb3 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -81,6 +81,7 @@ static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;
static int ndbcluster_binlog_inited= 0;
+static int ndbcluster_binlog_terminating= 0;
/*
Mutex and condition used for interacting between client sql thread
@@ -582,61 +583,18 @@ static int ndbcluster_binlog_end(THD *thd)
#ifdef HAVE_NDB_BINLOG
/* wait for injector thread to finish */
+ ndbcluster_binlog_terminating= 1;
+ pthread_cond_signal(&injector_cond);
pthread_mutex_lock(&injector_mutex);
- if (ndb_binlog_thread_running > 0)
- {
- pthread_cond_signal(&injector_cond);
- pthread_mutex_unlock(&injector_mutex);
-
- pthread_mutex_lock(&injector_mutex);
- while (ndb_binlog_thread_running > 0)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
- }
- }
+ while (ndb_binlog_thread_running > 0)
+ pthread_cond_wait(&injector_cond, &injector_mutex);
pthread_mutex_unlock(&injector_mutex);
-
- /* remove all shares */
- {
- pthread_mutex_lock(&ndbcluster_mutex);
- for (uint i= 0; i < ndbcluster_open_tables.records; i++)
- {
- NDB_SHARE *share=
- (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
- if (share->table)
- DBUG_PRINT("share",
- ("table->s->db.table_name: %s.%s",
- share->table->s->db.str, share->table->s->table_name.str));
- /* ndb_share reference create free */
- if (share->state != NSS_DROPPED)
- {
- DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
- share->key, share->use_count));
- --share->use_count;
- }
- if (share->use_count == 0)
- ndbcluster_real_free_share(&share);
- else
- {
- DBUG_PRINT("share",
- ("[%d] 0x%lx key: %s key_length: %d",
- i, (long) share, share->key, share->key_length));
- DBUG_PRINT("share",
- ("db.tablename: %s.%s use_count: %d commit_count: %lu",
- share->db, share->table_name,
- share->use_count, (long) share->commit_count));
- }
- }
- pthread_mutex_unlock(&ndbcluster_mutex);
- }
-
pthread_mutex_destroy(&injector_mutex);
pthread_cond_destroy(&injector_cond);
pthread_mutex_destroy(&ndb_schema_share_mutex);
#endif
+
DBUG_RETURN(0);
}
@@ -2385,7 +2343,6 @@ int ndbcluster_binlog_start()
if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1);
-
DBUG_RETURN(0);
}
@@ -3562,6 +3519,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
s_ndb->init())
{
sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
+ ndb_binlog_thread_running= -1;
+ pthread_mutex_unlock(&injector_mutex);
+ pthread_cond_signal(&injector_cond);
goto err;
}
@@ -3592,7 +3552,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
p_latest_trans_gci=
injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
schema_ndb= s_ndb;
- ndb_binlog_thread_running= 1;
+
if (opt_bin_log)
{
if (global_system_variables.binlog_format == BINLOG_FORMAT_ROW ||
@@ -3605,10 +3565,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
sql_print_error("NDB: only row based binary logging is supported");
}
}
- /*
- We signal the thread that started us that we've finished
- starting up.
- */
+
+ /* Thread start up completed */
+ ndb_binlog_thread_running= 1;
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
@@ -3627,7 +3586,7 @@ restart:
struct timespec abstime;
set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
- if (abort_loop)
+ if (ndbcluster_binlog_terminating)
{
pthread_mutex_unlock(&injector_mutex);
goto err;
@@ -3652,13 +3611,15 @@ restart:
{
// wait for the first event
thd->proc_info= "Waiting for first event from ndbcluster";
- DBUG_PRINT("info", ("Waiting for the first event"));
int schema_res, res;
Uint64 schema_gci;
do
{
- if (abort_loop)
+ DBUG_PRINT("info", ("Waiting for the first event"));
+
+ if (ndbcluster_binlog_terminating)
goto err;
+
schema_res= s_ndb->pollEvents(100, &schema_gci);
} while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
if (ndb_binlog_running)
@@ -3666,7 +3627,7 @@ restart:
Uint64 gci= i_ndb->getLatestGCI();
while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
{
- if (abort_loop)
+ if (ndbcluster_binlog_terminating)
goto err;
res= i_ndb->pollEvents(10, &gci);
}
@@ -3713,7 +3674,8 @@ restart:
thd->db= db;
}
do_ndbcluster_binlog_close_connection= BCCC_running;
- for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
+ for ( ; !((ndbcluster_binlog_terminating ||
+ do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
do_ndbcluster_binlog_close_connection != BCCC_restart; )
{
@@ -3760,7 +3722,8 @@ restart:
schema_res= s_ndb->pollEvents(10, &schema_gci);
}
- if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
+ if ((ndbcluster_binlog_terminating ||
+ do_ndbcluster_binlog_close_connection) &&
(ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
!ndb_binlog_running))
break; /* Shutting down server */