summaryrefslogtreecommitdiff
path: root/storage/ndb/test/tools
diff options
context:
space:
mode:
authorunknown <tomas@whalegate.ndb.mysql.com>2007-04-23 19:18:46 +0200
committerunknown <tomas@whalegate.ndb.mysql.com>2007-04-23 19:18:46 +0200
commite8e7b7c0b4243309fad3291fcd1358a7c7c2f5a8 (patch)
treef719657a1428a2bdc0bd73779717d9a036c3dc61 /storage/ndb/test/tools
parente4f490c163643d1c137f5fd73531d40efedcd0f2 (diff)
downloadmariadb-git-e8e7b7c0b4243309fad3291fcd1358a7c7c2f5a8.tar.gz
simple extend of listen_event to do apply on remote cluster
Diffstat (limited to 'storage/ndb/test/tools')
-rw-r--r--storage/ndb/test/tools/listen.cpp188
1 files changed, 184 insertions, 4 deletions
diff --git a/storage/ndb/test/tools/listen.cpp b/storage/ndb/test/tools/listen.cpp
index 661193bf4b8..97c307e9c15 100644
--- a/storage/ndb/test/tools/listen.cpp
+++ b/storage/ndb/test/tools/listen.cpp
@@ -22,6 +22,128 @@
#include <getarg.h>
+#define BATCH_SIZE 128
+struct Table_info
+{
+ Uint32 id;
+};
+
+struct Trans_arg
+{
+ Ndb *ndb;
+ NdbTransaction *trans;
+ Uint32 bytes_batched;
+};
+
+Vector< Vector<NdbRecAttr*> > event_values;
+Vector< Vector<NdbRecAttr*> > event_pre_values;
+Vector<struct Table_info> table_infos;
+
+static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
+{
+ trans_arg.ndb = ndb;
+ trans_arg.trans = ndb->startTransaction();
+ trans_arg.bytes_batched = 0;
+}
+
+static void do_equal(NdbOperation *op,
+ NdbEventOperation *pOp)
+{
+ struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
+ Vector<NdbRecAttr*> &ev = event_values[ti->id];
+ const NdbDictionary::Table *tab= pOp->getTable();
+ unsigned i, n_columns = tab->getNoOfColumns();
+ for (i= 0; i < n_columns; i++)
+ {
+ if (tab->getColumn(i)->getPrimaryKey() &&
+ op->equal(i, ev[i]->aRef()))
+ {
+ abort();
+ }
+ }
+}
+
+static void do_set_value(NdbOperation *op,
+ NdbEventOperation *pOp)
+{
+ struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
+ Vector<NdbRecAttr*> &ev = event_values[ti->id];
+ const NdbDictionary::Table *tab= pOp->getTable();
+ unsigned i, n_columns = tab->getNoOfColumns();
+ for (i= 0; i < n_columns; i++)
+ {
+ if (!tab->getColumn(i)->getPrimaryKey() &&
+ op->setValue(i, ev[i]->aRef()))
+ {
+ abort();
+ }
+ }
+}
+
+static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
+{
+ if (!trans_arg.trans)
+ return;
+
+ NdbOperation *op =
+ trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
+ op->writeTuple();
+
+ do_equal(op, pOp);
+ do_set_value(op, pOp);
+
+ trans_arg.bytes_batched++;
+ if (trans_arg.bytes_batched > BATCH_SIZE)
+ {
+ trans_arg.trans->execute(NdbTransaction::NoCommit);
+ trans_arg.bytes_batched = 0;
+ }
+}
+static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
+{
+ if (!trans_arg.trans)
+ return;
+
+ NdbOperation *op =
+ trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
+ op->writeTuple();
+
+ do_equal(op, pOp);
+ do_set_value(op, pOp);
+
+ trans_arg.bytes_batched++;
+ if (trans_arg.bytes_batched > BATCH_SIZE)
+ {
+ trans_arg.trans->execute(NdbTransaction::NoCommit);
+ trans_arg.bytes_batched = 0;
+ }
+}
+static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
+{
+ if (!trans_arg.trans)
+ return;
+
+ NdbOperation *op =
+ trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
+ op->deleteTuple();
+
+ do_equal(op, pOp);
+
+ trans_arg.bytes_batched++;
+ if (trans_arg.bytes_batched > BATCH_SIZE)
+ {
+ trans_arg.trans->execute(NdbTransaction::NoCommit);
+ trans_arg.bytes_batched = 0;
+ }
+}
+static void do_commit(struct Trans_arg &trans_arg)
+{
+ if (!trans_arg.trans)
+ return;
+ trans_arg.trans->execute(NdbTransaction::Commit);
+ trans_arg.ndb->closeTransaction(trans_arg.trans);
+}
+
int
main(int argc, const char** argv){
ndb_init();
@@ -29,8 +151,14 @@ main(int argc, const char** argv){
int _help = 0;
const char* db = 0;
+ const char* connectstring1 = 0;
+ const char* connectstring2 = 0;
struct getargs args[] = {
+ { "connectstring1", 'c',
+ arg_string, &connectstring1, "connectstring1", "" },
+ { "connectstring2", 'C',
+ arg_string, &connectstring2, "connectstring2", "" },
{ "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
@@ -46,7 +174,7 @@ main(int argc, const char** argv){
}
// Connect to Ndb
- Ndb_cluster_connection con;
+ Ndb_cluster_connection con(connectstring1);
if(con.connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
@@ -61,12 +189,35 @@ main(int argc, const char** argv){
// Connect to Ndb and wait for it to become ready
while(MyNdb.waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl;
-
+
+ Ndb_cluster_connection *con2 = NULL;
+ Ndb *ndb2 = NULL;
+ if (connectstring2)
+ {
+ con2 = new Ndb_cluster_connection(connectstring2);
+
+ if(con2->connect(12, 5, 1) != 0)
+ {
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+ ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
+
+ if(ndb2->init() != 0){
+ ERR(ndb2->getNdbError());
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+
+ // Connect to Ndb and wait for it to become ready
+ while(ndb2->waitUntilReady() != 0)
+ ndbout << "Waiting for ndb to become ready..." << endl;
+ }
+
int result = 0;
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
Vector<NdbDictionary::Event*> events;
Vector<NdbEventOperation*> event_ops;
+ int sz = 0;
for(i= optind; i<argc; i++)
{
const NdbDictionary::Table* table= myDict->getTable(argv[i]);
@@ -121,12 +272,23 @@ main(int argc, const char** argv){
goto end;
}
+ event_values.push_back(Vector<NdbRecAttr *>());
+ event_pre_values.push_back(Vector<NdbRecAttr *>());
for (int a = 0; a < table->getNoOfColumns(); a++)
{
- pOp->getValue(table->getColumn(a)->getName());
- pOp->getPreValue(table->getColumn(a)->getName());
+ event_values[sz].
+ push_back(pOp->getValue(table->getColumn(a)->getName()));
+ event_pre_values[sz].
+ push_back(pOp->getPreValue(table->getColumn(a)->getName()));
}
event_ops.push_back(pOp);
+ {
+ struct Table_info ti;
+ ti.id = sz;
+ table_infos.push_back(ti);
+ }
+ pOp->setCustomData((void *)&table_infos[sz]);
+ sz++;
}
for(i= 0; i<(int)event_ops.size(); i++)
@@ -140,6 +302,7 @@ main(int argc, const char** argv){
}
}
+ struct Trans_arg trans_arg;
while(true)
{
while(MyNdb.pollEvents(100) == 0);
@@ -149,18 +312,26 @@ main(int argc, const char** argv){
{
Uint64 gci= pOp->getGCI();
Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
+ if (ndb2)
+ do_begin(ndb2, trans_arg);
do
{
switch(pOp->getEventType())
{
case NdbDictionary::Event::TE_INSERT:
cnt_i++;
+ if (ndb2)
+ do_insert(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_DELETE:
cnt_d++;
+ if (ndb2)
+ do_delete(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_UPDATE:
cnt_u++;
+ if (ndb2)
+ do_update(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_CLUSTER_FAILURE:
break;
@@ -180,12 +351,21 @@ main(int argc, const char** argv){
abort();
}
} while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
+ if (ndb2)
+ do_commit(trans_arg);
ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d);
}
}
end:
+ if (ndb2)
+ delete ndb2;
+ if (con2)
+ delete con2;
return NDBT_ProgramExit(NDBT_OK);
}
+template class Vector<struct Table_info>;
+template class Vector<NdbRecAttr*>;
+template class Vector< Vector<NdbRecAttr*> >;
template class Vector<NdbDictionary::Event*>;
template class Vector<NdbEventOperation*>;