diff options
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 125 |
1 files changed, 98 insertions, 27 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 564e95bac1c..324c15e77a4 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -41,6 +41,11 @@ 0 if never started */ int ndb_binlog_thread_running= 0; +/* + Flag showing if the ndb binlog should be created, if so == TRUE + FALSE if not +*/ +my_bool ndb_binlog_running= FALSE; /* Global reference to the ndb injector thread THD oject @@ -237,7 +242,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) share->op= 0; share->table= 0; - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) { if (_table) { @@ -349,7 +354,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) */ static void ndbcluster_binlog_wait(THD *thd) { - if (ndb_binlog_thread_running > 0) + if (ndb_binlog_running) { DBUG_ENTER("ndbcluster_binlog_wait"); const char *save_info= thd ? thd->proc_info : 0; @@ -358,7 +363,7 @@ static void ndbcluster_binlog_wait(THD *thd) if (thd) thd->proc_info= "Waiting for ndbcluster binlog update to " "reach current position"; - while (count && ndb_binlog_thread_running > 0 && + while (count && ndb_binlog_running && ndb_latest_handled_binlog_epoch < wait_epoch) { count--; @@ -375,7 +380,7 @@ static void ndbcluster_binlog_wait(THD *thd) */ static int ndbcluster_reset_logs(THD *thd) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return 0; DBUG_ENTER("ndbcluster_reset_logs"); @@ -402,7 +407,7 @@ static int ndbcluster_reset_logs(THD *thd) static int ndbcluster_binlog_index_purge_file(THD *thd, const char *file) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return 0; DBUG_ENTER("ndbcluster_binlog_index_purge_file"); @@ -427,6 +432,37 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, DBUG_ENTER("ndbcluster_binlog_log_query"); DBUG_PRINT("enter", ("db: %s table_name: %s query: %s", db, table_name, query)); + enum SCHEMA_OP_TYPE type; + int log= 0; + switch (binlog_command) + { + case LOGCOM_CREATE_TABLE: + type= SOT_CREATE_TABLE; + break; + case LOGCOM_ALTER_TABLE: + type= SOT_ALTER_TABLE; + break; + case LOGCOM_RENAME_TABLE: + type= SOT_RENAME_TABLE; + break; + case LOGCOM_DROP_TABLE: + type= SOT_DROP_TABLE; + break; + case LOGCOM_CREATE_DB: + type= SOT_CREATE_DB; + log= 1; + break; + case LOGCOM_ALTER_DB: + type= SOT_ALTER_DB; + log= 1; + break; + case LOGCOM_DROP_DB: + type= SOT_DROP_DB; + break; + } + if (log) + ndbcluster_log_schema_op(thd, 0, query, query_length, + db, table_name, 0, 0, type); DBUG_VOID_RETURN; } @@ -499,7 +535,7 @@ static int ndbcluster_binlog_end(THD *thd) ****************************************************************/ static void ndbcluster_reset_slave(THD *thd) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return; DBUG_ENTER("ndbcluster_reset_slave"); @@ -835,7 +871,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, enum SCHEMA_OP_TYPE type) { DBUG_ENTER("ndbcluster_log_schema_op"); -#ifdef NOT_YET Thd_ndb *thd_ndb= get_thd_ndb(thd); if (!thd_ndb) { @@ -879,6 +914,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, break; case SOT_ALTER_DB: break; + case SOT_TABLESPACE: + break; + case SOT_LOGFILE_GROUP: + break; default: abort(); /* should not happen, programming error */ } @@ -1070,13 +1109,13 @@ end: sql_print_error("NDB create table: timed out. Ignoring..."); break; } - sql_print_information("NDB create table: " - "waiting max %u sec for create table %s.", - max_timeout, share->key); + if (ndb_extra_logging) + sql_print_information("NDB create table: " + "waiting max %u sec for create table %s.", + max_timeout, share->key); } (void) pthread_mutex_unlock(&share->mutex); } -#endif DBUG_RETURN(0); } @@ -1315,11 +1354,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema_list->push_back(schema, mem_root); log_query= 0; break; - case SOT_CREATE_TABLE: - /* fall through */ case SOT_RENAME_TABLE: /* fall through */ case SOT_ALTER_TABLE: + /* fall through */ + if (!ndb_binlog_running) + { + log_query= 1; + break; /* discovery will be handled by binlog */ + } + /* fall through */ + case SOT_CREATE_TABLE: + /* fall through */ pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) { @@ -1329,12 +1375,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->node_id); } pthread_mutex_unlock(&LOCK_open); - { - /* signal that schema operation has been handled */ - DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); - if (bitmap_is_set(&slock, node_id)) - ndbcluster_update_slock(thd, schema->db, schema->name); - } log_query= 1; break; case SOT_DROP_DB: @@ -1374,14 +1414,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, } DBUG_RETURN(0); } + case SOT_TABLESPACE: + case SOT_LOGFILE_GROUP: + log_query= 1; + break; } + + /* signal that schema operation has been handled */ + if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK) + { + DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); + if (bitmap_is_set(&slock, node_id)) + ndbcluster_update_slock(thd, schema->db, schema->name); + } + if (log_query) { char *thd_db_save= thd->db; thd->db= schema->db; thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, schema->query_length, FALSE, - schema->name[0] == 0); + schema->name[0] == 0 || thd->db[0] == 0); thd->db= thd_db_save; } } @@ -1672,7 +1725,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, "allocating table share for %s failed", key); } - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) { share->flags|= NSF_NO_BINLOG; pthread_mutex_unlock(&ndbcluster_mutex); @@ -2521,7 +2574,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) injector_thd= thd; injector_ndb= ndb; ndb_binlog_thread_running= 1; - + if (opt_bin_log) + { + if (binlog_row_based) + { + ndb_binlog_running= TRUE; + } + else + { + sql_print_error("NDB: only row based binary logging is supported"); + } + } /* We signal the thread that started us that we've finished starting up. @@ -2562,7 +2625,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { static char db[]= ""; thd->db= db; - open_binlog_index(thd, &binlog_tables, &binlog_index); + if (ndb_binlog_running) + open_binlog_index(thd, &binlog_tables, &binlog_index); if (!apply_status_share) { sql_print_error("NDB: Could not get apply status share"); @@ -2590,16 +2654,22 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->set_time(); /* wait for event or 1000 ms */ - Uint64 gci, schema_gci; - int res= ndb->pollEvents(1000, &gci); - int schema_res= schema_ndb->pollEvents(0, &schema_gci); + Uint64 gci= 0, schema_gci; + int res= 0, tot_poll_wait= 1000; + if (ndb_binlog_running) + { + res= ndb->pollEvents(tot_poll_wait, &gci); + tot_poll_wait= 0; + } + int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) schema_res= schema_ndb->pollEvents(10, &schema_gci); if ((abort_loop || do_ndbcluster_binlog_close_connection) && - ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) + (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || + !ndb_binlog_running)) break; /* Shutting down server */ if (binlog_index && binlog_index->s->version < refresh_version) @@ -2810,6 +2880,7 @@ err: delete thd; ndb_binlog_thread_running= -1; + ndb_binlog_running= FALSE; (void) pthread_cond_signal(&injector_cond); DBUG_PRINT("exit", ("ndb_binlog_thread")); |