summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.ndb.mysql.com>2006-03-21 16:54:56 +0100
committerunknown <tomas@poseidon.ndb.mysql.com>2006-03-21 16:54:56 +0100
commitf64b1f359d363ff55538e08e944486ae7a4aaf50 (patch)
treeaf454c52fbae4e508420a5e54115b02c3f7525ce /sql
parentc8caeebf854d8df94b39c72d3003245fa1d372db (diff)
downloadmariadb-git-f64b1f359d363ff55538e08e944486ae7a4aaf50.tar.gz
Bug #18395 dual mysqld's with binlog do not sync drop table binlog event correctly
- in ndb_binlog_thread_handle_schema_event make sure to postpone "slock achnowledgement" until event has been put into the binlog + some cleanup, Cluster_replication_schem -> Cluster_schema, indentation
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster_binlog.cc114
1 files changed, 62 insertions, 52 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 15362cde681..36171e87ffe 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -804,7 +804,7 @@ void ndbcluster_setup_binlog_table_shares(THD *thd)
#define SCHEMA_SLOCK_SIZE 32u
#define SCHEMA_QUERY_SIZE 4096u
-struct Cluster_replication_schema
+struct Cluster_schema
{
unsigned char db_length;
char db[64];
@@ -825,7 +825,7 @@ struct Cluster_replication_schema
Transfer schema table data into corresponding struct
*/
static void ndbcluster_get_schema(TABLE *table,
- Cluster_replication_schema *s)
+ Cluster_schema *s)
{
Field **field;
/* db varchar 1 length byte */
@@ -1153,7 +1153,8 @@ end:
/* remove any unsubscribed from share->slock */
bitmap_intersect(&share->slock_bitmap, &schema_subscribers);
- DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap,
+ DBUG_DUMP("share->slock_bitmap.bitmap",
+ (char*)share->slock_bitmap.bitmap,
no_bytes_in_map(&share->slock_bitmap));
if (bitmap_is_clear_all(&share->slock_bitmap))
@@ -1484,9 +1485,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NdbEventOperation *pOp,
- List<Cluster_replication_schema>
+ List<Cluster_schema>
*post_epoch_log_list,
- List<Cluster_replication_schema>
+ List<Cluster_schema>
*post_epoch_unlock_list,
MEM_ROOT *mem_root)
{
@@ -1497,48 +1498,51 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
NDBEVENT::TableEvent ev_type= pOp->getEventType();
DBUG_PRINT("enter", ("%s.%s ev_type: %d",
share->db, share->table_name, ev_type));
- switch (ev_type)
- {
- case NDBEVENT::TE_UPDATE:
- /* fall through */
- case NDBEVENT::TE_INSERT:
+ if (ev_type == NDBEVENT::TE_UPDATE ||
+ ev_type == NDBEVENT::TE_INSERT)
{
- Cluster_replication_schema *schema= (Cluster_replication_schema *)
- sql_alloc(sizeof(Cluster_replication_schema));
+ Cluster_schema *schema= (Cluster_schema *)
+ sql_alloc(sizeof(Cluster_schema));
MY_BITMAP slock;
bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(share->table, schema);
if (schema->node_id != node_id)
{
- int log_query= 0;
+ int log_query= 0, post_epoch_unlock= 0;
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
schema->query_length, schema->query));
char key[FN_REFLEN];
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false);
-
switch ((enum SCHEMA_OP_TYPE)schema->type)
{
case SOT_DROP_TABLE:
/* binlog dropping table after any table operations */
if (share && share->op)
+ {
post_epoch_log_list->push_back(schema, mem_root);
+ /* acknowledge this query _after_ epoch completion */
+ post_epoch_unlock= 1;
+ }
+ /* table is either ignored or logging is postponed to later */
log_query= 0;
break;
case SOT_RENAME_TABLE:
if (share && share->op)
{
- log_query= 0;
post_epoch_log_list->push_back(schema, mem_root);
+ /* acknowledge this query _after_ epoch completion */
+ post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */
}
goto sot_create_table;
case SOT_ALTER_TABLE:
if (share && share->op)
{
- log_query= 0;
post_epoch_log_list->push_back(schema, mem_root);
+ /* acknowledge this query _after_ epoch completion */
+ post_epoch_unlock= 1;
break; /* discovery will be handled by binlog */
}
goto sot_create_table;
@@ -1571,8 +1575,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
TRUE); /* don't binlog the query */
/* binlog dropping database after any table operations */
if (ndb_binlog_running)
+ {
post_epoch_log_list->push_back(schema, mem_root);
- log_query= 0;
+ /* acknowledge this query _after_ epoch completion */
+ post_epoch_unlock= 1;
+ }
break;
case SOT_CREATE_DB:
/* fall through */
@@ -1581,7 +1588,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema->query + schema->query_length,
TRUE, /* print error */
FALSE); /* binlog the query */
- log_query= 0;
break;
case SOT_CLEAR_SLOCK:
{
@@ -1609,25 +1615,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
free_share(&share);
share= 0;
}
- /* signal that schema operation has been handled */
- if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
- {
- DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
- if (bitmap_is_set(&slock, node_id))
- {
- /*
- If it is an SOT_ALTER_TABLE we need to acknowledge the
- schema operation _after_ all the events have been
- processed so that all schema events coming through
- the event operation has been processed
- */
- if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE)
- post_epoch_unlock_list->push_back(schema, mem_root);
- else
- ndbcluster_update_slock(thd, schema->db, schema->name);
- }
- }
-
if (log_query && ndb_binlog_running)
{
char *thd_db_save= thd->db;
@@ -1637,9 +1624,23 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema->name[0] == 0 || thd->db[0] == 0);
thd->db= thd_db_save;
}
+ /* signal that schema operation has been handled */
+ DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
+ if (bitmap_is_set(&slock, node_id))
+ {
+ if (post_epoch_unlock)
+ post_epoch_unlock_list->push_back(schema, mem_root);
+ else
+ ndbcluster_update_slock(thd, schema->db, schema->name);
+ }
}
+ DBUG_RETURN(0);
}
- break;
+ /*
+ the normal case of UPDATE/INSERT has already been handled
+ */
+ switch (ev_type)
+ {
case NDBEVENT::TE_DELETE:
// skip
break;
@@ -1726,13 +1727,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
- List<Cluster_replication_schema>
+ List<Cluster_schema>
*post_epoch_log_list,
- List<Cluster_replication_schema>
+ List<Cluster_schema>
*post_epoch_unlock_list)
{
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
- Cluster_replication_schema *schema;
+ Cluster_schema *schema;
while ((schema= post_epoch_log_list->pop()))
{
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
@@ -2120,7 +2121,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
}
if (share->flags & NSF_NO_BINLOG)
{
- DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG));
+ DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
+ share->flags, share->flags & NSF_NO_BINLOG));
DBUG_RETURN(0);
}
@@ -2137,7 +2139,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
share->key);
if (push_warning)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION),
+ ER_ILLEGAL_HA_CREATE_OPTION,
+ ER(ER_ILLEGAL_HA_CREATE_OPTION),
ndbcluster_hton.name,
"Binlog of table with BLOB attribute and no PK");
@@ -2268,7 +2271,8 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
if (share->flags & NSF_NO_BINLOG)
{
- DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags));
+ DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
+ share->flags));
DBUG_RETURN(0);
}
@@ -2690,7 +2694,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
{
case NDBEVENT::TE_INSERT:
row.n_inserts++;
- DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str));
+ DBUG_PRINT("info", ("INSERT INTO %s.%s",
+ table_s->db.str, table_s->table_name.str));
{
if (share->flags & NSF_BLOB_FLAG)
{
@@ -2701,14 +2706,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
- int ret= trans.write_row(::server_id, injector::transaction::table(table, true),
+ int ret= trans.write_row(::server_id,
+ injector::transaction::table(table, true),
&b, n_fields, table->record[0]);
DBUG_ASSERT(ret == 0);
}
break;
case NDBEVENT::TE_DELETE:
row.n_deletes++;
- DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str));
+ DBUG_PRINT("info",("DELETE FROM %s.%s",
+ table_s->db.str, table_s->table_name.str));
{
/*
table->record[0] contains only the primary key in this case
@@ -2737,14 +2744,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
}
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
DBUG_EXECUTE("info", print_records(table, table->record[n]););
- int ret= trans.delete_row(::server_id, injector::transaction::table(table, true),
+ int ret= trans.delete_row(::server_id,
+ injector::transaction::table(table, true),
&b, n_fields, table->record[n]);
DBUG_ASSERT(ret == 0);
}
break;
case NDBEVENT::TE_UPDATE:
row.n_updates++;
- DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str));
+ DBUG_PRINT("info", ("UPDATE %s.%s",
+ table_s->db.str, table_s->table_name.str));
{
if (share->flags & NSF_BLOB_FLAG)
{
@@ -3025,15 +3034,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
MEM_ROOT *old_root= *root_ptr;
MEM_ROOT mem_root;
init_sql_alloc(&mem_root, 4096, 0);
- List<Cluster_replication_schema> post_epoch_log_list;
- List<Cluster_replication_schema> post_epoch_unlock_list;
+ List<Cluster_schema> post_epoch_log_list;
+ List<Cluster_schema> post_epoch_unlock_list;
*root_ptr= &mem_root;
if (unlikely(schema_res > 0))
{
schema_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
- schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+ schema_ndb->
+ setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
NdbEventOperation *pOp= schema_ndb->nextEvent();
while (pOp != NULL)
{