summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc52
1 files changed, 39 insertions, 13 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 99adc8a5b21..d99b93e67df 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
share->table= 0;
if (ndb_binlog_thread_running <= 0)
{
- DBUG_ASSERT(_table != 0);
- if (_table->s->primary_key == MAX_KEY)
- share->flags|= NSF_HIDDEN_PK;
- if (_table->s->blob_fields != 0)
- share->flags|= NSF_BLOB_FLAG;
+ if (_table)
+ {
+ if (_table->s->primary_key == MAX_KEY)
+ share->flags|= NSF_HIDDEN_PK;
+ if (_table->s->blob_fields != 0)
+ share->flags|= NSF_BLOB_FLAG;
+ }
+ else
+ {
+ share->flags|= NSF_NO_BINLOG;
+ }
return;
}
while (1)
@@ -1626,32 +1632,51 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
create/discover.
*/
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
+ uint key_len,
const char *db,
const char *table_name,
- NDB_SHARE *share)
+ my_bool share_may_exist)
{
DBUG_ENTER("ndbcluster_create_binlog_setup");
+ DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
+ key, key_len, db, table_name, share_may_exist));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
+ DBUG_ASSERT(strlen(key) == key_len);
pthread_mutex_lock(&ndbcluster_mutex);
/* Handle any trailing share */
- if (share == 0)
+ NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) key, key_len);
+ if (share && share_may_exist)
{
- share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
- (byte*) key, strlen(key));
- if (share)
- handle_trailing_share(share);
+ if (share->flags & NSF_NO_BINLOG ||
+ share->op != 0 ||
+ share->op_old != 0)
+ {
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(0); // replication already setup, or should not
+ }
}
- else
+
+ if (share)
+ {
handle_trailing_share(share);
-
+ }
+
/* Create share which is needed to hold replication information */
if (!(share= get_share(key, 0, true, true)))
{
sql_print_error("NDB Binlog: "
"allocating table share for %s failed", key);
}
+
+ if (ndb_binlog_thread_running <= 0)
+ {
+ share->flags|= NSF_NO_BINLOG;
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_RETURN(0);
+ }
pthread_mutex_unlock(&ndbcluster_mutex);
while (share && !IS_TMP_PREFIX(table_name))
@@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
sql_print_error("NDB Binlog: logging of table %s "
"with no PK and blob attributes is not supported",
share->key);
+ share->flags|= NSF_NO_BINLOG;
DBUG_RETURN(-1);
}
/* No primary key, subscribe for all attributes */