summaryrefslogtreecommitdiff
path: root/ndb/test/tools/transproxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/test/tools/transproxy.cpp')
-rw-r--r--ndb/test/tools/transproxy.cpp362
1 files changed, 362 insertions, 0 deletions
diff --git a/ndb/test/tools/transproxy.cpp b/ndb/test/tools/transproxy.cpp
new file mode 100644
index 00000000000..384a8a34f03
--- /dev/null
+++ b/ndb/test/tools/transproxy.cpp
@@ -0,0 +1,362 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include <NdbTCP.h>
+#include <NdbOut.hpp>
+#include <NdbThread.h>
+#include <NdbSleep.h>
+#include <Properties.hpp>
+#include <LocalConfig.hpp>
+#include <Config.hpp>
+#include <InitConfigFileParser.hpp>
+#include <IPCConfig.hpp>
+
+static void
+fatal(char const* fmt, ...)
+{
+ va_list ap;
+ char buf[200];
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ ndbout << "FATAL: " << buf << endl;
+ sleep(1);
+ exit(1);
+}
+
+static void
+debug(char const* fmt, ...)
+{
+ va_list ap;
+ char buf[200];
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ ndbout << buf << endl;
+}
+
+// node
+struct Node {
+ enum Type { MGM = 1, DB = 2, API = 3 };
+ Type type;
+ unsigned id; // node id
+ static Node* list;
+ static unsigned count;
+ static Node* find(unsigned n) {
+ for (unsigned i = 0; i < count; i++) {
+ if (list[i].id == n)
+ return &list[i];
+ }
+ return 0;
+ }
+};
+
+unsigned Node::count = 0;
+Node* Node::list = 0;
+
+struct Copy {
+ int rfd; // read from
+ int wfd; // write to
+ unsigned char* buf;
+ unsigned bufsiz;
+ NdbThread* thread;
+ void run();
+ char info[20];
+};
+
+// connection between nodes 0-server side 1-client side
+// we are client to 0 and server to 1
+struct Conn {
+ Node* node[2]; // the nodes
+ unsigned port; // server port
+ unsigned proxy; // proxy port
+ static unsigned count;
+ static unsigned proxycount;
+ static Conn* list;
+ NdbThread* thread; // thread handling this connection
+ void run(); // run the connection
+ int sockfd[2]; // socket 0-on server side 1-client side
+ void conn0(); // connect to side 0
+ void conn1(); // connect to side 0
+ char info[20];
+ Copy copy[2]; // 0-to-1 and 1-to-0
+};
+
+unsigned Conn::count = 0;
+unsigned Conn::proxycount = 0;
+Conn* Conn::list = 0;
+
+// global data
+static char* hostname = 0;
+static struct sockaddr_in hostaddr;
+static char* localcfgfile = 0;
+static char* initcfgfile = 0;
+static unsigned ownnodeid = 0;
+
+static void
+properr(const Properties* props, const char* name, int i = -1)
+{
+ if (i < 0) {
+ fatal("get %s failed: errno = %d", name, props->getPropertiesErrno());
+ } else {
+ fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno());
+ }
+}
+
+// read config and load it into our structs
+static void
+getcfg()
+{
+ LocalConfig lcfg;
+ if (! lcfg.read(localcfgfile)) {
+ fatal("read %s failed", localcfgfile);
+ }
+ ownnodeid = lcfg._ownNodeId;
+ debug("ownnodeid = %d", ownnodeid);
+ InitConfigFileParser pars(initcfgfile);
+ Config icfg;
+ if (! pars.getConfig(icfg)) {
+ fatal("parse %s failed", initcfgfile);
+ }
+ Properties* ccfg = icfg.getConfig(ownnodeid);
+ if (ccfg == 0) {
+ const char* err = "unknown error";
+ fatal("getConfig: %s", err);
+ }
+ ccfg->put("NodeId", ownnodeid);
+ ccfg->put("NodeType", "MGM");
+ if (! ccfg->get("NoOfNodes", &Node::count)) {
+ properr(ccfg, "NoOfNodes", -1);
+ }
+ debug("Node::count = %d", Node::count);
+ Node::list = new Node[Node::count];
+ for (unsigned i = 0; i < Node::count; i++) {
+ Node& node = Node::list[i];
+ const Properties* nodecfg;
+ if (! ccfg->get("Node", 1+i, &nodecfg)) {
+ properr(ccfg, "Node", 1+i);
+ }
+ const char* type;
+ if (! nodecfg->get("Type", &type)) {
+ properr(nodecfg, "Type");
+ }
+ if (strcmp(type, "MGM") == 0) {
+ node.type = Node::MGM;
+ } else if (strcmp(type, "DB") == 0) {
+ node.type = Node::DB;
+ } else if (strcmp(type, "API") == 0) {
+ node.type = Node::API;
+ } else {
+ fatal("prop %s_%d bad Type = %s", "Node", 1+i, type);
+ }
+ if (! nodecfg->get("NodeId", &node.id)) {
+ properr(nodecfg, "NodeId");
+ }
+ debug("node id=%d type=%d", node.id, node.type);
+ }
+ IPCConfig ipccfg(ccfg);
+ if (ipccfg.init() != 0) {
+ fatal("ipccfg init failed");
+ }
+ if (! ccfg->get("NoOfConnections", &Conn::count)) {
+ properr(ccfg, "NoOfConnections");
+ }
+ debug("Conn::count = %d", Conn::count);
+ Conn::list = new Conn[Conn::count];
+ for (unsigned i = 0; i < Conn::count; i++) {
+ Conn& conn = Conn::list[i];
+ const Properties* conncfg;
+ if (! ccfg->get("Connection", i, &conncfg)) {
+ properr(ccfg, "Connection", i);
+ }
+ unsigned n;
+ if (! conncfg->get("NodeId1", &n)) {
+ properr(conncfg, "NodeId1");
+ }
+ if ((conn.node[0] = Node::find(n)) == 0) {
+ fatal("node %d not found", n);
+ }
+ if (! conncfg->get("NodeId2", &n)) {
+ properr(conncfg, "NodeId2");
+ }
+ if ((conn.node[1] = Node::find(n)) == 0) {
+ fatal("node %d not found", n);
+ }
+ if (! conncfg->get("PortNumber", &conn.port)) {
+ properr(conncfg, "PortNumber");
+ }
+ conn.proxy = 0;
+ const char* proxy;
+ if (conncfg->get("Proxy", &proxy)) {
+ conn.proxy = atoi(proxy);
+ if (conn.proxy > 0) {
+ Conn::proxycount++;
+ }
+ }
+ sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id);
+ }
+}
+
+void
+Conn::conn0()
+{
+ int fd;
+ while (1) {
+ if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
+ fatal("%s: create client socket failed: %s", info, strerror(errno));
+ }
+ struct sockaddr_in servaddr;
+ memset(&servaddr, 0, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_port = htons(port);
+ servaddr.sin_addr = hostaddr.sin_addr;
+#if 0 // coredump
+ if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) {
+ fatal("%s: hostname %s lookup failed", info, hostname);
+ }
+#endif
+ if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0)
+ break;
+ if (errno != ECONNREFUSED) {
+ fatal("%s: connect failed: %s", info, strerror(errno));
+ }
+ close(fd);
+ NdbSleep_MilliSleep(100);
+ }
+ sockfd[0] = fd;
+ debug("%s: side 0 connected", info);
+}
+
+void
+Conn::conn1()
+{
+ int servfd;
+ if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
+ fatal("%s: create server socket failed: %s", info, strerror(errno));
+ }
+ struct sockaddr_in servaddr;
+ memset(&servaddr, 0, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(proxy);
+ const int on = 1;
+ setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));
+ if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
+ fatal("%s: bind %d failed: %s", info, proxy, strerror(errno));
+ }
+ if (listen(servfd, 1) == -1) {
+ fatal("%s: listen %d failed: %s", info, proxy, strerror(errno));
+ }
+ int fd;
+ if ((fd = accept(servfd, 0, 0)) == -1) {
+ fatal("%s: accept failed: %s", info, strerror(errno));
+ }
+ sockfd[1] = fd;
+ close(servfd);
+ debug("%s: side 1 connected", info);
+}
+
+void
+Copy::run()
+{
+ debug("%s: start", info);
+ int n, m;
+ while (1) {
+ n = read(rfd, buf, sizeof(buf));
+ if (n < 0)
+ fatal("read error: %s", strerror(errno));
+ m = write(wfd, buf, n);
+ if (m != n)
+ fatal("write error: %s", strerror(errno));
+ }
+ debug("%s: stop", info);
+}
+
+extern "C" void*
+copyrun_C(void* copy)
+{
+ ((Copy*) copy)->run();
+ NdbThread_Exit(0);
+ return 0;
+}
+
+void
+Conn::run()
+{
+ debug("%s: start", info);
+ conn1();
+ conn0();
+ const unsigned siz = 32 * 1024;
+ for (int i = 0; i < 2; i++) {
+ Copy& copy = this->copy[i];
+ copy.rfd = sockfd[i];
+ copy.wfd = sockfd[1-i];
+ copy.buf = new unsigned char[siz];
+ copy.bufsiz = siz;
+ sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id);
+ copy.thread = NdbThread_Create(copyrun_C, (void**)&copy,
+ 8192, "copyrun", NDB_THREAD_PRIO_LOW);
+ if (copy.thread == 0) {
+ fatal("%s: create thread %d failed errno=%d", i, errno);
+ }
+ }
+ debug("%s: stop", info);
+}
+
+extern "C" void*
+connrun_C(void* conn)
+{
+ ((Conn*) conn)->run();
+ NdbThread_Exit(0);
+ return 0;
+}
+
+static void
+start()
+{
+ NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2);
+ for (unsigned i = 0; i < Conn::count; i++) {
+ Conn& conn = Conn::list[i];
+ if (! conn.proxy)
+ continue;
+ conn.thread = NdbThread_Create(connrun_C, (void**)&conn,
+ 8192, "connrun", NDB_THREAD_PRIO_LOW);
+ if (conn.thread == 0) {
+ fatal("create thread %d failed errno=%d", i, errno);
+ }
+ }
+ sleep(3600);
+}
+
+int
+main(int av, char** ac)
+{
+ debug("start");
+ hostname = "ndb-srv7";
+ if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) {
+ fatal("hostname %s lookup failed", hostname);
+ }
+ localcfgfile = "Ndb.cfg";
+ initcfgfile = "config.txt";
+ getcfg();
+ start();
+ debug("done");
+ return 0;
+}
+
+// vim: set sw=4 noet: