diff options
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 462 |
1 files changed, 227 insertions, 235 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index e34a22cf9f4..7097c0a1a46 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2000-2003 MySQL AB +/* Copyright (C) 2000-2003 MySQL AB, 2008-2009 Sun Microsystems, Inc This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,6 +33,8 @@ #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) #endif +extern my_bool opt_ndb_log_binlog_index; +extern ulong opt_ndb_extra_logging; /* defines for cluster replication table names */ @@ -44,14 +46,16 @@ Timeout for syncing schema events between mysql servers, and between mysql server and the binlog */ -const int opt_ndb_sync_timeout= 120; +static const int DEFAULT_SYNC_TIMEOUT= 120; + /* Flag showing if the ndb injector thread is running, if so == 1 -1 if it was started but later stopped for some reason 0 if never started */ -int ndb_binlog_thread_running= 0; +static int ndb_binlog_thread_running= 0; + /* Flag showing if the ndb binlog should be created, if so == TRUE FALSE if not @@ -75,7 +79,7 @@ THD *injector_thd= 0; to enable ndb injector thread receiving events. Must therefore always be used with a surrounding - pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation + mysql_mutex_lock(&injector_mutex), when doing create/dropEventOperation */ static Ndb *injector_ndb= 0; static Ndb *schema_ndb= 0; @@ -102,8 +106,8 @@ static int ndbcluster_binlog_terminating= 0; and injector thread */ pthread_t ndb_binlog_thread; -pthread_mutex_t injector_mutex; -pthread_cond_t injector_cond; +mysql_mutex_t injector_mutex; +mysql_cond_t injector_cond; /* NDB Injector thread (used for binlog creation) */ static ulonglong ndb_latest_applied_binlog_epoch= 0; @@ -112,7 +116,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; -pthread_mutex_t ndb_schema_share_mutex; +mysql_mutex_t ndb_schema_share_mutex; extern my_bool opt_log_slave_updates; static my_bool g_ndb_log_slave_updates; @@ -120,7 +124,7 @@ static my_bool g_ndb_log_slave_updates; /* Schema object distribution handling */ HASH ndb_schema_objects; typedef struct st_ndb_schema_object { - pthread_mutex_t mutex; + mysql_mutex_t mutex; char *key; uint key_length; uint use_count; @@ -247,8 +251,8 @@ static void run_query(THD *thd, char *buf, char *end, struct system_status_var save_thd_status_var= thd->status_var; THD_TRANS save_thd_transaction_all= thd->transaction.all; THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt; - ulonglong save_thd_options= thd->options; - DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options)); + ulonglong save_thd_options= thd->variables.option_bits; + DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->variables.option_bits)); NET save_thd_net= thd->net; const char* found_semicolon= NULL; @@ -257,12 +261,12 @@ static void run_query(THD *thd, char *buf, char *end, thd->variables.pseudo_thread_id= thread_id; thd->transaction.stmt.modified_non_trans_table= FALSE; if (disable_binlog) - thd->options&= ~OPTION_BIN_LOG; + thd->variables.option_bits&= ~OPTION_BIN_LOG; DBUG_PRINT("query", ("%s", thd->query())); DBUG_ASSERT(!thd->in_sub_stmt); - DBUG_ASSERT(!thd->prelocked_mode); + DBUG_ASSERT(!thd->locked_tables_mode); mysql_parse(thd, thd->query(), thd->query_length(), &found_semicolon); @@ -282,7 +286,13 @@ static void run_query(THD *thd, char *buf, char *end, thd_ndb->m_error_code, (int) thd->is_error(), thd->is_slave_error); } + + /* + After executing statement we should unlock and close tables open + by it as well as release meta-data locks obtained by it. + */ close_thread_tables(thd); + /* XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command() can not be called from within a statement, and @@ -295,13 +305,14 @@ static void run_query(THD *thd, char *buf, char *end, */ thd->stmt_da->reset_diagnostics_area(); - thd->options= save_thd_options; + thd->variables.option_bits= save_thd_options; thd->set_query(save_thd_query, save_thd_query_length); thd->variables.pseudo_thread_id= save_thread_id; thd->status_var= save_thd_status_var; thd->transaction.all= save_thd_transaction_all; thd->transaction.stmt= save_thd_transaction_stmt; thd->net= save_thd_net; + thd->set_current_stmt_binlog_format_row(); if (thd == injector_thd) { @@ -343,7 +354,7 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share, int error; DBUG_ENTER("ndbcluster_binlog_open_table"); - safe_mutex_assert_owner(&LOCK_open); + mysql_mutex_assert_owner(&LOCK_open); init_tmp_table_share(thd, table_share, share->db, 0, share->table_name, share->key); if ((error= open_table_def(thd, table_share, 0))) @@ -637,28 +648,28 @@ static int ndbcluster_binlog_end(THD *thd) however be a likely case as the ndbcluster_binlog_end is supposed to be called before ndb_cluster_end(). */ - pthread_mutex_lock(&LOCK_ndb_util_thread); + mysql_mutex_lock(&LOCK_ndb_util_thread); /* Ensure mutex are not freed if ndb_cluster_end is running at same time */ ndb_util_thread_running++; ndbcluster_terminating= 1; - pthread_cond_signal(&COND_ndb_util_thread); + mysql_cond_signal(&COND_ndb_util_thread); while (ndb_util_thread_running > 1) - pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread); + mysql_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread); ndb_util_thread_running--; - pthread_mutex_unlock(&LOCK_ndb_util_thread); + mysql_mutex_unlock(&LOCK_ndb_util_thread); } /* wait for injector thread to finish */ ndbcluster_binlog_terminating= 1; - pthread_mutex_lock(&injector_mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_lock(&injector_mutex); + mysql_cond_signal(&injector_cond); while (ndb_binlog_thread_running > 0) - pthread_cond_wait(&injector_cond, &injector_mutex); - pthread_mutex_unlock(&injector_mutex); + mysql_cond_wait(&injector_cond, &injector_mutex); + mysql_mutex_unlock(&injector_mutex); - pthread_mutex_destroy(&injector_mutex); - pthread_cond_destroy(&injector_cond); - pthread_mutex_destroy(&ndb_schema_share_mutex); + mysql_mutex_destroy(&injector_mutex); + mysql_cond_destroy(&injector_cond); + mysql_mutex_destroy(&ndb_schema_share_mutex); #endif DBUG_RETURN(0); @@ -738,14 +749,14 @@ void ndbcluster_binlog_init_handlerton() */ static NDB_SHARE *ndbcluster_check_ndb_apply_status_share() { - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); void *share= my_hash_search(&ndbcluster_open_tables, (uchar*) NDB_APPLY_TABLE_FILE, sizeof(NDB_APPLY_TABLE_FILE) - 1); DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx", NDB_APPLY_TABLE_FILE, (long) share)); - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); return (NDB_SHARE*) share; } @@ -756,14 +767,14 @@ static NDB_SHARE *ndbcluster_check_ndb_apply_status_share() */ static NDB_SHARE *ndbcluster_check_ndb_schema_share() { - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); void *share= my_hash_search(&ndbcluster_open_tables, (uchar*) NDB_SCHEMA_TABLE_FILE, sizeof(NDB_SCHEMA_TABLE_FILE) - 1); DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx", NDB_SCHEMA_TABLE_FILE, (long) share)); - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); return (NDB_SHARE*) share; } @@ -788,7 +799,7 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd) char buf[1024 + 1], *end; - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE); /* @@ -798,7 +809,7 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd) { build_table_filename(buf, sizeof(buf) - 1, NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0); - my_delete(buf, MYF(0)); + mysql_file_delete(key_file_frm, buf, MYF(0)); } /* @@ -846,7 +857,7 @@ static int ndbcluster_create_schema_table(THD *thd) char buf[1024 + 1], *end; - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE); /* @@ -856,7 +867,7 @@ static int ndbcluster_create_schema_table(THD *thd) { build_table_filename(buf, sizeof(buf) - 1, NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0); - my_delete(buf, MYF(0)); + mysql_file_delete(key_file_frm, buf, MYF(0)); } /* @@ -891,9 +902,9 @@ int ndbcluster_setup_binlog_table_shares(THD *thd) if (!ndb_schema_share && ndbcluster_check_ndb_schema_share() == 0) { - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE); - pthread_mutex_unlock(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); if (!ndb_schema_share) { ndbcluster_create_schema_table(thd); @@ -905,9 +916,9 @@ int ndbcluster_setup_binlog_table_shares(THD *thd) if (!ndb_apply_status_share && ndbcluster_check_ndb_apply_status_share() == 0) { - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE); - pthread_mutex_unlock(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); if (!ndb_apply_status_share) { ndbcluster_create_ndb_apply_status_table(thd); @@ -917,14 +928,14 @@ int ndbcluster_setup_binlog_table_shares(THD *thd) } if (!ndbcluster_find_all_files(thd)) { - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); ndb_binlog_tables_inited= TRUE; - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: ndb tables writable"); - close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE); - pthread_mutex_unlock(&LOCK_open); + close_cached_tables(NULL, NULL, TRUE, FALSE); + mysql_mutex_unlock(&LOCK_open); /* Signal injector thread that all is setup */ - pthread_cond_signal(&injector_cond); + mysql_cond_signal(&injector_cond); } return 0; } @@ -1235,12 +1246,12 @@ static void ndb_report_waiting(const char *key, { ulonglong ndb_latest_epoch= 0; const char *proc_info= "<no info>"; - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); if (injector_ndb) ndb_latest_epoch= injector_ndb->getLatestGCI(); if (injector_thd) proc_info= injector_thd->proc_info; - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); sql_print_information("NDB %s:" " waiting max %u sec for %s %s." " epochs: (%u,%u,%u)" @@ -1353,15 +1364,15 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, bitmap_set_all(&schema_subscribers); /* begin protect ndb_schema_share */ - pthread_mutex_lock(&ndb_schema_share_mutex); + mysql_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } - (void) pthread_mutex_lock(&ndb_schema_share->mutex); + mysql_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i]; @@ -1372,8 +1383,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, updated= 1; } } - (void) pthread_mutex_unlock(&ndb_schema_share->mutex); - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share->mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ if (updated) @@ -1393,10 +1404,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, if (ndb_schema_object) { - (void) pthread_mutex_lock(&ndb_schema_object->mutex); + mysql_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema_subscribers.bitmap, sizeof(ndb_schema_object->slock)); - (void) pthread_mutex_unlock(&ndb_schema_object->mutex); + mysql_mutex_unlock(&ndb_schema_object->mutex); } DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap, @@ -1498,7 +1509,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, r|= op->setValue(SCHEMA_TYPE_I, log_type); DBUG_ASSERT(r == 0); /* any value */ - if (!(thd->options & OPTION_BIN_LOG)) + if (!(thd->variables.option_bits & OPTION_BIN_LOG)) r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING); else r|= op->setAnyValue(thd->server_id); @@ -1561,12 +1572,12 @@ end: else dict->forceGCPWait(); - int max_timeout= opt_ndb_sync_timeout; - (void) pthread_mutex_lock(&ndb_schema_object->mutex); + int max_timeout= DEFAULT_SYNC_TIMEOUT; + mysql_mutex_lock(&ndb_schema_object->mutex); if (have_lock_open) { - safe_mutex_assert_owner(&LOCK_open); - (void) pthread_mutex_unlock(&LOCK_open); + mysql_mutex_assert_owner(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); } while (1) { @@ -1574,20 +1585,20 @@ end: int i; int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); set_timespec(abstime, 1); - int ret= pthread_cond_timedwait(&injector_cond, - &ndb_schema_object->mutex, - &abstime); + int ret= mysql_cond_timedwait(&injector_cond, + &ndb_schema_object->mutex, + &abstime); if (thd->killed) break; /* begin protect ndb_schema_share */ - pthread_mutex_lock(&ndb_schema_share_mutex); + mysql_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); break; } - (void) pthread_mutex_lock(&ndb_schema_share->mutex); + mysql_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { /* remove any unsubscribed from schema_subscribers */ @@ -1595,8 +1606,8 @@ end: if (!bitmap_is_clear_all(tmp)) bitmap_intersect(&schema_subscribers, tmp); } - (void) pthread_mutex_unlock(&ndb_schema_share->mutex); - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share->mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ /* remove any unsubscribed from ndb_schema_object->slock */ @@ -1618,16 +1629,16 @@ end: type_str, ndb_schema_object->key); break; } - if (ndb_extra_logging) + if (opt_ndb_extra_logging) ndb_report_waiting(type_str, max_timeout, "distributing", ndb_schema_object->key); } } if (have_lock_open) { - (void) pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); } - (void) pthread_mutex_unlock(&ndb_schema_object->mutex); + mysql_mutex_unlock(&ndb_schema_object->mutex); } if (ndb_schema_object) @@ -1709,7 +1720,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, { DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(), altered_table->getFrmLength()); - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); Ndb_table_guard ndbtab_g(dict, tabname); const NDBTAB *old= ndbtab_g.get_table(); if (!old && @@ -1735,7 +1746,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; - close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE); + close_cached_tables(thd, &table_list, TRUE, FALSE); if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 1))) @@ -1747,7 +1758,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, dbname= table_share->db.str; tabname= table_share->table_name.str; - pthread_mutex_unlock(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); } my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR)); @@ -1757,17 +1768,17 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, if (is_online_alter_table) { /* Signal ha_ndbcluster::alter_table that drop is done */ - (void) pthread_cond_signal(&injector_cond); + mysql_cond_signal(&injector_cond); DBUG_RETURN(0); } - (void) pthread_mutex_lock(&share->mutex); + mysql_mutex_lock(&share->mutex); if (is_rename_table && !is_remote_change) { DBUG_PRINT("info", ("Detected name change of table %s.%s", share->db, share->table_name)); /* ToDo: remove printout */ - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.", share_prefix, share->table->s->db.str, share->table->s->table_name.str, @@ -1797,10 +1808,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, // either just us or drop table handling as well /* Signal ha_ndbcluster::delete/rename_table that drop is done */ - (void) pthread_mutex_unlock(&share->mutex); - (void) pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&share->mutex); + mysql_cond_signal(&injector_cond); - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); /* ndb_share reference binlog free */ DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u", share->key, share->use_count)); @@ -1826,14 +1837,14 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, } else share= 0; - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); pOp->setCustomData(0); - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); ndb->dropEventOperation(pOp); pOp= 0; - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); if (do_close_cached_tables) { @@ -1841,7 +1852,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; - close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); + close_cached_tables(thd, &table_list, FALSE, FALSE); /* ndb_share reference create free */ DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u", share->key, share->use_count)); @@ -1870,7 +1881,7 @@ static void ndb_binlog_query(THD *thd, Cluster_schema *schema) thd->db= schema->db; int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, - schema->query_length, FALSE, + schema->query_length, FALSE, TRUE, schema->name[0] == 0 || thd->db[0] == 0, errcode); thd->server_id= thd_server_id_save; @@ -1962,7 +1973,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; - close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); + close_cached_tables(thd, &table_list, FALSE, FALSE); } /* ndb_share reference temporary free */ if (share) @@ -1974,7 +1985,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, } // fall through case SOT_CREATE_TABLE: - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); if (ndbcluster_check_if_local_table(schema->db, schema->name)) { DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'", @@ -1988,7 +1999,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { print_could_not_discover_error(thd, schema); } - pthread_mutex_unlock(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); log_query= 1; break; case SOT_DROP_DB: @@ -2057,18 +2068,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, // skip break; case NDBEVENT::TE_CLUSTER_FAILURE: - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.", ndb_schema_share->key, (unsigned) pOp->getGCI()); // fall through case NDBEVENT::TE_DROP: - if (ndb_extra_logging && + if (opt_ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); /* begin protect ndb_schema_share */ - pthread_mutex_lock(&ndb_schema_share_mutex); + mysql_mutex_lock(&ndb_schema_share_mutex); /* ndb_share reference binlog extra free */ DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u", ndb_schema_share->key, @@ -2076,10 +2087,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, free_share(&ndb_schema_share); ndb_schema_share= 0; ndb_binlog_tables_inited= 0; - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ - close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE); + close_cached_tables(NULL, NULL, FALSE, FALSE); // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, tmp_share); @@ -2088,10 +2099,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; DBUG_ASSERT(node_id != 0xFF); - (void) pthread_mutex_lock(&tmp_share->mutex); + mysql_mutex_lock(&tmp_share->mutex); bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]); DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); - if (ndb_extra_logging) + if (opt_ndb_extra_logging) { sql_print_information("NDB Binlog: Node: %d, down," " Subscriber bitmask %x%x", @@ -2099,8 +2110,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } - (void) pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&tmp_share->mutex); + mysql_cond_signal(&injector_cond); break; } case NDBEVENT::TE_SUBSCRIBE: @@ -2108,10 +2119,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); - (void) pthread_mutex_lock(&tmp_share->mutex); + mysql_mutex_lock(&tmp_share->mutex); bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); - if (ndb_extra_logging) + if (opt_ndb_extra_logging) { sql_print_information("NDB Binlog: Node: %d, subscribe from node %d," " Subscriber bitmask %x%x", @@ -2120,8 +2131,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } - (void) pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&tmp_share->mutex); + mysql_cond_signal(&injector_cond); break; } case NDBEVENT::TE_UNSUBSCRIBE: @@ -2129,10 +2140,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); - (void) pthread_mutex_lock(&tmp_share->mutex); + mysql_mutex_lock(&tmp_share->mutex); bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); - if (ndb_extra_logging) + if (opt_ndb_extra_logging) { sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d," " Subscriber bitmask %x%x", @@ -2141,8 +2152,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } - (void) pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&tmp_share->mutex); + mysql_cond_signal(&injector_cond); break; } default: @@ -2182,22 +2193,22 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0); if (schema_type == SOT_CLEAR_SLOCK) { - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); NDB_SCHEMA_OBJECT *ndb_schema_object= (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects, (uchar*) key, strlen(key)); if (ndb_schema_object) { - pthread_mutex_lock(&ndb_schema_object->mutex); + mysql_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema->slock, sizeof(ndb_schema_object->slock)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (uchar*)ndb_schema_object->slock_bitmap.bitmap, no_bytes_in_map(&ndb_schema_object->slock_bitmap)); - pthread_mutex_unlock(&ndb_schema_object->mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&ndb_schema_object->mutex); + mysql_cond_signal(&injector_cond); } - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); continue; } /* ndb_share reference temporary, free below */ @@ -2236,7 +2247,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; - close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); + close_cached_tables(thd, &table_list, FALSE, FALSE); } if (schema_type != SOT_ALTER_TABLE) break; @@ -2257,7 +2268,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, free_share(&share); share= 0; } - pthread_mutex_lock(&LOCK_open); + mysql_mutex_lock(&LOCK_open); if (ndbcluster_check_if_local_table(schema->db, schema->name)) { DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'", @@ -2271,7 +2282,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, { print_could_not_discover_error(thd, schema); } - pthread_mutex_unlock(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); } break; default: @@ -2323,22 +2334,20 @@ struct ndb_binlog_index_row { /* Open the ndb_binlog_index table */ -static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables, - TABLE **ndb_binlog_index) +static int open_ndb_binlog_index(THD *thd, TABLE **ndb_binlog_index) { static char repdb[]= NDB_REP_DB; static char reptable[]= NDB_REP_TABLE; const char *save_proc_info= thd->proc_info; + TABLE_LIST *tables= &binlog_tables; - bzero((char*) tables, sizeof(*tables)); - tables->db= repdb; - tables->alias= tables->table_name= reptable; - tables->lock_type= TL_WRITE; + tables->init_one_table(repdb, strlen(repdb), reptable, strlen(reptable), + reptable, TL_WRITE); thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE; + tables->required_type= FRMTYPE_TABLE; - uint counter; thd->clear_error(); - if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH)) + if (open_and_lock_tables(thd, tables, FALSE, 0)) { if (thd->killed) sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed"); @@ -2364,36 +2373,18 @@ int ndb_add_ndb_binlog_index(THD *thd, void *_row) { ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row; int error= 0; - bool need_reopen; /* Turn of binlogging to prevent the table changes to be written to the binary log. */ - ulong saved_options= thd->options; - thd->options&= ~(OPTION_BIN_LOG); + ulong saved_options= thd->variables.option_bits; + thd->variables.option_bits&= ~OPTION_BIN_LOG; - for ( ; ; ) /* loop for need_reopen */ + if (!ndb_binlog_index && open_ndb_binlog_index(thd, &ndb_binlog_index)) { - if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index)) - { - error= -1; - goto add_ndb_binlog_index_err; - } - - if (lock_tables(thd, &binlog_tables, 1, &need_reopen)) - { - if (need_reopen) - { - TABLE_LIST *p_binlog_tables= &binlog_tables; - close_tables_for_reopen(thd, &p_binlog_tables); - ndb_binlog_index= 0; - continue; - } - sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index"); - error= -1; - goto add_ndb_binlog_index_err; - } - break; + sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index"); + error= -1; + goto add_ndb_binlog_index_err; } /* @@ -2418,14 +2409,10 @@ int ndb_add_ndb_binlog_index(THD *thd, void *_row) goto add_ndb_binlog_index_err; } - mysql_unlock_tables(thd, thd->lock); - thd->lock= 0; - thd->options= saved_options; - return 0; add_ndb_binlog_index_err: close_thread_tables(thd); ndb_binlog_index= 0; - thd->options= saved_options; + thd->variables.option_bits= saved_options; return error; } @@ -2458,27 +2445,29 @@ int ndbcluster_binlog_start() DBUG_RETURN(-1); } - pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init(&injector_cond, NULL); - pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_injector_mutex, &injector_mutex, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_injector_cond, &injector_cond, NULL); + mysql_mutex_init(key_ndb_schema_share_mutex, + &ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ - if (pthread_create(&ndb_binlog_thread, &connection_attrib, - ndb_binlog_thread_func, 0)) + if (mysql_thread_create(key_thread_ndb_binlog, + &ndb_binlog_thread, &connection_attrib, + ndb_binlog_thread_func, 0)) { DBUG_PRINT("error", ("Could not create ndb injector thread")); - pthread_cond_destroy(&injector_cond); - pthread_mutex_destroy(&injector_mutex); + mysql_cond_destroy(&injector_cond); + mysql_mutex_destroy(&injector_mutex); DBUG_RETURN(-1); } ndbcluster_binlog_inited= 1; /* Wait for the injector thread to start */ - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); while (!ndb_binlog_thread_running) - pthread_cond_wait(&injector_cond, &injector_mutex); - pthread_mutex_unlock(&injector_mutex); + mysql_cond_wait(&injector_cond, &injector_mutex); + mysql_mutex_unlock(&injector_mutex); if (ndb_binlog_thread_running < 0) DBUG_RETURN(-1); @@ -2568,7 +2557,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); DBUG_ASSERT(strlen(key) == key_len); - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); /* Handle any trailing share */ NDB_SHARE *share= (NDB_SHARE*) my_hash_search(&ndbcluster_open_tables, @@ -2580,7 +2569,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, share->op != 0 || share->op_old != 0) { - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); // replication already setup, or should not } } @@ -2590,7 +2579,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, if (share->op || share->op_old) { my_errno= HA_ERR_TABLE_EXIST; - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(1); } if (!share_may_exist || share->connect_count != @@ -2633,10 +2622,10 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, if (!do_event_op) { share->flags|= NSF_NO_BINLOG; - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); while (share && !IS_TMP_PREFIX(table_name)) { @@ -2655,7 +2644,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, const NDBTAB *ndbtab= ndbtab_g.get_table(); if (ndbtab == 0) { - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: Failed to get table %s from ndb: " "%s, %d", key, dict->getNdbError().message, dict->getNdbError().code); @@ -2677,7 +2666,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, event_name.c_ptr()); break; // error } - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: " "CREATE (DISCOVER) TABLE Event: %s", event_name.c_ptr()); @@ -2685,7 +2674,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, else { delete ev; - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s", event_name.c_ptr()); } @@ -2929,14 +2918,14 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, int retry_sleep= 100; while (1) { - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); Ndb *ndb= injector_ndb; if (do_ndb_schema_share) ndb= schema_ndb; if (ndb == 0) { - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } @@ -2961,7 +2950,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb->getNdbError().code, ndb->getNdbError().message, "NDB"); - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } @@ -3011,7 +3000,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, op->getNdbError().message, "NDB"); ndb->dropEventOperation(op); - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } } @@ -3053,7 +3042,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, op->getNdbError().code, op->getNdbError().message); } ndb->dropEventOperation(op); - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); if (retries) { my_sleep(retry_sleep); @@ -3061,7 +3050,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } DBUG_RETURN(-1); } - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); break; } @@ -3075,7 +3064,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb_apply_status_share= get_share(share); DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u", share->key, share->use_count)); - (void) pthread_cond_signal(&injector_cond); + mysql_cond_signal(&injector_cond); } else if (do_ndb_schema_share) { @@ -3083,13 +3072,13 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb_schema_share= get_share(share); DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u", share->key, share->use_count)); - (void) pthread_cond_signal(&injector_cond); + mysql_cond_signal(&injector_cond); } DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u", share->key, (long) share->op, share->use_count)); - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: logging %s", share->key); DBUG_RETURN(0); } @@ -3154,17 +3143,17 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, #define SYNC_DROP_ #ifdef SYNC_DROP_ thd->proc_info= "Syncing ndb table schema operation and binlog"; - (void) pthread_mutex_lock(&share->mutex); - safe_mutex_assert_owner(&LOCK_open); - (void) pthread_mutex_unlock(&LOCK_open); - int max_timeout= opt_ndb_sync_timeout; + mysql_mutex_lock(&share->mutex); + mysql_mutex_assert_owner(&LOCK_open); + mysql_mutex_unlock(&LOCK_open); + int max_timeout= DEFAULT_SYNC_TIMEOUT; while (share->op) { struct timespec abstime; set_timespec(abstime, 1); - int ret= pthread_cond_timedwait(&injector_cond, - &share->mutex, - &abstime); + int ret= mysql_cond_timedwait(&injector_cond, + &share->mutex, + &abstime); if (thd->killed || share->op == 0) break; @@ -3177,18 +3166,18 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, type_str, share->key); break; } - if (ndb_extra_logging) + if (opt_ndb_extra_logging) ndb_report_waiting(type_str, max_timeout, type_str, share->key); } } - (void) pthread_mutex_lock(&LOCK_open); - (void) pthread_mutex_unlock(&share->mutex); + mysql_mutex_lock(&LOCK_open); + mysql_mutex_unlock(&share->mutex); #else - (void) pthread_mutex_lock(&share->mutex); + mysql_mutex_lock(&share->mutex); share->op_old= share->op; share->op= 0; - (void) pthread_mutex_unlock(&share->mutex); + mysql_mutex_unlock(&share->mutex); #endif thd->proc_info= save_proc_info; @@ -3255,12 +3244,12 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, switch (type) { case NDBEVENT::TE_CLUSTER_FAILURE: - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.", share->key, (unsigned) pOp->getGCI()); if (ndb_apply_status_share == share) { - if (ndb_extra_logging && + if (opt_ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); @@ -3280,7 +3269,7 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, case NDBEVENT::TE_DROP: if (ndb_apply_status_share == share) { - if (ndb_extra_logging && + if (opt_ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); @@ -3292,7 +3281,7 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, ndb_binlog_tables_inited= 0; } /* ToDo: remove printout */ - if (ndb_extra_logging) + if (opt_ndb_extra_logging) sql_print_information("NDB Binlog: drop table %s.", share->key); // fall through case NDBEVENT::TE_ALTER: @@ -3556,7 +3545,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, DBUG_PRINT("enter", ("key: '%s'", key)); if (!have_lock) - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); while (!(ndb_schema_object= (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects, (uchar*) key, @@ -3582,7 +3571,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, my_free((uchar*) ndb_schema_object, 0); break; } - pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_ndb_schema_object_mutex, &ndb_schema_object->mutex, MY_MUTEX_INIT_FAST); bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock, sizeof(ndb_schema_object->slock)*8, FALSE); bitmap_clear_all(&ndb_schema_object->slock_bitmap); @@ -3594,7 +3583,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count)); } if (!have_lock) - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(ndb_schema_object); } @@ -3605,12 +3594,12 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, DBUG_ENTER("ndb_free_schema_object"); DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key)); if (!have_lock) - pthread_mutex_lock(&ndbcluster_mutex); + mysql_mutex_lock(&ndbcluster_mutex); if (!--(*ndb_schema_object)->use_count) { DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object); - pthread_mutex_destroy(&(*ndb_schema_object)->mutex); + mysql_mutex_destroy(&(*ndb_schema_object)->mutex); my_free((uchar*) *ndb_schema_object, MYF(0)); *ndb_schema_object= 0; } @@ -3619,10 +3608,12 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); } if (!have_lock) - pthread_mutex_unlock(&ndbcluster_mutex); + mysql_mutex_unlock(&ndbcluster_mutex); DBUG_VOID_RETURN; } +extern ulong opt_ndb_report_thresh_binlog_epoch_slip; +extern ulong opt_ndb_report_thresh_binlog_mem_usage; pthread_handler_t ndb_binlog_thread_func(void *arg) { @@ -3638,7 +3629,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Timer main_timer; #endif - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); /* Set up the Thread */ @@ -3647,13 +3638,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd= new THD; /* note that contructor of THD uses DBUG_ */ THD_CHECK_SENTRY(thd); + thd->set_current_stmt_binlog_format_row(); /* We need to set thd->thread_id before thd->store_globals, or it will set an invalid value for thd->variables.pseudo_thread_id. */ - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->thread_id= thread_id++; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + mysql_thread_set_psi_id(thd->thread_id); thd->thread_stack= (char*) &thd; /* remember where our stack is */ if (thd->store_globals()) @@ -3661,15 +3655,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->cleanup(); delete thd; ndb_binlog_thread_running= -1; - pthread_mutex_unlock(&injector_mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&injector_mutex); + mysql_cond_signal(&injector_cond); DBUG_LEAVE; // Must match DBUG_ENTER() my_thread_end(); pthread_exit(0); return NULL; // Avoid compiler warnings } - lex_start(thd); thd->init_for_queries(); thd->command= COM_DAEMON; @@ -3680,6 +3673,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) my_net_init(&thd->net, 0); thd->main_security_ctx.master_access= ~0; thd->main_security_ctx.priv_user= 0; + /* Do not use user-supplied timeout value for system threads. */ + thd->variables.lock_wait_timeout= LONG_TIMEOUT; /* Set up ndb binlog @@ -3688,9 +3683,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_detach_this_thread(); thd->real_id= pthread_self(); - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); threads.append(thd); - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); thd->lex->start_transaction_opt= 0; if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) || @@ -3698,8 +3693,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); ndb_binlog_thread_running= -1; - pthread_mutex_unlock(&injector_mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&injector_mutex); + mysql_cond_signal(&injector_cond); goto err; } @@ -3709,8 +3704,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { sql_print_error("NDB Binlog: Getting Ndb object failed"); ndb_binlog_thread_running= -1; - pthread_mutex_unlock(&injector_mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&injector_mutex); + mysql_cond_signal(&injector_cond); goto err; } @@ -3723,7 +3718,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Used by both sql client thread and binlog thread to interact with the storage - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); */ injector_thd= thd; injector_ndb= i_ndb; @@ -3738,27 +3733,27 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) /* Thread start up completed */ ndb_binlog_thread_running= 1; - pthread_mutex_unlock(&injector_mutex); - pthread_cond_signal(&injector_cond); + mysql_mutex_unlock(&injector_mutex); + mysql_cond_signal(&injector_cond); /* wait for mysql server to start (so that the binlog is started and thus can receive the first GAP event) */ - pthread_mutex_lock(&LOCK_server_started); + mysql_mutex_lock(&LOCK_server_started); while (!mysqld_server_started) { struct timespec abstime; set_timespec(abstime, 1); - pthread_cond_timedwait(&COND_server_started, &LOCK_server_started, - &abstime); + mysql_cond_timedwait(&COND_server_started, &LOCK_server_started, + &abstime); if (ndbcluster_terminating) { - pthread_mutex_unlock(&LOCK_server_started); + mysql_mutex_unlock(&LOCK_server_started); goto err; } } - pthread_mutex_unlock(&LOCK_server_started); + mysql_mutex_unlock(&LOCK_server_started); restart: /* Main NDB Injector loop @@ -3801,21 +3796,21 @@ restart: { thd->proc_info= "Waiting for ndbcluster to start"; - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); while (!ndb_schema_share || (ndb_binlog_running && !ndb_apply_status_share)) { /* ndb not connected yet */ struct timespec abstime; set_timespec(abstime, 1); - pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); + mysql_cond_timedwait(&injector_cond, &injector_mutex, &abstime); if (ndbcluster_binlog_terminating) { - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); goto err; } } - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); if (thd_ndb == NULL) { @@ -3882,7 +3877,7 @@ restart: "Changes to the database that occured while " "disconnected will not be in the binlog"); } - if (ndb_extra_logging) + if (opt_ndb_extra_logging) { sql_print_information("NDB Binlog: starting log at epoch %u", (unsigned)schema_gci); @@ -3892,9 +3887,6 @@ restart: { static char db[]= ""; thd->db= db; - if (ndb_binlog_running) - open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index); - thd->db= db; } do_ndbcluster_binlog_close_connection= BCCC_running; for ( ; !((ndbcluster_binlog_terminating || @@ -3984,9 +3976,9 @@ restart: { thd->proc_info= "Processing events from schema table"; s_ndb-> - setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); + setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip); s_ndb-> - setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); + setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage); NdbEventOperation *pOp= s_ndb->nextEvent(); while (pOp != NULL) { @@ -4049,8 +4041,8 @@ restart: /* initialize some variables for this epoch */ g_ndb_log_slave_updates= opt_log_slave_updates; i_ndb-> - setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); - i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); + setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip); + i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage); bzero((char*) &row, sizeof(row)); thd->variables.character_set_client= &my_charset_latin1; @@ -4286,13 +4278,13 @@ err: DBUG_PRINT("info",("Shutting down cluster binlog thread")); thd->proc_info= "Shutting down"; close_thread_tables(thd); - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ injector_thd= 0; injector_ndb= 0; p_latest_trans_gci= 0; schema_ndb= 0; - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory if (ndb_apply_status_share) @@ -4307,7 +4299,7 @@ err: if (ndb_schema_share) { /* begin protect ndb_schema_share */ - pthread_mutex_lock(&ndb_schema_share_mutex); + mysql_mutex_lock(&ndb_schema_share_mutex); /* ndb_share reference binlog extra free */ DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u", ndb_schema_share->key, @@ -4315,7 +4307,7 @@ err: free_share(&ndb_schema_share); ndb_schema_share= 0; ndb_binlog_tables_inited= 0; - pthread_mutex_unlock(&ndb_schema_share_mutex); + mysql_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ } @@ -4375,7 +4367,7 @@ err: ndb_binlog_thread_running= -1; ndb_binlog_running= FALSE; - (void) pthread_cond_signal(&injector_cond); + mysql_cond_signal(&injector_cond); DBUG_PRINT("exit", ("ndb_binlog_thread")); @@ -4394,12 +4386,12 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ulonglong ndb_latest_epoch= 0; DBUG_ENTER("ndbcluster_show_status_binlog"); - pthread_mutex_lock(&injector_mutex); + mysql_mutex_lock(&injector_mutex); if (injector_ndb) { char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22]; ndb_latest_epoch= injector_ndb->getLatestGCI(); - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); buflen= snprintf(buf, sizeof(buf), @@ -4419,7 +4411,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, DBUG_RETURN(TRUE); } else - pthread_mutex_unlock(&injector_mutex); + mysql_mutex_unlock(&injector_mutex); DBUG_RETURN(FALSE); } |