summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc82
1 files changed, 25 insertions, 57 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index e189f7df588..38b640d5f55 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -80,6 +80,8 @@ THD *injector_thd= 0;
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;
+static int ndbcluster_binlog_inited= 0;
+
/*
Mutex and condition used for interacting between client sql thread
and injector thread
@@ -557,29 +559,28 @@ ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binl
DBUG_VOID_RETURN;
}
+
/*
- End use of the NDB Cluster table handler
- - free all global variables allocated by
- ndbcluster_init()
+ End use of the NDB Cluster binlog
+ - wait for binlog thread to shutdown
*/
static int ndbcluster_binlog_end(THD *thd)
{
- DBUG_ENTER("ndb_binlog_end");
+ DBUG_ENTER("ndbcluster_binlog_end");
- if (!ndbcluster_util_inited)
+ if (!ndbcluster_binlog_inited)
DBUG_RETURN(0);
-
- // Kill ndb utility thread
- (void) pthread_mutex_lock(&LOCK_ndb_util_thread);
- DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
- (void) pthread_cond_signal(&COND_ndb_util_thread);
- (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ ndbcluster_binlog_inited= 0;
#ifdef HAVE_NDB_BINLOG
/* wait for injector thread to finish */
+ pthread_mutex_lock(&injector_mutex);
if (ndb_binlog_thread_running > 0)
{
+ pthread_cond_signal(&injector_cond);
+ pthread_mutex_unlock(&injector_mutex);
+
pthread_mutex_lock(&injector_mutex);
while (ndb_binlog_thread_running > 0)
{
@@ -587,8 +588,9 @@ static int ndbcluster_binlog_end(THD *thd)
set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
}
- pthread_mutex_unlock(&injector_mutex);
}
+ pthread_mutex_unlock(&injector_mutex);
+
/* remove all shares */
{
@@ -616,8 +618,10 @@ static int ndbcluster_binlog_end(THD *thd)
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
+
+ pthread_mutex_destroy(&injector_mutex);
+ pthread_cond_destroy(&injector_cond);
#endif
- ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
@@ -2285,31 +2289,21 @@ int ndbcluster_binlog_start()
DBUG_RETURN(-1);
}
- /*
- Wait for the ndb injector thread to finish starting up.
- */
+ ndbcluster_binlog_inited= 1;
+
+ /* Wait for the injector thread to start */
pthread_mutex_lock(&injector_mutex);
while (!ndb_binlog_thread_running)
pthread_cond_wait(&injector_cond, &injector_mutex);
pthread_mutex_unlock(&injector_mutex);
-
+
if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1);
+
DBUG_RETURN(0);
}
-static void ndbcluster_binlog_close_connection(THD *thd)
-{
- DBUG_ENTER("ndbcluster_binlog_close_connection");
- const char *save_info= thd->proc_info;
- thd->proc_info= "ndbcluster_binlog_close_connection";
- do_ndbcluster_binlog_close_connection= BCCC_exit;
- while (ndb_binlog_thread_running > 0)
- sleep(1);
- thd->proc_info= save_info;
- DBUG_VOID_RETURN;
-}
/**************************************************************
Internal helper functions for creating/dropping ndb events
@@ -3541,7 +3535,7 @@ restart:
if (abort_loop)
goto err;
schema_res= s_ndb->pollEvents(100, &schema_gci);
- } while (ndb_latest_received_binlog_epoch == schema_gci);
+ } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
if (ndb_binlog_running)
{
Uint64 gci= i_ndb->getLatestGCI();
@@ -3952,15 +3946,12 @@ restart:
goto restart;
}
err:
+ sql_print_information("Stopping Cluster Binlog");
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
thd->proc_info= "Shutting down";
close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
- uint ndb_obj_cnt= 1; // g_ndb
- ndb_obj_cnt+= injector_ndb == 0 ? 0 : 1;
- ndb_obj_cnt+= schema_ndb == 0 ? 0 : 1;
- ndb_obj_cnt+= ndbcluster_util_inited ? 1 : 0;
injector_thd= 0;
injector_ndb= 0;
p_latest_trans_gci= 0;
@@ -3968,29 +3959,6 @@ err:
pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory
- if (!ndb_extra_logging)
- sql_print_information("Stopping Cluster Binlog");
- else
- sql_print_information("Stopping Cluster Binlog: %u(%u)",
- g_ndb_cluster_connection->get_active_ndb_objects(),
- ndb_obj_cnt);
-
- /**
- * Add extra wait loop to make user "user" ndb-object go away...
- * otherwise user thread can have ongoing SUB_DATA
- */
- int sleep_cnt= 0;
- while (sleep_cnt < 300 &&
- g_ndb_cluster_connection->get_active_ndb_objects() > ndb_obj_cnt)
- {
- my_sleep(10000); // 10ms
- sleep_cnt++;
- }
- if (ndb_extra_logging)
- sql_print_information("Stopping Cluster Binlog: waited %ums %u(%u)",
- 10*sleep_cnt, g_ndb_cluster_connection->get_active_ndb_objects(),
- ndb_obj_cnt);
-
if (ndb_apply_status_share)
{
free_share(&ndb_apply_status_share);
@@ -4046,8 +4014,8 @@ err:
hash_free(&ndb_schema_objects);
- // Placed here to avoid a memory leak; TODO: check if needed
net_end(&thd->net);
+ thd->cleanup();
delete thd;
ndb_binlog_thread_running= -1;