diff options
Diffstat (limited to 'ndb/test/tools/transproxy.cpp')
-rw-r--r-- | ndb/test/tools/transproxy.cpp | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/ndb/test/tools/transproxy.cpp b/ndb/test/tools/transproxy.cpp new file mode 100644 index 00000000000..384a8a34f03 --- /dev/null +++ b/ndb/test/tools/transproxy.cpp @@ -0,0 +1,362 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> + +#include <NdbTCP.h> +#include <NdbOut.hpp> +#include <NdbThread.h> +#include <NdbSleep.h> +#include <Properties.hpp> +#include <LocalConfig.hpp> +#include <Config.hpp> +#include <InitConfigFileParser.hpp> +#include <IPCConfig.hpp> + +static void +fatal(char const* fmt, ...) +{ + va_list ap; + char buf[200]; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + ndbout << "FATAL: " << buf << endl; + sleep(1); + exit(1); +} + +static void +debug(char const* fmt, ...) +{ + va_list ap; + char buf[200]; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + ndbout << buf << endl; +} + +// node +struct Node { + enum Type { MGM = 1, DB = 2, API = 3 }; + Type type; + unsigned id; // node id + static Node* list; + static unsigned count; + static Node* find(unsigned n) { + for (unsigned i = 0; i < count; i++) { + if (list[i].id == n) + return &list[i]; + } + return 0; + } +}; + +unsigned Node::count = 0; +Node* Node::list = 0; + +struct Copy { + int rfd; // read from + int wfd; // write to + unsigned char* buf; + unsigned bufsiz; + NdbThread* thread; + void run(); + char info[20]; +}; + +// connection between nodes 0-server side 1-client side +// we are client to 0 and server to 1 +struct Conn { + Node* node[2]; // the nodes + unsigned port; // server port + unsigned proxy; // proxy port + static unsigned count; + static unsigned proxycount; + static Conn* list; + NdbThread* thread; // thread handling this connection + void run(); // run the connection + int sockfd[2]; // socket 0-on server side 1-client side + void conn0(); // connect to side 0 + void conn1(); // connect to side 0 + char info[20]; + Copy copy[2]; // 0-to-1 and 1-to-0 +}; + +unsigned Conn::count = 0; +unsigned Conn::proxycount = 0; +Conn* Conn::list = 0; + +// global data +static char* hostname = 0; +static struct sockaddr_in hostaddr; +static char* localcfgfile = 0; +static char* initcfgfile = 0; +static unsigned ownnodeid = 0; + +static void +properr(const Properties* props, const char* name, int i = -1) +{ + if (i < 0) { + fatal("get %s failed: errno = %d", name, props->getPropertiesErrno()); + } else { + fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno()); + } +} + +// read config and load it into our structs +static void +getcfg() +{ + LocalConfig lcfg; + if (! lcfg.read(localcfgfile)) { + fatal("read %s failed", localcfgfile); + } + ownnodeid = lcfg._ownNodeId; + debug("ownnodeid = %d", ownnodeid); + InitConfigFileParser pars(initcfgfile); + Config icfg; + if (! pars.getConfig(icfg)) { + fatal("parse %s failed", initcfgfile); + } + Properties* ccfg = icfg.getConfig(ownnodeid); + if (ccfg == 0) { + const char* err = "unknown error"; + fatal("getConfig: %s", err); + } + ccfg->put("NodeId", ownnodeid); + ccfg->put("NodeType", "MGM"); + if (! ccfg->get("NoOfNodes", &Node::count)) { + properr(ccfg, "NoOfNodes", -1); + } + debug("Node::count = %d", Node::count); + Node::list = new Node[Node::count]; + for (unsigned i = 0; i < Node::count; i++) { + Node& node = Node::list[i]; + const Properties* nodecfg; + if (! ccfg->get("Node", 1+i, &nodecfg)) { + properr(ccfg, "Node", 1+i); + } + const char* type; + if (! nodecfg->get("Type", &type)) { + properr(nodecfg, "Type"); + } + if (strcmp(type, "MGM") == 0) { + node.type = Node::MGM; + } else if (strcmp(type, "DB") == 0) { + node.type = Node::DB; + } else if (strcmp(type, "API") == 0) { + node.type = Node::API; + } else { + fatal("prop %s_%d bad Type = %s", "Node", 1+i, type); + } + if (! nodecfg->get("NodeId", &node.id)) { + properr(nodecfg, "NodeId"); + } + debug("node id=%d type=%d", node.id, node.type); + } + IPCConfig ipccfg(ccfg); + if (ipccfg.init() != 0) { + fatal("ipccfg init failed"); + } + if (! ccfg->get("NoOfConnections", &Conn::count)) { + properr(ccfg, "NoOfConnections"); + } + debug("Conn::count = %d", Conn::count); + Conn::list = new Conn[Conn::count]; + for (unsigned i = 0; i < Conn::count; i++) { + Conn& conn = Conn::list[i]; + const Properties* conncfg; + if (! ccfg->get("Connection", i, &conncfg)) { + properr(ccfg, "Connection", i); + } + unsigned n; + if (! conncfg->get("NodeId1", &n)) { + properr(conncfg, "NodeId1"); + } + if ((conn.node[0] = Node::find(n)) == 0) { + fatal("node %d not found", n); + } + if (! conncfg->get("NodeId2", &n)) { + properr(conncfg, "NodeId2"); + } + if ((conn.node[1] = Node::find(n)) == 0) { + fatal("node %d not found", n); + } + if (! conncfg->get("PortNumber", &conn.port)) { + properr(conncfg, "PortNumber"); + } + conn.proxy = 0; + const char* proxy; + if (conncfg->get("Proxy", &proxy)) { + conn.proxy = atoi(proxy); + if (conn.proxy > 0) { + Conn::proxycount++; + } + } + sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id); + } +} + +void +Conn::conn0() +{ + int fd; + while (1) { + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { + fatal("%s: create client socket failed: %s", info, strerror(errno)); + } + struct sockaddr_in servaddr; + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(port); + servaddr.sin_addr = hostaddr.sin_addr; +#if 0 // coredump + if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) { + fatal("%s: hostname %s lookup failed", info, hostname); + } +#endif + if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0) + break; + if (errno != ECONNREFUSED) { + fatal("%s: connect failed: %s", info, strerror(errno)); + } + close(fd); + NdbSleep_MilliSleep(100); + } + sockfd[0] = fd; + debug("%s: side 0 connected", info); +} + +void +Conn::conn1() +{ + int servfd; + if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { + fatal("%s: create server socket failed: %s", info, strerror(errno)); + } + struct sockaddr_in servaddr; + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(proxy); + const int on = 1; + setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)); + if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) { + fatal("%s: bind %d failed: %s", info, proxy, strerror(errno)); + } + if (listen(servfd, 1) == -1) { + fatal("%s: listen %d failed: %s", info, proxy, strerror(errno)); + } + int fd; + if ((fd = accept(servfd, 0, 0)) == -1) { + fatal("%s: accept failed: %s", info, strerror(errno)); + } + sockfd[1] = fd; + close(servfd); + debug("%s: side 1 connected", info); +} + +void +Copy::run() +{ + debug("%s: start", info); + int n, m; + while (1) { + n = read(rfd, buf, sizeof(buf)); + if (n < 0) + fatal("read error: %s", strerror(errno)); + m = write(wfd, buf, n); + if (m != n) + fatal("write error: %s", strerror(errno)); + } + debug("%s: stop", info); +} + +extern "C" void* +copyrun_C(void* copy) +{ + ((Copy*) copy)->run(); + NdbThread_Exit(0); + return 0; +} + +void +Conn::run() +{ + debug("%s: start", info); + conn1(); + conn0(); + const unsigned siz = 32 * 1024; + for (int i = 0; i < 2; i++) { + Copy& copy = this->copy[i]; + copy.rfd = sockfd[i]; + copy.wfd = sockfd[1-i]; + copy.buf = new unsigned char[siz]; + copy.bufsiz = siz; + sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id); + copy.thread = NdbThread_Create(copyrun_C, (void**)©, + 8192, "copyrun", NDB_THREAD_PRIO_LOW); + if (copy.thread == 0) { + fatal("%s: create thread %d failed errno=%d", i, errno); + } + } + debug("%s: stop", info); +} + +extern "C" void* +connrun_C(void* conn) +{ + ((Conn*) conn)->run(); + NdbThread_Exit(0); + return 0; +} + +static void +start() +{ + NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2); + for (unsigned i = 0; i < Conn::count; i++) { + Conn& conn = Conn::list[i]; + if (! conn.proxy) + continue; + conn.thread = NdbThread_Create(connrun_C, (void**)&conn, + 8192, "connrun", NDB_THREAD_PRIO_LOW); + if (conn.thread == 0) { + fatal("create thread %d failed errno=%d", i, errno); + } + } + sleep(3600); +} + +int +main(int av, char** ac) +{ + debug("start"); + hostname = "ndb-srv7"; + if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) { + fatal("hostname %s lookup failed", hostname); + } + localcfgfile = "Ndb.cfg"; + initcfgfile = "config.txt"; + getcfg(); + start(); + debug("done"); + return 0; +} + +// vim: set sw=4 noet: |