summaryrefslogtreecommitdiff
path: root/ndb/examples/ndbapi_event_example/ndbapi_event.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/examples/ndbapi_event_example/ndbapi_event.cpp')
-rw-r--r--ndb/examples/ndbapi_event_example/ndbapi_event.cpp268
1 files changed, 268 insertions, 0 deletions
diff --git a/ndb/examples/ndbapi_event_example/ndbapi_event.cpp b/ndb/examples/ndbapi_event_example/ndbapi_event.cpp
new file mode 100644
index 00000000000..82e39e32d13
--- /dev/null
+++ b/ndb/examples/ndbapi_event_example/ndbapi_event.cpp
@@ -0,0 +1,268 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/**
+ * ndbapi_event.cpp: Using API level events in NDB API
+ *
+ * Classes and methods used in this example:
+ *
+ * Ndb_cluster_connection
+ * connect()
+ * wait_until_ready()
+ *
+ * Ndb
+ * init()
+ * getDictionary()
+ * createEventOperation()
+ * dropEventOperation()
+ * pollEvents()
+ *
+ * NdbDictionary
+ * createEvent()
+ * dropEvent()
+ *
+ * NdbDictionary::Event
+ * setTable()
+ * addtableEvent()
+ * addEventColumn()
+ *
+ * NdbEventOperation
+ * getValue()
+ * getPreValue()
+ * execute()
+ * next()
+ * isConsistent()
+ * getEventType()
+ *
+ */
+
+#include <NdbApi.hpp>
+
+// Used for cout
+#include <stdio.h>
+#include <iostream>
+#include <unistd.h>
+
+
+/**
+ *
+ * Assume that there is a table TAB0 which is being updated by
+ * another process (e.g. flexBench -l 0 -stdtables).
+ * We want to monitor what happens with columns COL0, COL2, COL11
+ *
+ * or together with the mysqlcluster client;
+ *
+ * shell> mysqlcluster -u root
+ * mysql> create database TEST_DB;
+ * mysql> use TEST_DB;
+ * mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int);
+ *
+ * In another window start ndbapi_example5, wait until properly started
+ *
+ * mysql> insert into TAB0 values (1,2,3);
+ * mysql> insert into TAB0 values (2,2,3);
+ * mysql> insert into TAB0 values (3,2,9);
+ * mysql>
+ *
+ * you should see the data popping up in the example window
+ *
+ */
+
+#define APIERROR(error) \
+ { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
+ << error.code << ", msg: " << error.message << "." << std::endl; \
+ exit(-1); }
+
+int myCreateEvent(Ndb* myNdb,
+ const char *eventName,
+ const char *eventTableName,
+ const char **eventColumnName,
+ const int noEventColumnName);
+
+int main()
+{
+ ndb_init();
+
+ Ndb_cluster_connection *cluster_connection=
+ new Ndb_cluster_connection(); // Object representing the cluster
+
+ int r= cluster_connection->connect(5 /* retries */,
+ 3 /* delay between retries */,
+ 1 /* verbose */);
+ if (r > 0)
+ {
+ std::cout
+ << "Cluster connect failed, possibly resolved with more retries.\n";
+ exit(-1);
+ }
+ else if (r < 0)
+ {
+ std::cout
+ << "Cluster connect failed.\n";
+ exit(-1);
+ }
+
+ if (cluster_connection->wait_until_ready(30,30))
+ {
+ std::cout << "Cluster was not ready within 30 secs." << std::endl;
+ exit(-1);
+ }
+
+ Ndb* myNdb= new Ndb(cluster_connection,
+ "TEST_DB"); // Object representing the database
+
+ if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
+
+ const char *eventName= "CHNG_IN_TAB0";
+ const char *eventTableName= "TAB0";
+ const int noEventColumnName= 3;
+ const char *eventColumnName[noEventColumnName]=
+ {"COL0",
+ "COL1",
+ "COL11"};
+
+ // Create events
+ myCreateEvent(myNdb,
+ eventName,
+ eventTableName,
+ eventColumnName,
+ noEventColumnName);
+ int j= 0;
+ while (j < 5) {
+
+ // Start "transaction" for handling events
+ NdbEventOperation* op;
+ printf("create EventOperation\n");
+ if ((op = myNdb->createEventOperation(eventName,100)) == NULL)
+ APIERROR(myNdb->getNdbError());
+
+ printf("get values\n");
+ NdbRecAttr* recAttr[noEventColumnName];
+ NdbRecAttr* recAttrPre[noEventColumnName];
+ // primary keys should always be a part of the result
+ for (int i = 0; i < noEventColumnName; i++) {
+ recAttr[i] = op->getValue(eventColumnName[i]);
+ recAttrPre[i] = op->getPreValue(eventColumnName[i]);
+ }
+
+ // set up the callbacks
+ printf("execute\n");
+ if (op->execute()) { // This starts changes to "start flowing"
+ printf("operation execution failed\n");
+ exit(-1);
+ }
+
+ int i= 0;
+ while(i < 40) {
+ // printf("now waiting for event...\n");
+ int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
+ if (r > 0) {
+ // printf("got data! %d\n", r);
+ int overrun;
+ while (op->next(&overrun) > 0) {
+ i++;
+ if (!op->isConsistent())
+ printf("A node failure has occured and events might be missing\n");
+ switch (op->getEventType()) {
+ case NdbDictionary::Event::TE_INSERT:
+ printf("%u INSERT: ", i);
+ break;
+ case NdbDictionary::Event::TE_DELETE:
+ printf("%u DELETE: ", i);
+ break;
+ case NdbDictionary::Event::TE_UPDATE:
+ printf("%u UPDATE: ", i);
+ break;
+ default:
+ abort(); // should not happen
+ }
+ printf("overrun %u pk %u: ", overrun, recAttr[0]->u_32_value());
+ for (int i = 1; i < noEventColumnName; i++) {
+ if (recAttr[i]->isNULL() >= 0) { // we have a value
+ printf(" post[%u]=", i);
+ if (recAttr[i]->isNULL() == 0) // we have a non-null value
+ printf("%u", recAttr[i]->u_32_value());
+ else // we have a null value
+ printf("NULL");
+ }
+ if (recAttrPre[i]->isNULL() >= 0) { // we have a value
+ printf(" post[%u]=", i);
+ if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
+ printf("%u", recAttrPre[i]->u_32_value());
+ else // we have a null value
+ printf("NULL");
+ }
+ }
+ printf("\n");
+ }
+ } else
+ ;//printf("timed out\n");
+ }
+ // don't want to listen to events anymore
+ myNdb->dropEventOperation(op);
+
+ j++;
+ }
+
+ {
+ NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
+ if (!myDict) APIERROR(myNdb->getNdbError());
+ myDict->dropEvent(eventName); // remove event from database
+ }
+
+ delete myNdb;
+ delete cluster_connection;
+ ndb_end(0);
+ return 0;
+}
+
+int myCreateEvent(Ndb* myNdb,
+ const char *eventName,
+ const char *eventTableName,
+ const char **eventColumnName,
+ const int noEventColumnName)
+{
+ NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
+ if (!myDict) APIERROR(myNdb->getNdbError());
+
+ NdbDictionary::Event myEvent(eventName);
+ myEvent.setTable(eventTableName);
+ myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
+
+ for (int i = 0; i < noEventColumnName; i++)
+ myEvent.addEventColumn(eventColumnName[i]);
+
+ int res = myDict->createEvent(myEvent); // Add event to database
+
+ if (res == 0)
+ myEvent.print();
+ else {
+ printf("Event creation failed\n");
+ printf("trying drop Event, maybe event exists\n");
+ res = myDict->dropEvent(eventName);
+ if (res)
+ exit(-1);
+ // try again
+ res = myDict->createEvent(myEvent); // Add event to database
+ if (res)
+ exit(-1);
+ }
+
+ return res;
+}