diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster.cc | 45 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 125 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.h | 6 |
3 files changed, 128 insertions, 48 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index dfddfd622e6..a7579c7b465 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -4382,7 +4382,7 @@ int ha_ndbcluster::create(const char *name, if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0) { /* this is only a serious error if the binlog is on */ - if (share && ndb_binlog_thread_running > 0) + if (share && ndb_binlog_running) { push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_GET_ERRMSG, ER(ER_GET_ERRMSG), @@ -4395,14 +4395,14 @@ int ha_ndbcluster::create(const char *name, sql_print_information("NDB Binlog: CREATE TABLE Event: %s", event_name.c_ptr()); - if (share && ndb_binlog_thread_running > 0 && + if (share && ndb_binlog_running && ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0) { sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations." " Event: %s", name2); /* a warning has been issued to the client */ } - if (share && ndb_binlog_thread_running <= 0) + if (share && !ndb_binlog_running) share->flags|= NSF_NO_BINLOG; ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, @@ -4686,7 +4686,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) } #ifdef HAVE_NDB_BINLOG NDB_SHARE *share= 0; - if (ndb_binlog_thread_running > 0 && + if (ndb_binlog_running && (share= get_share(from, 0, false))) { int r= rename_share(share, to); @@ -5866,18 +5866,8 @@ static bool ndbcluster_init() pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); #ifdef HAVE_NDB_BINLOG /* start the ndb injector thread */ - if (opt_bin_log) - { - if (binlog_row_based) - { - if (ndbcluster_binlog_start()) - goto ndbcluster_init_error; - } - else - { - sql_print_error("NDB: only row based binary logging is supported"); - } - } + if (ndbcluster_binlog_start()) + goto ndbcluster_init_error; #endif /* HAVE_NDB_BINLOG */ pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST); @@ -7440,9 +7430,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ndbcluster_util_inited= 1; #ifdef HAVE_NDB_BINLOG - /* If running, signal injector thread that all is setup */ - if (ndb_binlog_thread_running > 0) - pthread_cond_signal(&injector_cond); + /* Signal injector thread that all is setup */ + pthread_cond_signal(&injector_cond); #endif set_timespec(abstime, 0); @@ -9360,6 +9349,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) { DBUG_ENTER("ha_ndbcluster::alter_tablespace"); + int is_tablespace= 0; Ndb *ndb= check_ndb_in_thd(thd); if (ndb == NULL) { @@ -9398,6 +9388,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) DBUG_PRINT("error", ("createDatafile returned %d", error)); goto ndberror; } + is_tablespace= 1; break; } case (ALTER_TABLESPACE): @@ -9441,6 +9432,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) info->ts_alter_tablespace_type)); DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED); } + is_tablespace= 1; break; } case (CREATE_LOGFILE_GROUP): @@ -9506,6 +9498,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) { goto ndberror; } + is_tablespace= 1; break; } case (DROP_LOGFILE_GROUP): @@ -9531,6 +9524,20 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED); } } + + if (is_tablespace) + ndbcluster_log_schema_op(thd, 0, + thd->query, thd->query_length, + "", info->tablespace_name, + 0, 0, + SOT_TABLESPACE); + else + ndbcluster_log_schema_op(thd, 0, + thd->query, thd->query_length, + "", info->logfile_group_name, + 0, 0, + SOT_LOGFILE_GROUP); + DBUG_RETURN(FALSE); ndberror: diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 564e95bac1c..324c15e77a4 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -41,6 +41,11 @@ 0 if never started */ int ndb_binlog_thread_running= 0; +/* + Flag showing if the ndb binlog should be created, if so == TRUE + FALSE if not +*/ +my_bool ndb_binlog_running= FALSE; /* Global reference to the ndb injector thread THD oject @@ -237,7 +242,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) share->op= 0; share->table= 0; - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) { if (_table) { @@ -349,7 +354,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) */ static void ndbcluster_binlog_wait(THD *thd) { - if (ndb_binlog_thread_running > 0) + if (ndb_binlog_running) { DBUG_ENTER("ndbcluster_binlog_wait"); const char *save_info= thd ? thd->proc_info : 0; @@ -358,7 +363,7 @@ static void ndbcluster_binlog_wait(THD *thd) if (thd) thd->proc_info= "Waiting for ndbcluster binlog update to " "reach current position"; - while (count && ndb_binlog_thread_running > 0 && + while (count && ndb_binlog_running && ndb_latest_handled_binlog_epoch < wait_epoch) { count--; @@ -375,7 +380,7 @@ static void ndbcluster_binlog_wait(THD *thd) */ static int ndbcluster_reset_logs(THD *thd) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return 0; DBUG_ENTER("ndbcluster_reset_logs"); @@ -402,7 +407,7 @@ static int ndbcluster_reset_logs(THD *thd) static int ndbcluster_binlog_index_purge_file(THD *thd, const char *file) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return 0; DBUG_ENTER("ndbcluster_binlog_index_purge_file"); @@ -427,6 +432,37 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, DBUG_ENTER("ndbcluster_binlog_log_query"); DBUG_PRINT("enter", ("db: %s table_name: %s query: %s", db, table_name, query)); + enum SCHEMA_OP_TYPE type; + int log= 0; + switch (binlog_command) + { + case LOGCOM_CREATE_TABLE: + type= SOT_CREATE_TABLE; + break; + case LOGCOM_ALTER_TABLE: + type= SOT_ALTER_TABLE; + break; + case LOGCOM_RENAME_TABLE: + type= SOT_RENAME_TABLE; + break; + case LOGCOM_DROP_TABLE: + type= SOT_DROP_TABLE; + break; + case LOGCOM_CREATE_DB: + type= SOT_CREATE_DB; + log= 1; + break; + case LOGCOM_ALTER_DB: + type= SOT_ALTER_DB; + log= 1; + break; + case LOGCOM_DROP_DB: + type= SOT_DROP_DB; + break; + } + if (log) + ndbcluster_log_schema_op(thd, 0, query, query_length, + db, table_name, 0, 0, type); DBUG_VOID_RETURN; } @@ -499,7 +535,7 @@ static int ndbcluster_binlog_end(THD *thd) ****************************************************************/ static void ndbcluster_reset_slave(THD *thd) { - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) return; DBUG_ENTER("ndbcluster_reset_slave"); @@ -835,7 +871,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, enum SCHEMA_OP_TYPE type) { DBUG_ENTER("ndbcluster_log_schema_op"); -#ifdef NOT_YET Thd_ndb *thd_ndb= get_thd_ndb(thd); if (!thd_ndb) { @@ -879,6 +914,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, break; case SOT_ALTER_DB: break; + case SOT_TABLESPACE: + break; + case SOT_LOGFILE_GROUP: + break; default: abort(); /* should not happen, programming error */ } @@ -1070,13 +1109,13 @@ end: sql_print_error("NDB create table: timed out. Ignoring..."); break; } - sql_print_information("NDB create table: " - "waiting max %u sec for create table %s.", - max_timeout, share->key); + if (ndb_extra_logging) + sql_print_information("NDB create table: " + "waiting max %u sec for create table %s.", + max_timeout, share->key); } (void) pthread_mutex_unlock(&share->mutex); } -#endif DBUG_RETURN(0); } @@ -1315,11 +1354,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema_list->push_back(schema, mem_root); log_query= 0; break; - case SOT_CREATE_TABLE: - /* fall through */ case SOT_RENAME_TABLE: /* fall through */ case SOT_ALTER_TABLE: + /* fall through */ + if (!ndb_binlog_running) + { + log_query= 1; + break; /* discovery will be handled by binlog */ + } + /* fall through */ + case SOT_CREATE_TABLE: + /* fall through */ pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) { @@ -1329,12 +1375,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->node_id); } pthread_mutex_unlock(&LOCK_open); - { - /* signal that schema operation has been handled */ - DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); - if (bitmap_is_set(&slock, node_id)) - ndbcluster_update_slock(thd, schema->db, schema->name); - } log_query= 1; break; case SOT_DROP_DB: @@ -1374,14 +1414,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, } DBUG_RETURN(0); } + case SOT_TABLESPACE: + case SOT_LOGFILE_GROUP: + log_query= 1; + break; } + + /* signal that schema operation has been handled */ + if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK) + { + DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); + if (bitmap_is_set(&slock, node_id)) + ndbcluster_update_slock(thd, schema->db, schema->name); + } + if (log_query) { char *thd_db_save= thd->db; thd->db= schema->db; thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, schema->query_length, FALSE, - schema->name[0] == 0); + schema->name[0] == 0 || thd->db[0] == 0); thd->db= thd_db_save; } } @@ -1672,7 +1725,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, "allocating table share for %s failed", key); } - if (ndb_binlog_thread_running <= 0) + if (!ndb_binlog_running) { share->flags|= NSF_NO_BINLOG; pthread_mutex_unlock(&ndbcluster_mutex); @@ -2521,7 +2574,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) injector_thd= thd; injector_ndb= ndb; ndb_binlog_thread_running= 1; - + if (opt_bin_log) + { + if (binlog_row_based) + { + ndb_binlog_running= TRUE; + } + else + { + sql_print_error("NDB: only row based binary logging is supported"); + } + } /* We signal the thread that started us that we've finished starting up. @@ -2562,7 +2625,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { static char db[]= ""; thd->db= db; - open_binlog_index(thd, &binlog_tables, &binlog_index); + if (ndb_binlog_running) + open_binlog_index(thd, &binlog_tables, &binlog_index); if (!apply_status_share) { sql_print_error("NDB: Could not get apply status share"); @@ -2590,16 +2654,22 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->set_time(); /* wait for event or 1000 ms */ - Uint64 gci, schema_gci; - int res= ndb->pollEvents(1000, &gci); - int schema_res= schema_ndb->pollEvents(0, &schema_gci); + Uint64 gci= 0, schema_gci; + int res= 0, tot_poll_wait= 1000; + if (ndb_binlog_running) + { + res= ndb->pollEvents(tot_poll_wait, &gci); + tot_poll_wait= 0; + } + int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) schema_res= schema_ndb->pollEvents(10, &schema_gci); if ((abort_loop || do_ndbcluster_binlog_close_connection) && - ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) + (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || + !ndb_binlog_running)) break; /* Shutting down server */ if (binlog_index && binlog_index->s->version < refresh_version) @@ -2810,6 +2880,7 @@ err: delete thd; ndb_binlog_thread_running= -1; + ndb_binlog_running= FALSE; (void) pthread_cond_signal(&injector_cond); DBUG_PRINT("exit", ("ndb_binlog_thread")); diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h index f1a08625a34..4739c77a1bd 100644 --- a/sql/ha_ndbcluster_binlog.h +++ b/sql/ha_ndbcluster_binlog.h @@ -38,7 +38,9 @@ enum SCHEMA_OP_TYPE SOT_DROP_DB, SOT_CREATE_DB, SOT_ALTER_DB, - SOT_CLEAR_SLOCK + SOT_CLEAR_SLOCK, + SOT_TABLESPACE, + SOT_LOGFILE_GROUP }; const uint max_ndb_nodes= 64; /* multiple of 32 */ @@ -104,7 +106,7 @@ extern NDB_SHARE *apply_status_share; extern NDB_SHARE *schema_share; extern THD *injector_thd; -extern int ndb_binlog_thread_running; +extern my_bool ndb_binlog_running; bool ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, |