summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc462
1 files changed, 227 insertions, 235 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index e34a22cf9f4..7097c0a1a46 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2000-2003 MySQL AB
+/* Copyright (C) 2000-2003 MySQL AB, 2008-2009 Sun Microsystems, Inc
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -33,6 +33,8 @@
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif
+extern my_bool opt_ndb_log_binlog_index;
+extern ulong opt_ndb_extra_logging;
/*
defines for cluster replication table names
*/
@@ -44,14 +46,16 @@
Timeout for syncing schema events between
mysql servers, and between mysql server and the binlog
*/
-const int opt_ndb_sync_timeout= 120;
+static const int DEFAULT_SYNC_TIMEOUT= 120;
+
/*
Flag showing if the ndb injector thread is running, if so == 1
-1 if it was started but later stopped for some reason
0 if never started
*/
-int ndb_binlog_thread_running= 0;
+static int ndb_binlog_thread_running= 0;
+
/*
Flag showing if the ndb binlog should be created, if so == TRUE
FALSE if not
@@ -75,7 +79,7 @@ THD *injector_thd= 0;
to enable ndb injector thread receiving events.
Must therefore always be used with a surrounding
- pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
+ mysql_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;
@@ -102,8 +106,8 @@ static int ndbcluster_binlog_terminating= 0;
and injector thread
*/
pthread_t ndb_binlog_thread;
-pthread_mutex_t injector_mutex;
-pthread_cond_t injector_cond;
+mysql_mutex_t injector_mutex;
+mysql_cond_t injector_cond;
/* NDB Injector thread (used for binlog creation) */
static ulonglong ndb_latest_applied_binlog_epoch= 0;
@@ -112,7 +116,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0;
NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
-pthread_mutex_t ndb_schema_share_mutex;
+mysql_mutex_t ndb_schema_share_mutex;
extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;
@@ -120,7 +124,7 @@ static my_bool g_ndb_log_slave_updates;
/* Schema object distribution handling */
HASH ndb_schema_objects;
typedef struct st_ndb_schema_object {
- pthread_mutex_t mutex;
+ mysql_mutex_t mutex;
char *key;
uint key_length;
uint use_count;
@@ -247,8 +251,8 @@ static void run_query(THD *thd, char *buf, char *end,
struct system_status_var save_thd_status_var= thd->status_var;
THD_TRANS save_thd_transaction_all= thd->transaction.all;
THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
- ulonglong save_thd_options= thd->options;
- DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
+ ulonglong save_thd_options= thd->variables.option_bits;
+ DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->variables.option_bits));
NET save_thd_net= thd->net;
const char* found_semicolon= NULL;
@@ -257,12 +261,12 @@ static void run_query(THD *thd, char *buf, char *end,
thd->variables.pseudo_thread_id= thread_id;
thd->transaction.stmt.modified_non_trans_table= FALSE;
if (disable_binlog)
- thd->options&= ~OPTION_BIN_LOG;
+ thd->variables.option_bits&= ~OPTION_BIN_LOG;
DBUG_PRINT("query", ("%s", thd->query()));
DBUG_ASSERT(!thd->in_sub_stmt);
- DBUG_ASSERT(!thd->prelocked_mode);
+ DBUG_ASSERT(!thd->locked_tables_mode);
mysql_parse(thd, thd->query(), thd->query_length(), &found_semicolon);
@@ -282,7 +286,13 @@ static void run_query(THD *thd, char *buf, char *end,
thd_ndb->m_error_code,
(int) thd->is_error(), thd->is_slave_error);
}
+
+ /*
+ After executing statement we should unlock and close tables open
+ by it as well as release meta-data locks obtained by it.
+ */
close_thread_tables(thd);
+
/*
XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
can not be called from within a statement, and
@@ -295,13 +305,14 @@ static void run_query(THD *thd, char *buf, char *end,
*/
thd->stmt_da->reset_diagnostics_area();
- thd->options= save_thd_options;
+ thd->variables.option_bits= save_thd_options;
thd->set_query(save_thd_query, save_thd_query_length);
thd->variables.pseudo_thread_id= save_thread_id;
thd->status_var= save_thd_status_var;
thd->transaction.all= save_thd_transaction_all;
thd->transaction.stmt= save_thd_transaction_stmt;
thd->net= save_thd_net;
+ thd->set_current_stmt_binlog_format_row();
if (thd == injector_thd)
{
@@ -343,7 +354,7 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
int error;
DBUG_ENTER("ndbcluster_binlog_open_table");
- safe_mutex_assert_owner(&LOCK_open);
+ mysql_mutex_assert_owner(&LOCK_open);
init_tmp_table_share(thd, table_share, share->db, 0, share->table_name,
share->key);
if ((error= open_table_def(thd, table_share, 0)))
@@ -637,28 +648,28 @@ static int ndbcluster_binlog_end(THD *thd)
however be a likely case as the ndbcluster_binlog_end is supposed to
be called before ndb_cluster_end().
*/
- pthread_mutex_lock(&LOCK_ndb_util_thread);
+ mysql_mutex_lock(&LOCK_ndb_util_thread);
/* Ensure mutex are not freed if ndb_cluster_end is running at same time */
ndb_util_thread_running++;
ndbcluster_terminating= 1;
- pthread_cond_signal(&COND_ndb_util_thread);
+ mysql_cond_signal(&COND_ndb_util_thread);
while (ndb_util_thread_running > 1)
- pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
+ mysql_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
ndb_util_thread_running--;
- pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ mysql_mutex_unlock(&LOCK_ndb_util_thread);
}
/* wait for injector thread to finish */
ndbcluster_binlog_terminating= 1;
- pthread_mutex_lock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_lock(&injector_mutex);
+ mysql_cond_signal(&injector_cond);
while (ndb_binlog_thread_running > 0)
- pthread_cond_wait(&injector_cond, &injector_mutex);
- pthread_mutex_unlock(&injector_mutex);
+ mysql_cond_wait(&injector_cond, &injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
- pthread_mutex_destroy(&injector_mutex);
- pthread_cond_destroy(&injector_cond);
- pthread_mutex_destroy(&ndb_schema_share_mutex);
+ mysql_mutex_destroy(&injector_mutex);
+ mysql_cond_destroy(&injector_cond);
+ mysql_mutex_destroy(&ndb_schema_share_mutex);
#endif
DBUG_RETURN(0);
@@ -738,14 +749,14 @@ void ndbcluster_binlog_init_handlerton()
*/
static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
{
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
void *share= my_hash_search(&ndbcluster_open_tables,
(uchar*) NDB_APPLY_TABLE_FILE,
sizeof(NDB_APPLY_TABLE_FILE) - 1);
DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
NDB_APPLY_TABLE_FILE, (long) share));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
return (NDB_SHARE*) share;
}
@@ -756,14 +767,14 @@ static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
*/
static NDB_SHARE *ndbcluster_check_ndb_schema_share()
{
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
void *share= my_hash_search(&ndbcluster_open_tables,
(uchar*) NDB_SCHEMA_TABLE_FILE,
sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
NDB_SCHEMA_TABLE_FILE, (long) share));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
return (NDB_SHARE*) share;
}
@@ -788,7 +799,7 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd)
char buf[1024 + 1], *end;
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
/*
@@ -798,7 +809,7 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd)
{
build_table_filename(buf, sizeof(buf) - 1,
NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
- my_delete(buf, MYF(0));
+ mysql_file_delete(key_file_frm, buf, MYF(0));
}
/*
@@ -846,7 +857,7 @@ static int ndbcluster_create_schema_table(THD *thd)
char buf[1024 + 1], *end;
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
/*
@@ -856,7 +867,7 @@ static int ndbcluster_create_schema_table(THD *thd)
{
build_table_filename(buf, sizeof(buf) - 1,
NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
- my_delete(buf, MYF(0));
+ mysql_file_delete(key_file_frm, buf, MYF(0));
}
/*
@@ -891,9 +902,9 @@ int ndbcluster_setup_binlog_table_shares(THD *thd)
if (!ndb_schema_share &&
ndbcluster_check_ndb_schema_share() == 0)
{
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
- pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
if (!ndb_schema_share)
{
ndbcluster_create_schema_table(thd);
@@ -905,9 +916,9 @@ int ndbcluster_setup_binlog_table_shares(THD *thd)
if (!ndb_apply_status_share &&
ndbcluster_check_ndb_apply_status_share() == 0)
{
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
- pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
if (!ndb_apply_status_share)
{
ndbcluster_create_ndb_apply_status_table(thd);
@@ -917,14 +928,14 @@ int ndbcluster_setup_binlog_table_shares(THD *thd)
}
if (!ndbcluster_find_all_files(thd))
{
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
ndb_binlog_tables_inited= TRUE;
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: ndb tables writable");
- close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
- pthread_mutex_unlock(&LOCK_open);
+ close_cached_tables(NULL, NULL, TRUE, FALSE);
+ mysql_mutex_unlock(&LOCK_open);
/* Signal injector thread that all is setup */
- pthread_cond_signal(&injector_cond);
+ mysql_cond_signal(&injector_cond);
}
return 0;
}
@@ -1235,12 +1246,12 @@ static void ndb_report_waiting(const char *key,
{
ulonglong ndb_latest_epoch= 0;
const char *proc_info= "<no info>";
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
if (injector_ndb)
ndb_latest_epoch= injector_ndb->getLatestGCI();
if (injector_thd)
proc_info= injector_thd->proc_info;
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
sql_print_information("NDB %s:"
" waiting max %u sec for %s %s."
" epochs: (%u,%u,%u)"
@@ -1353,15 +1364,15 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
bitmap_set_all(&schema_subscribers);
/* begin protect ndb_schema_share */
- pthread_mutex_lock(&ndb_schema_share_mutex);
+ mysql_mutex_lock(&ndb_schema_share_mutex);
if (ndb_schema_share == 0)
{
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
if (ndb_schema_object)
ndb_free_schema_object(&ndb_schema_object, FALSE);
DBUG_RETURN(0);
}
- (void) pthread_mutex_lock(&ndb_schema_share->mutex);
+ mysql_mutex_lock(&ndb_schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
@@ -1372,8 +1383,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
updated= 1;
}
}
- (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share->mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
/* end protect ndb_schema_share */
if (updated)
@@ -1393,10 +1404,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
if (ndb_schema_object)
{
- (void) pthread_mutex_lock(&ndb_schema_object->mutex);
+ mysql_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
sizeof(ndb_schema_object->slock));
- (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
+ mysql_mutex_unlock(&ndb_schema_object->mutex);
}
DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
@@ -1498,7 +1509,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
r|= op->setValue(SCHEMA_TYPE_I, log_type);
DBUG_ASSERT(r == 0);
/* any value */
- if (!(thd->options & OPTION_BIN_LOG))
+ if (!(thd->variables.option_bits & OPTION_BIN_LOG))
r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
else
r|= op->setAnyValue(thd->server_id);
@@ -1561,12 +1572,12 @@ end:
else
dict->forceGCPWait();
- int max_timeout= opt_ndb_sync_timeout;
- (void) pthread_mutex_lock(&ndb_schema_object->mutex);
+ int max_timeout= DEFAULT_SYNC_TIMEOUT;
+ mysql_mutex_lock(&ndb_schema_object->mutex);
if (have_lock_open)
{
- safe_mutex_assert_owner(&LOCK_open);
- (void) pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_assert_owner(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
}
while (1)
{
@@ -1574,20 +1585,20 @@ end:
int i;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
set_timespec(abstime, 1);
- int ret= pthread_cond_timedwait(&injector_cond,
- &ndb_schema_object->mutex,
- &abstime);
+ int ret= mysql_cond_timedwait(&injector_cond,
+ &ndb_schema_object->mutex,
+ &abstime);
if (thd->killed)
break;
/* begin protect ndb_schema_share */
- pthread_mutex_lock(&ndb_schema_share_mutex);
+ mysql_mutex_lock(&ndb_schema_share_mutex);
if (ndb_schema_share == 0)
{
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
break;
}
- (void) pthread_mutex_lock(&ndb_schema_share->mutex);
+ mysql_mutex_lock(&ndb_schema_share->mutex);
for (i= 0; i < no_storage_nodes; i++)
{
/* remove any unsubscribed from schema_subscribers */
@@ -1595,8 +1606,8 @@ end:
if (!bitmap_is_clear_all(tmp))
bitmap_intersect(&schema_subscribers, tmp);
}
- (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share->mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
/* end protect ndb_schema_share */
/* remove any unsubscribed from ndb_schema_object->slock */
@@ -1618,16 +1629,16 @@ end:
type_str, ndb_schema_object->key);
break;
}
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
"distributing", ndb_schema_object->key);
}
}
if (have_lock_open)
{
- (void) pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
}
- (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
+ mysql_mutex_unlock(&ndb_schema_object->mutex);
}
if (ndb_schema_object)
@@ -1709,7 +1720,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
{
DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(),
altered_table->getFrmLength());
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
Ndb_table_guard ndbtab_g(dict, tabname);
const NDBTAB *old= ndbtab_g.get_table();
if (!old &&
@@ -1735,7 +1746,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
+ close_cached_tables(thd, &table_list, TRUE, FALSE);
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table, 1)))
@@ -1747,7 +1758,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
dbname= table_share->db.str;
tabname= table_share->table_name.str;
- pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
}
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
@@ -1757,17 +1768,17 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
if (is_online_alter_table)
{
/* Signal ha_ndbcluster::alter_table that drop is done */
- (void) pthread_cond_signal(&injector_cond);
+ mysql_cond_signal(&injector_cond);
DBUG_RETURN(0);
}
- (void) pthread_mutex_lock(&share->mutex);
+ mysql_mutex_lock(&share->mutex);
if (is_rename_table && !is_remote_change)
{
DBUG_PRINT("info", ("Detected name change of table %s.%s",
share->db, share->table_name));
/* ToDo: remove printout */
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
share_prefix, share->table->s->db.str,
share->table->s->table_name.str,
@@ -1797,10 +1808,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
// either just us or drop table handling as well
/* Signal ha_ndbcluster::delete/rename_table that drop is done */
- (void) pthread_mutex_unlock(&share->mutex);
- (void) pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&share->mutex);
+ mysql_cond_signal(&injector_cond);
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
/* ndb_share reference binlog free */
DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
share->key, share->use_count));
@@ -1826,14 +1837,14 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
}
else
share= 0;
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
pOp->setCustomData(0);
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
ndb->dropEventOperation(pOp);
pOp= 0;
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
if (do_close_cached_tables)
{
@@ -1841,7 +1852,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
+ close_cached_tables(thd, &table_list, FALSE, FALSE);
/* ndb_share reference create free */
DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
share->key, share->use_count));
@@ -1870,7 +1881,7 @@ static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
thd->db= schema->db;
int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
- schema->query_length, FALSE,
+ schema->query_length, FALSE, TRUE,
schema->name[0] == 0 || thd->db[0] == 0,
errcode);
thd->server_id= thd_server_id_save;
@@ -1962,7 +1973,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= schema->db;
table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
+ close_cached_tables(thd, &table_list, FALSE, FALSE);
}
/* ndb_share reference temporary free */
if (share)
@@ -1974,7 +1985,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
}
// fall through
case SOT_CREATE_TABLE:
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
if (ndbcluster_check_if_local_table(schema->db, schema->name))
{
DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
@@ -1988,7 +1999,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
{
print_could_not_discover_error(thd, schema);
}
- pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
log_query= 1;
break;
case SOT_DROP_DB:
@@ -2057,18 +2068,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
// skip
break;
case NDBEVENT::TE_CLUSTER_FAILURE:
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
ndb_schema_share->key, (unsigned) pOp->getGCI());
// fall through
case NDBEVENT::TE_DROP:
- if (ndb_extra_logging &&
+ if (opt_ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
/* begin protect ndb_schema_share */
- pthread_mutex_lock(&ndb_schema_share_mutex);
+ mysql_mutex_lock(&ndb_schema_share_mutex);
/* ndb_share reference binlog extra free */
DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
ndb_schema_share->key,
@@ -2076,10 +2087,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
free_share(&ndb_schema_share);
ndb_schema_share= 0;
ndb_binlog_tables_inited= 0;
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
/* end protect ndb_schema_share */
- close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
+ close_cached_tables(NULL, NULL, FALSE, FALSE);
// fall through
case NDBEVENT::TE_ALTER:
ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
@@ -2088,10 +2099,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
{
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
DBUG_ASSERT(node_id != 0xFF);
- (void) pthread_mutex_lock(&tmp_share->mutex);
+ mysql_mutex_lock(&tmp_share->mutex);
bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
{
sql_print_information("NDB Binlog: Node: %d, down,"
" Subscriber bitmask %x%x",
@@ -2099,8 +2110,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
tmp_share->subscriber_bitmap[node_id].bitmap[1],
tmp_share->subscriber_bitmap[node_id].bitmap[0]);
}
- (void) pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&tmp_share->mutex);
+ mysql_cond_signal(&injector_cond);
break;
}
case NDBEVENT::TE_SUBSCRIBE:
@@ -2108,10 +2119,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
uint8 req_id= pOp->getReqNodeId();
DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- (void) pthread_mutex_lock(&tmp_share->mutex);
+ mysql_mutex_lock(&tmp_share->mutex);
bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
{
sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
" Subscriber bitmask %x%x",
@@ -2120,8 +2131,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
tmp_share->subscriber_bitmap[node_id].bitmap[1],
tmp_share->subscriber_bitmap[node_id].bitmap[0]);
}
- (void) pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&tmp_share->mutex);
+ mysql_cond_signal(&injector_cond);
break;
}
case NDBEVENT::TE_UNSUBSCRIBE:
@@ -2129,10 +2140,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
uint8 req_id= pOp->getReqNodeId();
DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- (void) pthread_mutex_lock(&tmp_share->mutex);
+ mysql_mutex_lock(&tmp_share->mutex);
bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
{
sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
" Subscriber bitmask %x%x",
@@ -2141,8 +2152,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
tmp_share->subscriber_bitmap[node_id].bitmap[1],
tmp_share->subscriber_bitmap[node_id].bitmap[0]);
}
- (void) pthread_mutex_unlock(&tmp_share->mutex);
- (void) pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&tmp_share->mutex);
+ mysql_cond_signal(&injector_cond);
break;
}
default:
@@ -2182,22 +2193,22 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
if (schema_type == SOT_CLEAR_SLOCK)
{
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
NDB_SCHEMA_OBJECT *ndb_schema_object=
(NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
(uchar*) key, strlen(key));
if (ndb_schema_object)
{
- pthread_mutex_lock(&ndb_schema_object->mutex);
+ mysql_mutex_lock(&ndb_schema_object->mutex);
memcpy(ndb_schema_object->slock, schema->slock,
sizeof(ndb_schema_object->slock));
DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
(uchar*)ndb_schema_object->slock_bitmap.bitmap,
no_bytes_in_map(&ndb_schema_object->slock_bitmap));
- pthread_mutex_unlock(&ndb_schema_object->mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&ndb_schema_object->mutex);
+ mysql_cond_signal(&injector_cond);
}
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
continue;
}
/* ndb_share reference temporary, free below */
@@ -2236,7 +2247,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= schema->db;
table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
+ close_cached_tables(thd, &table_list, FALSE, FALSE);
}
if (schema_type != SOT_ALTER_TABLE)
break;
@@ -2257,7 +2268,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
free_share(&share);
share= 0;
}
- pthread_mutex_lock(&LOCK_open);
+ mysql_mutex_lock(&LOCK_open);
if (ndbcluster_check_if_local_table(schema->db, schema->name))
{
DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
@@ -2271,7 +2282,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
{
print_could_not_discover_error(thd, schema);
}
- pthread_mutex_unlock(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
}
break;
default:
@@ -2323,22 +2334,20 @@ struct ndb_binlog_index_row {
/*
Open the ndb_binlog_index table
*/
-static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
- TABLE **ndb_binlog_index)
+static int open_ndb_binlog_index(THD *thd, TABLE **ndb_binlog_index)
{
static char repdb[]= NDB_REP_DB;
static char reptable[]= NDB_REP_TABLE;
const char *save_proc_info= thd->proc_info;
+ TABLE_LIST *tables= &binlog_tables;
- bzero((char*) tables, sizeof(*tables));
- tables->db= repdb;
- tables->alias= tables->table_name= reptable;
- tables->lock_type= TL_WRITE;
+ tables->init_one_table(repdb, strlen(repdb), reptable, strlen(reptable),
+ reptable, TL_WRITE);
thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
+
tables->required_type= FRMTYPE_TABLE;
- uint counter;
thd->clear_error();
- if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
+ if (open_and_lock_tables(thd, tables, FALSE, 0))
{
if (thd->killed)
sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
@@ -2364,36 +2373,18 @@ int ndb_add_ndb_binlog_index(THD *thd, void *_row)
{
ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
int error= 0;
- bool need_reopen;
/*
Turn of binlogging to prevent the table changes to be written to
the binary log.
*/
- ulong saved_options= thd->options;
- thd->options&= ~(OPTION_BIN_LOG);
+ ulong saved_options= thd->variables.option_bits;
+ thd->variables.option_bits&= ~OPTION_BIN_LOG;
- for ( ; ; ) /* loop for need_reopen */
+ if (!ndb_binlog_index && open_ndb_binlog_index(thd, &ndb_binlog_index))
{
- if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
- {
- error= -1;
- goto add_ndb_binlog_index_err;
- }
-
- if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
- {
- if (need_reopen)
- {
- TABLE_LIST *p_binlog_tables= &binlog_tables;
- close_tables_for_reopen(thd, &p_binlog_tables);
- ndb_binlog_index= 0;
- continue;
- }
- sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
- error= -1;
- goto add_ndb_binlog_index_err;
- }
- break;
+ sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
+ error= -1;
+ goto add_ndb_binlog_index_err;
}
/*
@@ -2418,14 +2409,10 @@ int ndb_add_ndb_binlog_index(THD *thd, void *_row)
goto add_ndb_binlog_index_err;
}
- mysql_unlock_tables(thd, thd->lock);
- thd->lock= 0;
- thd->options= saved_options;
- return 0;
add_ndb_binlog_index_err:
close_thread_tables(thd);
ndb_binlog_index= 0;
- thd->options= saved_options;
+ thd->variables.option_bits= saved_options;
return error;
}
@@ -2458,27 +2445,29 @@ int ndbcluster_binlog_start()
DBUG_RETURN(-1);
}
- pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&injector_cond, NULL);
- pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_injector_mutex, &injector_mutex, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_injector_cond, &injector_cond, NULL);
+ mysql_mutex_init(key_ndb_schema_share_mutex,
+ &ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
/* Create injector thread */
- if (pthread_create(&ndb_binlog_thread, &connection_attrib,
- ndb_binlog_thread_func, 0))
+ if (mysql_thread_create(key_thread_ndb_binlog,
+ &ndb_binlog_thread, &connection_attrib,
+ ndb_binlog_thread_func, 0))
{
DBUG_PRINT("error", ("Could not create ndb injector thread"));
- pthread_cond_destroy(&injector_cond);
- pthread_mutex_destroy(&injector_mutex);
+ mysql_cond_destroy(&injector_cond);
+ mysql_mutex_destroy(&injector_mutex);
DBUG_RETURN(-1);
}
ndbcluster_binlog_inited= 1;
/* Wait for the injector thread to start */
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
while (!ndb_binlog_thread_running)
- pthread_cond_wait(&injector_cond, &injector_mutex);
- pthread_mutex_unlock(&injector_mutex);
+ mysql_cond_wait(&injector_cond, &injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1);
@@ -2568,7 +2557,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
DBUG_ASSERT(strlen(key) == key_len);
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
/* Handle any trailing share */
NDB_SHARE *share= (NDB_SHARE*) my_hash_search(&ndbcluster_open_tables,
@@ -2580,7 +2569,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
share->op != 0 ||
share->op_old != 0)
{
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0); // replication already setup, or should not
}
}
@@ -2590,7 +2579,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
if (share->op || share->op_old)
{
my_errno= HA_ERR_TABLE_EXIST;
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(1);
}
if (!share_may_exist || share->connect_count !=
@@ -2633,10 +2622,10 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
if (!do_event_op)
{
share->flags|= NSF_NO_BINLOG;
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
while (share && !IS_TMP_PREFIX(table_name))
{
@@ -2655,7 +2644,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
const NDBTAB *ndbtab= ndbtab_g.get_table();
if (ndbtab == 0)
{
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
"%s, %d", key, dict->getNdbError().message,
dict->getNdbError().code);
@@ -2677,7 +2666,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
event_name.c_ptr());
break; // error
}
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: "
"CREATE (DISCOVER) TABLE Event: %s",
event_name.c_ptr());
@@ -2685,7 +2674,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
else
{
delete ev;
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
event_name.c_ptr());
}
@@ -2929,14 +2918,14 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
int retry_sleep= 100;
while (1)
{
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
Ndb *ndb= injector_ndb;
if (do_ndb_schema_share)
ndb= schema_ndb;
if (ndb == 0)
{
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1);
}
@@ -2961,7 +2950,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
ndb->getNdbError().code,
ndb->getNdbError().message,
"NDB");
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1);
}
@@ -3011,7 +3000,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
op->getNdbError().message,
"NDB");
ndb->dropEventOperation(op);
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1);
}
}
@@ -3053,7 +3042,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
op->getNdbError().code, op->getNdbError().message);
}
ndb->dropEventOperation(op);
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
if (retries)
{
my_sleep(retry_sleep);
@@ -3061,7 +3050,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
}
DBUG_RETURN(-1);
}
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
break;
}
@@ -3075,7 +3064,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
ndb_apply_status_share= get_share(share);
DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
share->key, share->use_count));
- (void) pthread_cond_signal(&injector_cond);
+ mysql_cond_signal(&injector_cond);
}
else if (do_ndb_schema_share)
{
@@ -3083,13 +3072,13 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
ndb_schema_share= get_share(share);
DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
share->key, share->use_count));
- (void) pthread_cond_signal(&injector_cond);
+ mysql_cond_signal(&injector_cond);
}
DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
share->key, (long) share->op, share->use_count));
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: logging %s", share->key);
DBUG_RETURN(0);
}
@@ -3154,17 +3143,17 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
#define SYNC_DROP_
#ifdef SYNC_DROP_
thd->proc_info= "Syncing ndb table schema operation and binlog";
- (void) pthread_mutex_lock(&share->mutex);
- safe_mutex_assert_owner(&LOCK_open);
- (void) pthread_mutex_unlock(&LOCK_open);
- int max_timeout= opt_ndb_sync_timeout;
+ mysql_mutex_lock(&share->mutex);
+ mysql_mutex_assert_owner(&LOCK_open);
+ mysql_mutex_unlock(&LOCK_open);
+ int max_timeout= DEFAULT_SYNC_TIMEOUT;
while (share->op)
{
struct timespec abstime;
set_timespec(abstime, 1);
- int ret= pthread_cond_timedwait(&injector_cond,
- &share->mutex,
- &abstime);
+ int ret= mysql_cond_timedwait(&injector_cond,
+ &share->mutex,
+ &abstime);
if (thd->killed ||
share->op == 0)
break;
@@ -3177,18 +3166,18 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
type_str, share->key);
break;
}
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
type_str, share->key);
}
}
- (void) pthread_mutex_lock(&LOCK_open);
- (void) pthread_mutex_unlock(&share->mutex);
+ mysql_mutex_lock(&LOCK_open);
+ mysql_mutex_unlock(&share->mutex);
#else
- (void) pthread_mutex_lock(&share->mutex);
+ mysql_mutex_lock(&share->mutex);
share->op_old= share->op;
share->op= 0;
- (void) pthread_mutex_unlock(&share->mutex);
+ mysql_mutex_unlock(&share->mutex);
#endif
thd->proc_info= save_proc_info;
@@ -3255,12 +3244,12 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
switch (type)
{
case NDBEVENT::TE_CLUSTER_FAILURE:
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
share->key, (unsigned) pOp->getGCI());
if (ndb_apply_status_share == share)
{
- if (ndb_extra_logging &&
+ if (opt_ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
@@ -3280,7 +3269,7 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
case NDBEVENT::TE_DROP:
if (ndb_apply_status_share == share)
{
- if (ndb_extra_logging &&
+ if (opt_ndb_extra_logging &&
ndb_binlog_tables_inited && ndb_binlog_running)
sql_print_information("NDB Binlog: ndb tables initially "
"read only on reconnect.");
@@ -3292,7 +3281,7 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
ndb_binlog_tables_inited= 0;
}
/* ToDo: remove printout */
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
sql_print_information("NDB Binlog: drop table %s.", share->key);
// fall through
case NDBEVENT::TE_ALTER:
@@ -3556,7 +3545,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
DBUG_PRINT("enter", ("key: '%s'", key));
if (!have_lock)
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
while (!(ndb_schema_object=
(NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
(uchar*) key,
@@ -3582,7 +3571,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
my_free((uchar*) ndb_schema_object, 0);
break;
}
- pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_ndb_schema_object_mutex, &ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
sizeof(ndb_schema_object->slock)*8, FALSE);
bitmap_clear_all(&ndb_schema_object->slock_bitmap);
@@ -3594,7 +3583,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
}
if (!have_lock)
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(ndb_schema_object);
}
@@ -3605,12 +3594,12 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
DBUG_ENTER("ndb_free_schema_object");
DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
if (!have_lock)
- pthread_mutex_lock(&ndbcluster_mutex);
+ mysql_mutex_lock(&ndbcluster_mutex);
if (!--(*ndb_schema_object)->use_count)
{
DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
- pthread_mutex_destroy(&(*ndb_schema_object)->mutex);
+ mysql_mutex_destroy(&(*ndb_schema_object)->mutex);
my_free((uchar*) *ndb_schema_object, MYF(0));
*ndb_schema_object= 0;
}
@@ -3619,10 +3608,12 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
}
if (!have_lock)
- pthread_mutex_unlock(&ndbcluster_mutex);
+ mysql_mutex_unlock(&ndbcluster_mutex);
DBUG_VOID_RETURN;
}
+extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
+extern ulong opt_ndb_report_thresh_binlog_mem_usage;
pthread_handler_t ndb_binlog_thread_func(void *arg)
{
@@ -3638,7 +3629,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Timer main_timer;
#endif
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
/*
Set up the Thread
*/
@@ -3647,13 +3638,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd= new THD; /* note that contructor of THD uses DBUG_ */
THD_CHECK_SENTRY(thd);
+ thd->set_current_stmt_binlog_format_row();
/* We need to set thd->thread_id before thd->store_globals, or it will
set an invalid value for thd->variables.pseudo_thread_id.
*/
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ mysql_thread_set_psi_id(thd->thread_id);
thd->thread_stack= (char*) &thd; /* remember where our stack is */
if (thd->store_globals())
@@ -3661,15 +3655,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->cleanup();
delete thd;
ndb_binlog_thread_running= -1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&injector_mutex);
+ mysql_cond_signal(&injector_cond);
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
pthread_exit(0);
return NULL; // Avoid compiler warnings
}
- lex_start(thd);
thd->init_for_queries();
thd->command= COM_DAEMON;
@@ -3680,6 +3673,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
my_net_init(&thd->net, 0);
thd->main_security_ctx.master_access= ~0;
thd->main_security_ctx.priv_user= 0;
+ /* Do not use user-supplied timeout value for system threads. */
+ thd->variables.lock_wait_timeout= LONG_TIMEOUT;
/*
Set up ndb binlog
@@ -3688,9 +3683,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_detach_this_thread();
thd->real_id= pthread_self();
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
thd->lex->start_transaction_opt= 0;
if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
@@ -3698,8 +3693,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
ndb_binlog_thread_running= -1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&injector_mutex);
+ mysql_cond_signal(&injector_cond);
goto err;
}
@@ -3709,8 +3704,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
sql_print_error("NDB Binlog: Getting Ndb object failed");
ndb_binlog_thread_running= -1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&injector_mutex);
+ mysql_cond_signal(&injector_cond);
goto err;
}
@@ -3723,7 +3718,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Used by both sql client thread and binlog thread to interact
with the storage
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
*/
injector_thd= thd;
injector_ndb= i_ndb;
@@ -3738,27 +3733,27 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
/* Thread start up completed */
ndb_binlog_thread_running= 1;
- pthread_mutex_unlock(&injector_mutex);
- pthread_cond_signal(&injector_cond);
+ mysql_mutex_unlock(&injector_mutex);
+ mysql_cond_signal(&injector_cond);
/*
wait for mysql server to start (so that the binlog is started
and thus can receive the first GAP event)
*/
- pthread_mutex_lock(&LOCK_server_started);
+ mysql_mutex_lock(&LOCK_server_started);
while (!mysqld_server_started)
{
struct timespec abstime;
set_timespec(abstime, 1);
- pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
- &abstime);
+ mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
+ &abstime);
if (ndbcluster_terminating)
{
- pthread_mutex_unlock(&LOCK_server_started);
+ mysql_mutex_unlock(&LOCK_server_started);
goto err;
}
}
- pthread_mutex_unlock(&LOCK_server_started);
+ mysql_mutex_unlock(&LOCK_server_started);
restart:
/*
Main NDB Injector loop
@@ -3801,21 +3796,21 @@ restart:
{
thd->proc_info= "Waiting for ndbcluster to start";
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
while (!ndb_schema_share ||
(ndb_binlog_running && !ndb_apply_status_share))
{
/* ndb not connected yet */
struct timespec abstime;
set_timespec(abstime, 1);
- pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
+ mysql_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
if (ndbcluster_binlog_terminating)
{
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
goto err;
}
}
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
if (thd_ndb == NULL)
{
@@ -3882,7 +3877,7 @@ restart:
"Changes to the database that occured while "
"disconnected will not be in the binlog");
}
- if (ndb_extra_logging)
+ if (opt_ndb_extra_logging)
{
sql_print_information("NDB Binlog: starting log at epoch %u",
(unsigned)schema_gci);
@@ -3892,9 +3887,6 @@ restart:
{
static char db[]= "";
thd->db= db;
- if (ndb_binlog_running)
- open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index);
- thd->db= db;
}
do_ndbcluster_binlog_close_connection= BCCC_running;
for ( ; !((ndbcluster_binlog_terminating ||
@@ -3984,9 +3976,9 @@ restart:
{
thd->proc_info= "Processing events from schema table";
s_ndb->
- setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
+ setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
s_ndb->
- setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+ setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
NdbEventOperation *pOp= s_ndb->nextEvent();
while (pOp != NULL)
{
@@ -4049,8 +4041,8 @@ restart:
/* initialize some variables for this epoch */
g_ndb_log_slave_updates= opt_log_slave_updates;
i_ndb->
- setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
- i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+ setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
+ i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
bzero((char*) &row, sizeof(row));
thd->variables.character_set_client= &my_charset_latin1;
@@ -4286,13 +4278,13 @@ err:
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
thd->proc_info= "Shutting down";
close_thread_tables(thd);
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
injector_thd= 0;
injector_ndb= 0;
p_latest_trans_gci= 0;
schema_ndb= 0;
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory
if (ndb_apply_status_share)
@@ -4307,7 +4299,7 @@ err:
if (ndb_schema_share)
{
/* begin protect ndb_schema_share */
- pthread_mutex_lock(&ndb_schema_share_mutex);
+ mysql_mutex_lock(&ndb_schema_share_mutex);
/* ndb_share reference binlog extra free */
DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
ndb_schema_share->key,
@@ -4315,7 +4307,7 @@ err:
free_share(&ndb_schema_share);
ndb_schema_share= 0;
ndb_binlog_tables_inited= 0;
- pthread_mutex_unlock(&ndb_schema_share_mutex);
+ mysql_mutex_unlock(&ndb_schema_share_mutex);
/* end protect ndb_schema_share */
}
@@ -4375,7 +4367,7 @@ err:
ndb_binlog_thread_running= -1;
ndb_binlog_running= FALSE;
- (void) pthread_cond_signal(&injector_cond);
+ mysql_cond_signal(&injector_cond);
DBUG_PRINT("exit", ("ndb_binlog_thread"));
@@ -4394,12 +4386,12 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
ulonglong ndb_latest_epoch= 0;
DBUG_ENTER("ndbcluster_show_status_binlog");
- pthread_mutex_lock(&injector_mutex);
+ mysql_mutex_lock(&injector_mutex);
if (injector_ndb)
{
char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
ndb_latest_epoch= injector_ndb->getLatestGCI();
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
buflen=
snprintf(buf, sizeof(buf),
@@ -4419,7 +4411,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
DBUG_RETURN(TRUE);
}
else
- pthread_mutex_unlock(&injector_mutex);
+ mysql_mutex_unlock(&injector_mutex);
DBUG_RETURN(FALSE);
}