diff options
author | unknown <tomas@poseidon.ndb.mysql.com> | 2006-03-11 06:58:48 +0100 |
---|---|---|
committer | unknown <tomas@poseidon.ndb.mysql.com> | 2006-03-11 06:58:48 +0100 |
commit | 435e084481f56879ba633d9633e5b8a2cbb3ab0b (patch) | |
tree | 38ed6335c091e2a32f3454e532b00e7f96cb89d4 /sql | |
parent | 29c9ca33b7c07d582d04d2efbb501100c10035ff (diff) | |
download | mariadb-git-435e084481f56879ba633d9633e5b8a2cbb3ab0b.tar.gz |
wl#3023 ndb to return correct tables for initial table maps
+ removed extra binlog events generated by drop table schema ops to produce predictable test cases
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
ndb: dict use define for number of pages in table definition
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 149 | ||||
-rw-r--r-- | sql/rpl_injector.cc | 3 |
2 files changed, 103 insertions, 49 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 3d84647e0db..e24e47e0356 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -2651,7 +2651,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, return 0; TABLE *table= share->table; - assert(table != 0); + DBUG_ASSERT(trans.good()); + DBUG_ASSERT(table != 0); dbug_print_table("table", table); @@ -3051,66 +3052,100 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Binlog_index_row row; while (pOp != NULL) { + gci= pOp->getGCI(); + DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci)); // sometimes get TE_ALTER with invalid table DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER || ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName())); + DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch); + ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); - assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch); bzero((char*) &row, sizeof(row)); - injector::transaction trans= inj->new_trans(thd); - { // pass table map before epoch - Uint32 iter=0; - const NdbEventOperation* gci_op; + injector::transaction trans; + // pass table map before epoch + { + Uint32 iter= 0; + const NdbEventOperation *gci_op; Uint32 event_types; - while ((gci_op=ndb->getGCIEventOperations(&iter, &event_types)) - != NULL) + while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) + != NULL) { - NDB_SHARE* share=(NDB_SHARE*)gci_op->getCustomData(); - DBUG_PRINT("info", ("per gci op %p share %p event types 0x%x", + NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData(); + DBUG_PRINT("info", ("per gci_op: %p share: %p event_types: 0x%x", gci_op, share, event_types)); + // workaround for interface returning TE_STOP events + // which are normally filtered out below in the nextEvent loop + if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0) + { + DBUG_PRINT("info", ("Skipped TE_STOP on table %s", + gci_op->getEvent()->getTable()->getName())); + continue; + } // this should not happen if (share == NULL || share->table == NULL) { - DBUG_PRINT("info", ("no share or table !")); + DBUG_PRINT("info", ("no share or table %s!", + gci_op->getEvent()->getTable()->getName())); continue; } - TABLE* table=share->table; - const LEX_STRING& name=table->s->table_name; + if (share == apply_status_share) + { + // skip this table, it is handled specially + continue; + } + TABLE *table= share->table; + const LEX_STRING &name= table->s->table_name; + if ((event_types & (NdbDictionary::Event::TE_INSERT | + NdbDictionary::Event::TE_UPDATE | + NdbDictionary::Event::TE_DELETE)) == 0) + { + DBUG_PRINT("info", ("skipping non data event table: %.*s", + name.length, name.str)); + continue; + } + if (!trans.good()) + { + DBUG_PRINT("info", + ("Found new data event, initializing transaction")); + inj->new_trans(thd, &trans); + } DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str)); injector::transaction::table tbl(table, true); // TODO enable when mats patch pushed //trans.use_table(::server_id, tbl); } } - gci= pOp->getGCI(); - if (apply_status_share) - { - TABLE *table= apply_status_share->table; - - const LEX_STRING& name=table->s->table_name; - DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str)); - injector::transaction::table tbl(table, true); - // TODO enable when mats patch pushed - //trans.use_table(::server_id, tbl); - - MY_BITMAP b; - uint32 bitbuf; - DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8); - bitmap_init(&b, &bitbuf, table->s->fields, false); - bitmap_set_all(&b); - table->field[0]->store((longlong)::server_id); - table->field[1]->store((longlong)gci); - trans.write_row(::server_id, - injector::transaction::table(table, true), - &b, table->s->fields, - table->record[0]); - } - else + if (trans.good()) { - sql_print_error("NDB: Could not get apply status share"); + if (apply_status_share) + { + TABLE *table= apply_status_share->table; + + const LEX_STRING& name=table->s->table_name; + DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str)); + injector::transaction::table tbl(table, true); + // TODO enable when mats patch pushed + //trans.use_table(::server_id, tbl); + + MY_BITMAP b; + uint32 bitbuf; + DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8); + bitmap_init(&b, &bitbuf, table->s->fields, false); + bitmap_set_all(&b); + table->field[0]->store((longlong)::server_id); + table->field[1]->store((longlong)gci); + trans.write_row(::server_id, + injector::transaction::table(table, true), + &b, table->s->fields, + table->record[0]); + } + else + { + sql_print_error("NDB: Could not get apply status share"); + } } #ifdef RUN_NDB_BINLOG_TIMER write_timer.start(); @@ -3128,11 +3163,28 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData(); DBUG_PRINT("info", - ("EVENT TYPE:%d GCI:%lld last applied: %lld " - "share: 0x%lx", pOp->getEventType(), gci, - ndb_latest_applied_binlog_epoch, share)); + ("EVENT TYPE: %d GCI: %lld last applied: %lld " + "share: 0x%lx (%s.%s)", pOp->getEventType(), gci, + ndb_latest_applied_binlog_epoch, share, + share ? share->db : "share == NULL", + share ? share->table_name : "")); DBUG_ASSERT(share != 0); } + // assert that there is consistancy between gci op list + // and event list + { + Uint32 iter= 0; + const NdbEventOperation *gci_op; + Uint32 event_types; + while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) + != NULL) + { + if (gci_op == pOp) + break; + } + DBUG_ASSERT(gci_op == pOp); + DBUG_ASSERT((event_types & pOp->getEventType()) != 0); + } #endif if ((unsigned) pOp->getEventType() < (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) @@ -3140,8 +3192,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) else { // set injector_ndb database/schema from table internal name - int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); - assert(ret == 0); + int ret= + ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); + DBUG_ASSERT(ret == 0); ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); // reset to catch errors ndb->setDatabaseName(""); @@ -3158,13 +3211,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) write_timer.stop(); #endif - if (row.n_inserts || row.n_updates - || row.n_deletes || row.n_schemaops) + if (trans.good()) { + DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes); injector::transaction::binlog_pos start= trans.start_pos(); if (int r= trans.commit()) { - sql_print_error("NDB binlog:" + sql_print_error("NDB binlog: " "Error during COMMIT of GCI. Error: %d", r); /* TODO: Further handling? */ @@ -3173,13 +3226,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) row.master_log_file= start.file_name(); row.master_log_pos= start.file_pos(); - DBUG_PRINT("info",("COMMIT gci %lld",gci)); + DBUG_PRINT("info", ("COMMIT gci: %lld", gci)); if (ndb_update_binlog_index) ndb_add_binlog_index(thd, &row); ndb_latest_applied_binlog_epoch= gci; } - else - trans.commit(); ndb_latest_handled_binlog_epoch= gci; #ifdef RUN_NDB_BINLOG_TIMER gci_timer.stop(); diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc index a69dea9a158..bf40ead070c 100644 --- a/sql/rpl_injector.cc +++ b/sql/rpl_injector.cc @@ -43,6 +43,9 @@ injector::transaction::transaction(MYSQL_LOG *log, THD *thd) injector::transaction::~transaction() { + if (!good()) + return; + /* Needed since my_free expects a 'char*' (instead of 'void*'). */ char* const the_memory= const_cast<char*>(m_start_pos.m_file_name); |