diff options
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 83 |
1 files changed, 23 insertions, 60 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index c959d512764..ea5a2deaeb3 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -81,6 +81,7 @@ static Ndb *injector_ndb= 0; static Ndb *schema_ndb= 0; static int ndbcluster_binlog_inited= 0; +static int ndbcluster_binlog_terminating= 0; /* Mutex and condition used for interacting between client sql thread @@ -582,61 +583,18 @@ static int ndbcluster_binlog_end(THD *thd) #ifdef HAVE_NDB_BINLOG /* wait for injector thread to finish */ + ndbcluster_binlog_terminating= 1; + pthread_cond_signal(&injector_cond); pthread_mutex_lock(&injector_mutex); - if (ndb_binlog_thread_running > 0) - { - pthread_cond_signal(&injector_cond); - pthread_mutex_unlock(&injector_mutex); - - pthread_mutex_lock(&injector_mutex); - while (ndb_binlog_thread_running > 0) - { - struct timespec abstime; - set_timespec(abstime, 1); - pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); - } - } + while (ndb_binlog_thread_running > 0) + pthread_cond_wait(&injector_cond, &injector_mutex); pthread_mutex_unlock(&injector_mutex); - - /* remove all shares */ - { - pthread_mutex_lock(&ndbcluster_mutex); - for (uint i= 0; i < ndbcluster_open_tables.records; i++) - { - NDB_SHARE *share= - (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i); - if (share->table) - DBUG_PRINT("share", - ("table->s->db.table_name: %s.%s", - share->table->s->db.str, share->table->s->table_name.str)); - /* ndb_share reference create free */ - if (share->state != NSS_DROPPED) - { - DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u", - share->key, share->use_count)); - --share->use_count; - } - if (share->use_count == 0) - ndbcluster_real_free_share(&share); - else - { - DBUG_PRINT("share", - ("[%d] 0x%lx key: %s key_length: %d", - i, (long) share, share->key, share->key_length)); - DBUG_PRINT("share", - ("db.tablename: %s.%s use_count: %d commit_count: %lu", - share->db, share->table_name, - share->use_count, (long) share->commit_count)); - } - } - pthread_mutex_unlock(&ndbcluster_mutex); - } - pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); #endif + DBUG_RETURN(0); } @@ -2385,7 +2343,6 @@ int ndbcluster_binlog_start() if (ndb_binlog_thread_running < 0) DBUG_RETURN(-1); - DBUG_RETURN(0); } @@ -3562,6 +3519,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) s_ndb->init()) { sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); + ndb_binlog_thread_running= -1; + pthread_mutex_unlock(&injector_mutex); + pthread_cond_signal(&injector_cond); goto err; } @@ -3592,7 +3552,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) p_latest_trans_gci= injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci(); schema_ndb= s_ndb; - ndb_binlog_thread_running= 1; + if (opt_bin_log) { if (global_system_variables.binlog_format == BINLOG_FORMAT_ROW || @@ -3605,10 +3565,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) sql_print_error("NDB: only row based binary logging is supported"); } } - /* - We signal the thread that started us that we've finished - starting up. - */ + + /* Thread start up completed */ + ndb_binlog_thread_running= 1; pthread_mutex_unlock(&injector_mutex); pthread_cond_signal(&injector_cond); @@ -3627,7 +3586,7 @@ restart: struct timespec abstime; set_timespec(abstime, 1); pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); - if (abort_loop) + if (ndbcluster_binlog_terminating) { pthread_mutex_unlock(&injector_mutex); goto err; @@ -3652,13 +3611,15 @@ restart: { // wait for the first event thd->proc_info= "Waiting for first event from ndbcluster"; - DBUG_PRINT("info", ("Waiting for the first event")); int schema_res, res; Uint64 schema_gci; do { - if (abort_loop) + DBUG_PRINT("info", ("Waiting for the first event")); + + if (ndbcluster_binlog_terminating) goto err; + schema_res= s_ndb->pollEvents(100, &schema_gci); } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci); if (ndb_binlog_running) @@ -3666,7 +3627,7 @@ restart: Uint64 gci= i_ndb->getLatestGCI(); while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch) { - if (abort_loop) + if (ndbcluster_binlog_terminating) goto err; res= i_ndb->pollEvents(10, &gci); } @@ -3713,7 +3674,8 @@ restart: thd->db= db; } do_ndbcluster_binlog_close_connection= BCCC_running; - for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) && + for ( ; !((ndbcluster_binlog_terminating || + do_ndbcluster_binlog_close_connection) && ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) && do_ndbcluster_binlog_close_connection != BCCC_restart; ) { @@ -3760,7 +3722,8 @@ restart: schema_res= s_ndb->pollEvents(10, &schema_gci); } - if ((abort_loop || do_ndbcluster_binlog_close_connection) && + if ((ndbcluster_binlog_terminating || + do_ndbcluster_binlog_close_connection) && (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci || !ndb_binlog_running)) break; /* Shutting down server */ |