diff options
author | unknown <tomas@whalegate.ndb.mysql.com> | 2007-04-23 20:30:26 +0200 |
---|---|---|
committer | unknown <tomas@whalegate.ndb.mysql.com> | 2007-04-23 20:30:26 +0200 |
commit | e79f14e152117e0090f6a208f8e69b971c43d51d (patch) | |
tree | b5423d9047ff6677c19dc3d68b5fb54c9f911108 | |
parent | 818ab28fa5d8f85a7a37233808537a816b312db7 (diff) | |
parent | 21fe947986820a400bab327ac707dc9d55d080e4 (diff) | |
download | mariadb-git-e79f14e152117e0090f6a208f8e69b971c43d51d.tar.gz |
Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-telco-gca
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-single-user
BitKeeper/etc/ignore:
auto-union
storage/ndb/test/tools/Makefile.am:
manual merge
storage/ndb/test/tools/listen.cpp:
manual merge
-rw-r--r-- | .bzrignore | 13 | ||||
-rw-r--r-- | storage/ndb/test/tools/Makefile.am | 3 | ||||
-rw-r--r-- | storage/ndb/test/tools/listen.cpp | 188 | ||||
-rw-r--r-- | storage/ndb/test/tools/rep_latency.cpp | 304 |
4 files changed, 503 insertions, 5 deletions
diff --git a/.bzrignore b/.bzrignore index 2e6193b6368..1c5de79fa8b 100644 --- a/.bzrignore +++ b/.bzrignore @@ -2640,6 +2640,16 @@ storage/ndb/lib/libNEWTON_BASICTEST_COMMON.so storage/ndb/lib/libREP_API.so storage/ndb/lib/libndbclient.so storage/ndb/lib/libndbclient_extra.so +storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent +storage/ndb/ndbapi-examples/mgmapi_logevent2/mgmapi_logevent2 +storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async +storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1 +storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event +storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries +storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan +storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple +storage/ndb/ndbapi-examples/ndbapi_simple_dual/ndbapi_simple_dual +storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index storage/ndb/src/common/debugger/libtrace.dsp storage/ndb/src/common/debugger/signaldata/libsignaldataprint.dsp storage/ndb/src/common/logger/liblogger.dsp @@ -2717,6 +2727,8 @@ storage/ndb/test/ndbapi/testDataBuffers storage/ndb/test/ndbapi/testDeadlock storage/ndb/test/ndbapi/testDict storage/ndb/test/ndbapi/testIndex +storage/ndb/test/ndbapi/testIndexStat +storage/ndb/test/ndbapi/testInterpreter storage/ndb/test/ndbapi/testLcp storage/ndb/test/ndbapi/testMgm storage/ndb/test/ndbapi/testNdbApi @@ -2752,6 +2764,7 @@ storage/ndb/test/tools/hugoScanRead storage/ndb/test/tools/hugoScanUpdate storage/ndb/test/tools/listen_event storage/ndb/test/tools/ndb_cpcc +storage/ndb/test/tools/rep_latency storage/ndb/test/tools/restart storage/ndb/test/tools/verify_index storage/ndb/tools/ndb_config diff --git a/storage/ndb/test/tools/Makefile.am b/storage/ndb/test/tools/Makefile.am index 25b4e20a682..1683d4d84ae 100644 --- a/storage/ndb/test/tools/Makefile.am +++ b/storage/ndb/test/tools/Makefile.am @@ -13,7 +13,7 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog +ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog rep_latency # transproxy @@ -34,6 +34,7 @@ create_index_SOURCES = create_index.cpp ndb_cpcc_SOURCES = cpcc.cpp listen_event_SOURCES = listen.cpp eventlog_SOURCES = log_listner.cpp +rep_latency_SOURCES = rep_latency.cpp include $(top_srcdir)/storage/ndb/config/common.mk.am include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am diff --git a/storage/ndb/test/tools/listen.cpp b/storage/ndb/test/tools/listen.cpp index fd404ff0134..e51b213195b 100644 --- a/storage/ndb/test/tools/listen.cpp +++ b/storage/ndb/test/tools/listen.cpp @@ -22,6 +22,128 @@ #include <getarg.h> +#define BATCH_SIZE 128 +struct Table_info +{ + Uint32 id; +}; + +struct Trans_arg +{ + Ndb *ndb; + NdbTransaction *trans; + Uint32 bytes_batched; +}; + +Vector< Vector<NdbRecAttr*> > event_values; +Vector< Vector<NdbRecAttr*> > event_pre_values; +Vector<struct Table_info> table_infos; + +static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg) +{ + trans_arg.ndb = ndb; + trans_arg.trans = ndb->startTransaction(); + trans_arg.bytes_batched = 0; +} + +static void do_equal(NdbOperation *op, + NdbEventOperation *pOp) +{ + struct Table_info *ti = (struct Table_info *)pOp->getCustomData(); + Vector<NdbRecAttr*> &ev = event_values[ti->id]; + const NdbDictionary::Table *tab= pOp->getTable(); + unsigned i, n_columns = tab->getNoOfColumns(); + for (i= 0; i < n_columns; i++) + { + if (tab->getColumn(i)->getPrimaryKey() && + op->equal(i, ev[i]->aRef())) + { + abort(); + } + } +} + +static void do_set_value(NdbOperation *op, + NdbEventOperation *pOp) +{ + struct Table_info *ti = (struct Table_info *)pOp->getCustomData(); + Vector<NdbRecAttr*> &ev = event_values[ti->id]; + const NdbDictionary::Table *tab= pOp->getTable(); + unsigned i, n_columns = tab->getNoOfColumns(); + for (i= 0; i < n_columns; i++) + { + if (!tab->getColumn(i)->getPrimaryKey() && + op->setValue(i, ev[i]->aRef())) + { + abort(); + } + } +} + +static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->writeTuple(); + + do_equal(op, pOp); + do_set_value(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->writeTuple(); + + do_equal(op, pOp); + do_set_value(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->deleteTuple(); + + do_equal(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_commit(struct Trans_arg &trans_arg) +{ + if (!trans_arg.trans) + return; + trans_arg.trans->execute(NdbTransaction::Commit); + trans_arg.ndb->closeTransaction(trans_arg.trans); +} + int main(int argc, const char** argv){ ndb_init(); @@ -29,8 +151,14 @@ main(int argc, const char** argv){ int _help = 0; const char* db = 0; + const char* connectstring1 = 0; + const char* connectstring2 = 0; struct getargs args[] = { + { "connectstring1", 'c', + arg_string, &connectstring1, "connectstring1", "" }, + { "connectstring2", 'C', + arg_string, &connectstring2, "connectstring2", "" }, { "database", 'd', arg_string, &db, "Database", "" }, { "usage", '?', arg_flag, &_help, "Print help", "" } }; @@ -46,7 +174,7 @@ main(int argc, const char** argv){ } // Connect to Ndb - Ndb_cluster_connection con; + Ndb_cluster_connection con(connectstring1); if(con.connect(12, 5, 1) != 0) { return NDBT_ProgramExit(NDBT_FAILED); @@ -61,12 +189,35 @@ main(int argc, const char** argv){ // Connect to Ndb and wait for it to become ready while(MyNdb.waitUntilReady() != 0) ndbout << "Waiting for ndb to become ready..." << endl; - + + Ndb_cluster_connection *con2 = NULL; + Ndb *ndb2 = NULL; + if (connectstring2) + { + con2 = new Ndb_cluster_connection(connectstring2); + + if(con2->connect(12, 5, 1) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } + ndb2 = new Ndb( con2, db ? db : "TEST_DB" ); + + if(ndb2->init() != 0){ + ERR(ndb2->getNdbError()); + return NDBT_ProgramExit(NDBT_FAILED); + } + + // Connect to Ndb and wait for it to become ready + while(ndb2->waitUntilReady() != 0) + ndbout << "Waiting for ndb to become ready..." << endl; + } + int result = 0; NdbDictionary::Dictionary *myDict = MyNdb.getDictionary(); Vector<NdbDictionary::Event*> events; Vector<NdbEventOperation*> event_ops; + int sz = 0; for(i= optind; i<argc; i++) { const NdbDictionary::Table* table= myDict->getTable(argv[i]); @@ -121,12 +272,23 @@ main(int argc, const char** argv){ goto end; } + event_values.push_back(Vector<NdbRecAttr *>()); + event_pre_values.push_back(Vector<NdbRecAttr *>()); for (int a = 0; a < table->getNoOfColumns(); a++) { - pOp->getValue(table->getColumn(a)->getName()); - pOp->getPreValue(table->getColumn(a)->getName()); + event_values[sz]. + push_back(pOp->getValue(table->getColumn(a)->getName())); + event_pre_values[sz]. + push_back(pOp->getPreValue(table->getColumn(a)->getName())); } event_ops.push_back(pOp); + { + struct Table_info ti; + ti.id = sz; + table_infos.push_back(ti); + } + pOp->setCustomData((void *)&table_infos[sz]); + sz++; } for(i= 0; i<(int)event_ops.size(); i++) @@ -140,6 +302,7 @@ main(int argc, const char** argv){ } } + struct Trans_arg trans_arg; while(true) { while(MyNdb.pollEvents(100) == 0); @@ -149,18 +312,26 @@ main(int argc, const char** argv){ { Uint64 gci= pOp->getGCI(); Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0; + if (ndb2) + do_begin(ndb2, trans_arg); do { switch(pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: cnt_i++; + if (ndb2) + do_insert(trans_arg, pOp); break; case NdbDictionary::Event::TE_DELETE: cnt_d++; + if (ndb2) + do_delete(trans_arg, pOp); break; case NdbDictionary::Event::TE_UPDATE: cnt_u++; + if (ndb2) + do_update(trans_arg, pOp); break; case NdbDictionary::Event::TE_CLUSTER_FAILURE: break; @@ -180,6 +351,8 @@ main(int argc, const char** argv){ abort(); } } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI()); + if (ndb2) + do_commit(trans_arg); ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d); } } @@ -187,8 +360,15 @@ end: for(i= 0; i<(int)event_ops.size(); i++) MyNdb.dropEventOperation(event_ops[i]); + if (ndb2) + delete ndb2; + if (con2) + delete con2; return NDBT_ProgramExit(NDBT_OK); } +template class Vector<struct Table_info>; +template class Vector<NdbRecAttr*>; +template class Vector< Vector<NdbRecAttr*> >; template class Vector<NdbDictionary::Event*>; template class Vector<NdbEventOperation*>; diff --git a/storage/ndb/test/tools/rep_latency.cpp b/storage/ndb/test/tools/rep_latency.cpp new file mode 100644 index 00000000000..5ca9a1a1190 --- /dev/null +++ b/storage/ndb/test/tools/rep_latency.cpp @@ -0,0 +1,304 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + * Update on master wait for update on slave + * + */ + +#include <NdbApi.hpp> +#include <NdbSleep.h> +#include <sys/time.h> +#include <NdbOut.hpp> +#include <NDBT.hpp> + +struct Xxx +{ + Ndb *ndb; + const NdbDictionary::Table *table; + Uint32 pk_col; + Uint32 col; +}; + +struct XxxR +{ + Uint32 pk_val; + Uint32 val; + struct timeval start_time; + Uint32 latency; +}; + +static int +prepare_master_or_slave(Ndb &myNdb, + const char* table, + const char* pk, + Uint32 pk_val, + const char* col, + struct Xxx &xxx, + struct XxxR &xxxr); +static void +run_master_update(struct Xxx &xxx, struct XxxR &xxxr); +static void +run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr); + +#define PRINT_ERROR(code,msg) \ + g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \ + << ", code: " << code \ + << ", msg: " << msg << ".\n" +#define APIERROR(error) { \ + PRINT_ERROR((error).code, (error).message); \ + exit(-1); } + +int main(int argc, char** argv) +{ + if (argc != 8) + { + ndbout << "Arguments are <connect_string cluster 1> <connect_string cluster 2> <database> <table name> <primary key> <value of primary key> <attribute to update>.\n"; + exit(-1); + } + // ndb_init must be called first + ndb_init(); + { + const char *opt_connectstring1 = argv[1]; + const char *opt_connectstring2 = argv[2]; + const char *opt_db = argv[3]; + const char *opt_table = argv[4]; + const char *opt_pk = argv[5]; + const Uint32 opt_pk_val = atoi(argv[6]); + const char *opt_col = argv[7]; + + // Object representing the cluster 1 + Ndb_cluster_connection cluster1_connection(opt_connectstring1); + // Object representing the cluster 2 + Ndb_cluster_connection cluster2_connection(opt_connectstring2); + + // connect cluster 1 and run application + // Connect to cluster 1 management server (ndb_mgmd) + if (cluster1_connection.connect(4 /* retries */, + 5 /* delay between retries */, + 1 /* verbose */)) + { + g_err << "Cluster 1 management server was not ready within 30 secs.\n"; + exit(-1); + } + // Optionally connect and wait for the storage nodes (ndbd's) + if (cluster1_connection.wait_until_ready(30,0) < 0) + { + g_err << "Cluster 1 was not ready within 30 secs.\n"; + exit(-1); + } + // connect cluster 2 and run application + // Connect to cluster management server (ndb_mgmd) + if (cluster2_connection.connect(4 /* retries */, + 5 /* delay between retries */, + 1 /* verbose */)) + { + g_err << "Cluster 2 management server was not ready within 30 secs.\n"; + exit(-1); + } + // Optionally connect and wait for the storage nodes (ndbd's) + if (cluster2_connection.wait_until_ready(30,0) < 0) + { + g_err << "Cluster 2 was not ready within 30 secs.\n"; + exit(-1); + } + // Object representing the database + Ndb myNdb1(&cluster1_connection, opt_db); + Ndb myNdb2(&cluster2_connection, opt_db); + // + struct Xxx xxx1; + struct Xxx xxx2; + struct XxxR xxxr; + prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col, + xxx1, xxxr); + prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col, + xxx2, xxxr); + while (1) + { + // run the application code + run_master_update(xxx1, xxxr); + run_slave_wait(xxx2, xxxr); + ndbout << "latency: " << xxxr.latency << endl; + } + } + // Note: all connections must have been destroyed before calling ndb_end() + ndb_end(0); + + return 0; +} + +static int +prepare_master_or_slave(Ndb &myNdb, + const char* table, + const char* pk, + Uint32 pk_val, + const char* col, + struct Xxx &xxx, + struct XxxR &xxxr) +{ + if (myNdb.init()) + APIERROR(myNdb.getNdbError()); + const NdbDictionary::Dictionary* myDict = myNdb.getDictionary(); + const NdbDictionary::Table *myTable = myDict->getTable(table); + if (myTable == NULL) + APIERROR(myDict->getNdbError()); + const NdbDictionary::Column *myPkCol = myTable->getColumn(pk); + if (myPkCol == NULL) + APIERROR(myDict->getNdbError()); + if (myPkCol->getType() != NdbDictionary::Column::Unsigned) + { + PRINT_ERROR(0, "Primary key column not of type unsigned"); + exit(-1); + } + const NdbDictionary::Column *myCol = myTable->getColumn(col); + if (myCol == NULL) + APIERROR(myDict->getNdbError()); + if (myCol->getType() != NdbDictionary::Column::Unsigned) + { + PRINT_ERROR(0, "Update column not of type unsigned"); + exit(-1); + } + + xxx.ndb = &myNdb; + xxx.table = myTable; + xxx.pk_col = myPkCol->getColumnNo(); + xxx.col = myCol->getColumnNo(); + + xxxr.pk_val = pk_val; + + return 0; +} + +static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr) +{ + Ndb *ndb = xxx.ndb; + const NdbDictionary::Table *myTable = xxx.table; + int retry_sleep= 10; /* 10 milliseconds */ + int retries= 100; + while (1) + { + Uint32 val; + NdbTransaction *trans = ndb->startTransaction(); + if (trans == NULL) + goto err; + { + NdbOperation *op = trans->getNdbOperation(myTable); + if (op == NULL) + APIERROR(trans->getNdbError()); + op->readTupleExclusive(); + op->equal(xxx.pk_col, xxxr.pk_val); + op->getValue(xxx.col, (char *)&val); + } + if (trans->execute(NdbTransaction::NoCommit)) + goto err; + //fprintf(stderr, "read %u\n", val); + xxxr.val = val + 1; + { + NdbOperation *op = trans->getNdbOperation(myTable); + if (op == NULL) + APIERROR(trans->getNdbError()); + op->updateTuple(); + op->equal(xxx.pk_col, xxxr.pk_val); + op->setValue(xxx.col, xxxr.val); + } + if (trans->execute(NdbTransaction::Commit)) + goto err; + ndb->closeTransaction(trans); + //fprintf(stderr, "updated to %u\n", xxxr.val); + break; +err: + const NdbError this_error= trans ? + trans->getNdbError() : ndb->getNdbError(); + if (this_error.status == NdbError::TemporaryError) + { + if (retries--) + { + if (trans) + ndb->closeTransaction(trans); + NdbSleep_MilliSleep(retry_sleep); + continue; // retry + } + } + if (trans) + ndb->closeTransaction(trans); + APIERROR(this_error); + } + /* update done start timer */ + gettimeofday(&xxxr.start_time, 0); +} + +static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr) +{ + struct timeval old_end_time = xxxr.start_time, end_time; + Ndb *ndb = xxx.ndb; + const NdbDictionary::Table *myTable = xxx.table; + int retry_sleep= 10; /* 10 milliseconds */ + int retries= 100; + while (1) + { + Uint32 val; + NdbTransaction *trans = ndb->startTransaction(); + if (trans == NULL) + goto err; + { + NdbOperation *op = trans->getNdbOperation(myTable); + if (op == NULL) + APIERROR(trans->getNdbError()); + op->readTuple(); + op->equal(xxx.pk_col, xxxr.pk_val); + op->getValue(xxx.col, (char *)&val); + if (trans->execute(NdbTransaction::Commit)) + goto err; + } + /* read done, check time of read */ + gettimeofday(&end_time, 0); + ndb->closeTransaction(trans); + //fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val); + if (xxxr.val != val) + { + /* expected value not received yet */ + retries = 100; + NdbSleep_MilliSleep(retry_sleep); + old_end_time = end_time; + continue; + } + break; +err: + const NdbError this_error= trans ? + trans->getNdbError() : ndb->getNdbError(); + if (this_error.status == NdbError::TemporaryError) + { + if (retries--) + { + if (trans) + ndb->closeTransaction(trans); + NdbSleep_MilliSleep(retry_sleep); + continue; // retry + } + } + if (trans) + ndb->closeTransaction(trans); + APIERROR(this_error); + } + + Int64 elapsed_usec1 = + ((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 + + ((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec); + Int64 elapsed_usec2 = + ((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 + + ((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec); + xxxr.latency = + ((elapsed_usec1 - elapsed_usec2/2)+999)/1000; +} |