/* Copyright (C) 2004, 2005 MySQL AB Use is subject to license terms This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ #include #include #include #include #include #include #include #include #include struct CASE { bool start_row; bool end_row; bool curr_row; const char * op1; const char * op2; const char * op3; int val; }; static CASE g_op_types[] = { { false, true, false, "INS", 0, 0, 0 }, // 0x001 a { true, true, false, "UPD", 0, 0, 0 }, // 0x002 d { true, false, false, "DEL", 0, 0, 0 }, // 0x004 g { false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b { false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c { true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e { true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f { true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h { false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i { true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j }; const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0])); static Ndb* g_ndb = 0; static CASE* g_ops; static Ndb_cluster_connection *g_cluster_connection= 0; static HugoOperations* g_hugo_ops; static int g_use_ops = 1 | 2 | 4; static int g_cases = 0x1; static int g_case_loop = 2; static int g_rows = 10; static int g_setup_tables = 1; static int g_one_op_at_a_time = 0; static const char * g_tablename = "T1"; static const NdbDictionary::Table* g_table = 0; static NdbRestarter g_restarter; static int init_ndb(int argc, char** argv); static int parse_args(int argc, char** argv); static int connect_ndb(); static int drop_all_tables(); static int load_table(); static int pause_lcp(int error); static int do_op(int row); static int continue_lcp(int error = 0); static int commit(); static int restart(); static int validate(); #define require(x) { bool b = x; if(!b){g_err << __LINE__ << endl; abort();}} int main(int argc, char ** argv){ ndb_init(); require(!init_ndb(argc, argv)); if(parse_args(argc, argv)) return -1; require(!connect_ndb()); if(g_setup_tables){ require(!drop_all_tables()); if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){ exit(-1); } } g_table = g_ndb->getDictionary()->getTable(g_tablename); if(g_table == 0){ g_err << "Failed to retreive table: " << g_tablename << endl; exit(-1); } require(g_hugo_ops = new HugoOperations(* g_table)); require(!g_hugo_ops->startTransaction(g_ndb)); g_ops= new CASE[g_rows]; const int use_ops = g_use_ops; for(size_t i = 0; iconnect(12, 5, 1) != 0) { return 1; } g_ndb = new Ndb(g_cluster_connection, "TEST_DB"); g_ndb->init(256); if(g_ndb->waitUntilReady(30) == 0){ return 0; // int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP }; // return g_restarter.dumpStateAllNodes(args, 1); } return -1; } static int disconnect_ndb() { delete g_ndb; delete g_cluster_connection; g_ndb = 0; g_table = 0; g_cluster_connection= 0; return 0; } static int drop_all_tables() { NdbDictionary::Dictionary * dict = g_ndb->getDictionary(); require(dict); BaseString db = g_ndb->getDatabaseName(); BaseString schema = g_ndb->getSchemaName(); NdbDictionary::Dictionary::List list; if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){ g_err << "Failed to list tables: " << endl << dict->getNdbError() << endl; return -1; } for (unsigned i = 0; i < list.count; i++) { NdbDictionary::Dictionary::List::Element& elt = list.elements[i]; switch (elt.type) { case NdbDictionary::Object::SystemTable: case NdbDictionary::Object::UserTable: g_ndb->setDatabaseName(elt.database); g_ndb->setSchemaName(elt.schema); if(dict->dropTable(elt.name) != 0){ g_err << "Failed to drop table: " << elt.database << "/" << elt.schema << "/" << elt.name <getNdbError() << endl; return -1; } break; case NdbDictionary::Object::UniqueHashIndex: case NdbDictionary::Object::OrderedIndex: case NdbDictionary::Object::HashIndexTrigger: case NdbDictionary::Object::IndexTrigger: case NdbDictionary::Object::SubscriptionTrigger: case NdbDictionary::Object::ReadOnlyConstraint: default: break; } } g_ndb->setDatabaseName(db.c_str()); g_ndb->setSchemaName(schema.c_str()); return 0; } static int load_table() { UtilTransactions clear(* g_table); require(!clear.clearTable(g_ndb)); HugoOperations ops(* g_table); require(!ops.startTransaction(g_ndb)); size_t op = 0; size_t rows = 0; size_t uncommitted = 0; bool prepared = false; for(size_t i = 0; i= 100){ require(!ops.execute_Commit(g_ndb)); require(!ops.getTransaction()->restart()); rows += uncommitted; uncommitted = 0; } } if(uncommitted) require(!ops.execute_Commit(g_ndb)); require(!ops.closeTransaction(g_ndb)); rows += uncommitted; g_info << "Inserted " << rows << " rows" << endl; return 0; } static int pause_lcp(int error) { int nodes = g_restarter.getNumDbNodes(); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 }; int fd = ndb_mgm_listen_event(g_restarter.handle, filter); require(fd >= 0); require(!g_restarter.insertErrorInAllNodes(error)); int dump[] = { DumpStateOrd::DihStartLcpImmediately }; require(!g_restarter.dumpStateAllNodes(dump, 1)); char *tmp; char buf[1024]; SocketInputStream in(fd, 1000); int count = 0; do { tmp = in.gets(buf, 1024); if(tmp) { int id; if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error && --nodes == 0){ close(fd); return 0; } } } while(count++ < 30); close(fd); return -1; } static int do_op(int row) { HugoOperations & ops = * g_hugo_ops; if(strcmp(g_ops[row].op1, "INS") == 0){ require(!g_ops[row].curr_row); g_ops[row].curr_row = true; g_ops[row].val = rand(); require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op1, "UPD") == 0){ require(g_ops[row].curr_row); g_ops[row].val = rand(); require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op1, "DEL") == 0){ require(g_ops[row].curr_row); g_ops[row].curr_row = false; require(!ops.pkDeleteRecord(g_ndb, row, 1)); } require(!ops.execute_NoCommit(g_ndb)); if(g_ops[row].op2 == 0){ } else if(strcmp(g_ops[row].op2, "INS") == 0){ require(!g_ops[row].curr_row); g_ops[row].curr_row = true; g_ops[row].val = rand(); require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op2, "UPD") == 0){ require(g_ops[row].curr_row); g_ops[row].val = rand(); require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op2, "DEL") == 0){ require(g_ops[row].curr_row); g_ops[row].curr_row = false; require(!ops.pkDeleteRecord(g_ndb, row, 1)); } if(g_ops[row].op2 != 0) require(!ops.execute_NoCommit(g_ndb)); if(g_ops[row].op3 == 0){ } else if(strcmp(g_ops[row].op3, "INS") == 0){ require(!g_ops[row].curr_row); g_ops[row].curr_row = true; g_ops[row].val = rand(); require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op3, "UPD") == 0){ require(g_ops[row].curr_row); g_ops[row].val = rand(); require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val)); } else if(strcmp(g_ops[row].op3, "DEL") == 0){ require(g_ops[row].curr_row); g_ops[row].curr_row = false; require(!ops.pkDeleteRecord(g_ndb, row, 1)); } if(g_ops[row].op3 != 0) require(!ops.execute_NoCommit(g_ndb)); return 0; } static int continue_lcp(int error) { int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 }; int fd = -1; if(error){ fd = ndb_mgm_listen_event(g_restarter.handle, filter); require(fd >= 0); } int args[] = { DumpStateOrd::LCPContinue }; if(g_restarter.dumpStateAllNodes(args, 1) != 0) return -1; if(error){ char *tmp; char buf[1024]; SocketInputStream in(fd, 1000); int count = 0; int nodes = g_restarter.getNumDbNodes(); do { tmp = in.gets(buf, 1024); if(tmp) { int id; if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error && --nodes == 0){ close(fd); return 0; } } } while(count++ < 30); close(fd); } return 0; } static int commit() { HugoOperations & ops = * g_hugo_ops; int res = ops.execute_Commit(g_ndb); if(res == 0){ return ops.getTransaction()->restart(); } return res; } static int restart() { g_info << "Restarting cluster" << endl; g_hugo_ops->closeTransaction(g_ndb); disconnect_ndb(); delete g_hugo_ops; require(!g_restarter.restartAll()); require(!g_restarter.waitClusterStarted(30)); require(!connect_ndb()); g_table = g_ndb->getDictionary()->getTable(g_tablename); require(g_table); require(g_hugo_ops = new HugoOperations(* g_table)); require(!g_hugo_ops->startTransaction(g_ndb)); return 0; } static int validate() { HugoOperations ops(* g_table); for(size_t i = 0; i