summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.ndb.mysql.com>2006-03-11 06:58:48 +0100
committerunknown <tomas@poseidon.ndb.mysql.com>2006-03-11 06:58:48 +0100
commit435e084481f56879ba633d9633e5b8a2cbb3ab0b (patch)
tree38ed6335c091e2a32f3454e532b00e7f96cb89d4 /sql
parent29c9ca33b7c07d582d04d2efbb501100c10035ff (diff)
downloadmariadb-git-435e084481f56879ba633d9633e5b8a2cbb3ab0b.tar.gz
wl#3023 ndb to return correct tables for initial table maps
+ removed extra binlog events generated by drop table schema ops to produce predictable test cases storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp: ndb: dict use define for number of pages in table definition
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster_binlog.cc149
-rw-r--r--sql/rpl_injector.cc3
2 files changed, 103 insertions, 49 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 3d84647e0db..e24e47e0356 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -2651,7 +2651,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
return 0;
TABLE *table= share->table;
- assert(table != 0);
+ DBUG_ASSERT(trans.good());
+ DBUG_ASSERT(table != 0);
dbug_print_table("table", table);
@@ -3051,66 +3052,100 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Binlog_index_row row;
while (pOp != NULL)
{
+ gci= pOp->getGCI();
+ DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
// sometimes get TE_ALTER with invalid table
DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
+ DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
+
ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
- assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
bzero((char*) &row, sizeof(row));
- injector::transaction trans= inj->new_trans(thd);
- { // pass table map before epoch
- Uint32 iter=0;
- const NdbEventOperation* gci_op;
+ injector::transaction trans;
+ // pass table map before epoch
+ {
+ Uint32 iter= 0;
+ const NdbEventOperation *gci_op;
Uint32 event_types;
- while ((gci_op=ndb->getGCIEventOperations(&iter, &event_types))
- != NULL)
+ while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+ != NULL)
{
- NDB_SHARE* share=(NDB_SHARE*)gci_op->getCustomData();
- DBUG_PRINT("info", ("per gci op %p share %p event types 0x%x",
+ NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
+ DBUG_PRINT("info", ("per gci_op: %p share: %p event_types: 0x%x",
gci_op, share, event_types));
+ // workaround for interface returning TE_STOP events
+ // which are normally filtered out below in the nextEvent loop
+ if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
+ {
+ DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
+ gci_op->getEvent()->getTable()->getName()));
+ continue;
+ }
// this should not happen
if (share == NULL || share->table == NULL)
{
- DBUG_PRINT("info", ("no share or table !"));
+ DBUG_PRINT("info", ("no share or table %s!",
+ gci_op->getEvent()->getTable()->getName()));
continue;
}
- TABLE* table=share->table;
- const LEX_STRING& name=table->s->table_name;
+ if (share == apply_status_share)
+ {
+ // skip this table, it is handled specially
+ continue;
+ }
+ TABLE *table= share->table;
+ const LEX_STRING &name= table->s->table_name;
+ if ((event_types & (NdbDictionary::Event::TE_INSERT |
+ NdbDictionary::Event::TE_UPDATE |
+ NdbDictionary::Event::TE_DELETE)) == 0)
+ {
+ DBUG_PRINT("info", ("skipping non data event table: %.*s",
+ name.length, name.str));
+ continue;
+ }
+ if (!trans.good())
+ {
+ DBUG_PRINT("info",
+ ("Found new data event, initializing transaction"));
+ inj->new_trans(thd, &trans);
+ }
DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
injector::transaction::table tbl(table, true);
// TODO enable when mats patch pushed
//trans.use_table(::server_id, tbl);
}
}
- gci= pOp->getGCI();
- if (apply_status_share)
- {
- TABLE *table= apply_status_share->table;
-
- const LEX_STRING& name=table->s->table_name;
- DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
- injector::transaction::table tbl(table, true);
- // TODO enable when mats patch pushed
- //trans.use_table(::server_id, tbl);
-
- MY_BITMAP b;
- uint32 bitbuf;
- DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
- bitmap_init(&b, &bitbuf, table->s->fields, false);
- bitmap_set_all(&b);
- table->field[0]->store((longlong)::server_id);
- table->field[1]->store((longlong)gci);
- trans.write_row(::server_id,
- injector::transaction::table(table, true),
- &b, table->s->fields,
- table->record[0]);
- }
- else
+ if (trans.good())
{
- sql_print_error("NDB: Could not get apply status share");
+ if (apply_status_share)
+ {
+ TABLE *table= apply_status_share->table;
+
+ const LEX_STRING& name=table->s->table_name;
+ DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
+ injector::transaction::table tbl(table, true);
+ // TODO enable when mats patch pushed
+ //trans.use_table(::server_id, tbl);
+
+ MY_BITMAP b;
+ uint32 bitbuf;
+ DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
+ bitmap_init(&b, &bitbuf, table->s->fields, false);
+ bitmap_set_all(&b);
+ table->field[0]->store((longlong)::server_id);
+ table->field[1]->store((longlong)gci);
+ trans.write_row(::server_id,
+ injector::transaction::table(table, true),
+ &b, table->s->fields,
+ table->record[0]);
+ }
+ else
+ {
+ sql_print_error("NDB: Could not get apply status share");
+ }
}
#ifdef RUN_NDB_BINLOG_TIMER
write_timer.start();
@@ -3128,11 +3163,28 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
DBUG_PRINT("info",
- ("EVENT TYPE:%d GCI:%lld last applied: %lld "
- "share: 0x%lx", pOp->getEventType(), gci,
- ndb_latest_applied_binlog_epoch, share));
+ ("EVENT TYPE: %d GCI: %lld last applied: %lld "
+ "share: 0x%lx (%s.%s)", pOp->getEventType(), gci,
+ ndb_latest_applied_binlog_epoch, share,
+ share ? share->db : "share == NULL",
+ share ? share->table_name : ""));
DBUG_ASSERT(share != 0);
}
+ // assert that there is consistancy between gci op list
+ // and event list
+ {
+ Uint32 iter= 0;
+ const NdbEventOperation *gci_op;
+ Uint32 event_types;
+ while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+ != NULL)
+ {
+ if (gci_op == pOp)
+ break;
+ }
+ DBUG_ASSERT(gci_op == pOp);
+ DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
+ }
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
@@ -3140,8 +3192,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
else
{
// set injector_ndb database/schema from table internal name
- int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
- assert(ret == 0);
+ int ret=
+ ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
+ DBUG_ASSERT(ret == 0);
ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
// reset to catch errors
ndb->setDatabaseName("");
@@ -3158,13 +3211,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
write_timer.stop();
#endif
- if (row.n_inserts || row.n_updates
- || row.n_deletes || row.n_schemaops)
+ if (trans.good())
{
+ DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
injector::transaction::binlog_pos start= trans.start_pos();
if (int r= trans.commit())
{
- sql_print_error("NDB binlog:"
+ sql_print_error("NDB binlog: "
"Error during COMMIT of GCI. Error: %d",
r);
/* TODO: Further handling? */
@@ -3173,13 +3226,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
row.master_log_file= start.file_name();
row.master_log_pos= start.file_pos();
- DBUG_PRINT("info",("COMMIT gci %lld",gci));
+ DBUG_PRINT("info", ("COMMIT gci: %lld", gci));
if (ndb_update_binlog_index)
ndb_add_binlog_index(thd, &row);
ndb_latest_applied_binlog_epoch= gci;
}
- else
- trans.commit();
ndb_latest_handled_binlog_epoch= gci;
#ifdef RUN_NDB_BINLOG_TIMER
gci_timer.stop();
diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc
index a69dea9a158..bf40ead070c 100644
--- a/sql/rpl_injector.cc
+++ b/sql/rpl_injector.cc
@@ -43,6 +43,9 @@ injector::transaction::transaction(MYSQL_LOG *log, THD *thd)
injector::transaction::~transaction()
{
+ if (!good())
+ return;
+
/* Needed since my_free expects a 'char*' (instead of 'void*'). */
char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);