diff options
Diffstat (limited to 'ndb/examples/ndbapi_event_example/ndbapi_event.cpp')
-rw-r--r-- | ndb/examples/ndbapi_event_example/ndbapi_event.cpp | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/ndb/examples/ndbapi_event_example/ndbapi_event.cpp b/ndb/examples/ndbapi_event_example/ndbapi_event.cpp new file mode 100644 index 00000000000..82e39e32d13 --- /dev/null +++ b/ndb/examples/ndbapi_event_example/ndbapi_event.cpp @@ -0,0 +1,268 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/** + * ndbapi_event.cpp: Using API level events in NDB API + * + * Classes and methods used in this example: + * + * Ndb_cluster_connection + * connect() + * wait_until_ready() + * + * Ndb + * init() + * getDictionary() + * createEventOperation() + * dropEventOperation() + * pollEvents() + * + * NdbDictionary + * createEvent() + * dropEvent() + * + * NdbDictionary::Event + * setTable() + * addtableEvent() + * addEventColumn() + * + * NdbEventOperation + * getValue() + * getPreValue() + * execute() + * next() + * isConsistent() + * getEventType() + * + */ + +#include <NdbApi.hpp> + +// Used for cout +#include <stdio.h> +#include <iostream> +#include <unistd.h> + + +/** + * + * Assume that there is a table TAB0 which is being updated by + * another process (e.g. flexBench -l 0 -stdtables). + * We want to monitor what happens with columns COL0, COL2, COL11 + * + * or together with the mysqlcluster client; + * + * shell> mysqlcluster -u root + * mysql> create database TEST_DB; + * mysql> use TEST_DB; + * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int); + * + * In another window start ndbapi_example5, wait until properly started + * + * mysql> insert into TAB0 values (1,2,3); + * mysql> insert into TAB0 values (2,2,3); + * mysql> insert into TAB0 values (3,2,9); + * mysql> + * + * you should see the data popping up in the example window + * + */ + +#define APIERROR(error) \ + { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \ + << error.code << ", msg: " << error.message << "." << std::endl; \ + exit(-1); } + +int myCreateEvent(Ndb* myNdb, + const char *eventName, + const char *eventTableName, + const char **eventColumnName, + const int noEventColumnName); + +int main() +{ + ndb_init(); + + Ndb_cluster_connection *cluster_connection= + new Ndb_cluster_connection(); // Object representing the cluster + + int r= cluster_connection->connect(5 /* retries */, + 3 /* delay between retries */, + 1 /* verbose */); + if (r > 0) + { + std::cout + << "Cluster connect failed, possibly resolved with more retries.\n"; + exit(-1); + } + else if (r < 0) + { + std::cout + << "Cluster connect failed.\n"; + exit(-1); + } + + if (cluster_connection->wait_until_ready(30,30)) + { + std::cout << "Cluster was not ready within 30 secs." << std::endl; + exit(-1); + } + + Ndb* myNdb= new Ndb(cluster_connection, + "TEST_DB"); // Object representing the database + + if (myNdb->init() == -1) APIERROR(myNdb->getNdbError()); + + const char *eventName= "CHNG_IN_TAB0"; + const char *eventTableName= "TAB0"; + const int noEventColumnName= 3; + const char *eventColumnName[noEventColumnName]= + {"COL0", + "COL1", + "COL11"}; + + // Create events + myCreateEvent(myNdb, + eventName, + eventTableName, + eventColumnName, + noEventColumnName); + int j= 0; + while (j < 5) { + + // Start "transaction" for handling events + NdbEventOperation* op; + printf("create EventOperation\n"); + if ((op = myNdb->createEventOperation(eventName,100)) == NULL) + APIERROR(myNdb->getNdbError()); + + printf("get values\n"); + NdbRecAttr* recAttr[noEventColumnName]; + NdbRecAttr* recAttrPre[noEventColumnName]; + // primary keys should always be a part of the result + for (int i = 0; i < noEventColumnName; i++) { + recAttr[i] = op->getValue(eventColumnName[i]); + recAttrPre[i] = op->getPreValue(eventColumnName[i]); + } + + // set up the callbacks + printf("execute\n"); + if (op->execute()) { // This starts changes to "start flowing" + printf("operation execution failed\n"); + exit(-1); + } + + int i= 0; + while(i < 40) { + // printf("now waiting for event...\n"); + int r= myNdb->pollEvents(1000); // wait for event or 1000 ms + if (r > 0) { + // printf("got data! %d\n", r); + int overrun; + while (op->next(&overrun) > 0) { + i++; + if (!op->isConsistent()) + printf("A node failure has occured and events might be missing\n"); + switch (op->getEventType()) { + case NdbDictionary::Event::TE_INSERT: + printf("%u INSERT: ", i); + break; + case NdbDictionary::Event::TE_DELETE: + printf("%u DELETE: ", i); + break; + case NdbDictionary::Event::TE_UPDATE: + printf("%u UPDATE: ", i); + break; + default: + abort(); // should not happen + } + printf("overrun %u pk %u: ", overrun, recAttr[0]->u_32_value()); + for (int i = 1; i < noEventColumnName; i++) { + if (recAttr[i]->isNULL() >= 0) { // we have a value + printf(" post[%u]=", i); + if (recAttr[i]->isNULL() == 0) // we have a non-null value + printf("%u", recAttr[i]->u_32_value()); + else // we have a null value + printf("NULL"); + } + if (recAttrPre[i]->isNULL() >= 0) { // we have a value + printf(" post[%u]=", i); + if (recAttrPre[i]->isNULL() == 0) // we have a non-null value + printf("%u", recAttrPre[i]->u_32_value()); + else // we have a null value + printf("NULL"); + } + } + printf("\n"); + } + } else + ;//printf("timed out\n"); + } + // don't want to listen to events anymore + myNdb->dropEventOperation(op); + + j++; + } + + { + NdbDictionary::Dictionary *myDict = myNdb->getDictionary(); + if (!myDict) APIERROR(myNdb->getNdbError()); + myDict->dropEvent(eventName); // remove event from database + } + + delete myNdb; + delete cluster_connection; + ndb_end(0); + return 0; +} + +int myCreateEvent(Ndb* myNdb, + const char *eventName, + const char *eventTableName, + const char **eventColumnName, + const int noEventColumnName) +{ + NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); + if (!myDict) APIERROR(myNdb->getNdbError()); + + NdbDictionary::Event myEvent(eventName); + myEvent.setTable(eventTableName); + myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); + // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); + // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); + // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); + + for (int i = 0; i < noEventColumnName; i++) + myEvent.addEventColumn(eventColumnName[i]); + + int res = myDict->createEvent(myEvent); // Add event to database + + if (res == 0) + myEvent.print(); + else { + printf("Event creation failed\n"); + printf("trying drop Event, maybe event exists\n"); + res = myDict->dropEvent(eventName); + if (res) + exit(-1); + // try again + res = myDict->createEvent(myEvent); // Add event to database + if (res) + exit(-1); + } + + return res; +} |