diff options
Diffstat (limited to 'ndb/src/cw/cpcd')
-rw-r--r-- | ndb/src/cw/cpcd/APIService.cpp | 385 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/APIService.hpp | 64 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/CPCD.cpp | 435 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/CPCD.hpp | 382 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/Makefile | 11 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/Monitor.cpp | 76 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/Process.cpp | 482 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/common.cpp | 158 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/common.hpp | 35 | ||||
-rw-r--r-- | ndb/src/cw/cpcd/main.cpp | 178 |
10 files changed, 2206 insertions, 0 deletions
diff --git a/ndb/src/cw/cpcd/APIService.cpp b/ndb/src/cw/cpcd/APIService.cpp new file mode 100644 index 00000000000..9cf17addcc2 --- /dev/null +++ b/ndb/src/cw/cpcd/APIService.cpp @@ -0,0 +1,385 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <Parser.hpp> +#include <NdbOut.hpp> +#include <Properties.hpp> +#include <socket_io.h> + +#include "APIService.hpp" +#include "CPCD.hpp" +#include <NdbMutex.h> +#include <NdbStdio.h> +#include <OutputStream.hpp> + +/** + const char * name; + const char * realName; + const Type type; + const ArgType argType; + const ArgRequired argRequired; + const ArgMinMax argMinMax; + const int minVal; + const int maxVal; + void (T::* function)(const class Properties & args); + const char * description; +*/ + +#define CPCD_CMD(name, fun, desc) \ + { name, \ + 0, \ + ParserRow<CPCDAPISession>::Cmd, \ + ParserRow<CPCDAPISession>::String, \ + ParserRow<CPCDAPISession>::Optional, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + 0, 0, \ + fun, \ + desc } + +#define CPCD_ARG(name, type, opt, desc) \ + { name, \ + 0, \ + ParserRow<CPCDAPISession>::Arg, \ + ParserRow<CPCDAPISession>::type, \ + ParserRow<CPCDAPISession>::opt, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + 0, 0, \ + 0, \ + desc } + +#define CPCD_ARG2(name, type, opt, min, max, desc) \ + { name, \ + 0, \ + ParserRow<CPCDAPISession>::Arg, \ + ParserRow<CPCDAPISession>::type, \ + ParserRow<CPCDAPISession>::opt, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + min, max, \ + 0, \ + desc } + +#define CPCD_END() \ + { 0, \ + 0, \ + ParserRow<CPCDAPISession>::Arg, \ + ParserRow<CPCDAPISession>::Int, \ + ParserRow<CPCDAPISession>::Optional, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + 0, 0, \ + 0, \ + 0 } + +#define CPCD_CMD_ALIAS(name, realName, fun) \ + { name, \ + realName, \ + ParserRow<CPCDAPISession>::CmdAlias, \ + ParserRow<CPCDAPISession>::Int, \ + ParserRow<CPCDAPISession>::Optional, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + 0, 0, \ + 0, \ + 0 } + +#define CPCD_ARG_ALIAS(name, realName, fun) \ + { name, \ + realName, \ + ParserRow<CPCDAPISession>::ArgAlias, \ + ParserRow<CPCDAPISession>::Int, \ + ParserRow<CPCDAPISession>::Optional, \ + ParserRow<CPCDAPISession>::IgnoreMinMax, \ + 0, 0, \ + 0, \ + 0 } + +const +ParserRow<CPCDAPISession> commands[] = +{ + CPCD_CMD("define process" , &CPCDAPISession::defineProcess, ""), + CPCD_ARG("id", Int, Optional, "Id of process."), + CPCD_ARG("name", String, Mandatory, "Name of process"), + CPCD_ARG("group", String, Mandatory, "Group of process"), + CPCD_ARG("env", String, Optional, "Environment variables for process"), + CPCD_ARG("path", String, Mandatory, "Path to binary"), + CPCD_ARG("args", String, Optional, "Arguments to process"), + CPCD_ARG("type", String, Mandatory, "Type of process"), + CPCD_ARG("cwd", String, Mandatory, "Working directory of process"), + CPCD_ARG("owner", String, Mandatory, "Owner of process"), + CPCD_ARG("runas", String, Optional, "Run as user"), + CPCD_ARG("stdout", String, Optional, "Redirection of stdout"), + CPCD_ARG("stderr", String, Optional, "Redirection of stderr"), + CPCD_ARG("stdin", String, Optional, "Redirection of stderr"), + CPCD_ARG("ulimit", String, Optional, "ulimit"), + + CPCD_CMD("undefine process", &CPCDAPISession::undefineProcess, ""), + CPCD_CMD_ALIAS("undef", "undefine process", 0), + CPCD_ARG("id", Int, Mandatory, "Id of process"), + CPCD_ARG_ALIAS("i", "id", 0), + + CPCD_CMD("start process", &CPCDAPISession::startProcess, ""), + CPCD_ARG("id", Int, Mandatory, "Id of process"), + + CPCD_CMD("stop process", &CPCDAPISession::stopProcess, ""), + CPCD_ARG("id", Int, Mandatory, "Id of process"), + + CPCD_CMD("list processes", &CPCDAPISession::listProcesses, ""), + + CPCD_END() +}; +CPCDAPISession::CPCDAPISession(NDB_SOCKET_TYPE sock, + CPCD & cpcd) + : SocketServer::Session(sock) + , m_cpcd(cpcd) +{ + m_input = new SocketInputStream(sock); + m_output = new SocketOutputStream(sock); + m_parser = new Parser<CPCDAPISession>(commands, *m_input, true, true, true); +} + +CPCDAPISession::CPCDAPISession(FILE * f, CPCD & cpcd) + : SocketServer::Session(1) + , m_cpcd(cpcd) +{ + m_input = new FileInputStream(f); + m_parser = new Parser<CPCDAPISession>(commands, *m_input, true, true, true); +} + +CPCDAPISession::~CPCDAPISession() { + delete m_input; + delete m_parser; +} + +void +CPCDAPISession::runSession(){ + Parser_t::Context ctx; + while(!m_stop){ + m_parser->run(ctx, * this); + if(ctx.m_currentToken == 0) + break; + + switch(ctx.m_status){ + case Parser_t::Ok: + for(size_t i = 0; i<ctx.m_aliasUsed.size(); i++) + ndbout_c("Used alias: %s -> %s", + ctx.m_aliasUsed[i]->name, ctx.m_aliasUsed[i]->realName); + break; + case Parser_t::NoLine: + case Parser_t::EmptyLine: + break; + default: + break; + } + } + NDB_CLOSE_SOCKET(m_socket); +} + +void +CPCDAPISession::stopSession(){ + CPCD::RequestStatus rs; + for(size_t i = 0; i<m_temporaryProcesses.size(); i++){ + Uint32 id = m_temporaryProcesses[i]; + m_cpcd.undefineProcess(&rs, id); + } +} + +void +CPCDAPISession::loadFile(){ + Parser_t::Context ctx; + while(!m_stop){ + m_parser->run(ctx, * this); + if(ctx.m_currentToken == 0) + break; + + switch(ctx.m_status){ + case Parser_t::Ok: + for(size_t i = 0; i<ctx.m_aliasUsed.size(); i++) + ndbout_c("Used alias: %s -> %s", + ctx.m_aliasUsed[i]->name, ctx.m_aliasUsed[i]->realName); + break; + case Parser_t::NoLine: + case Parser_t::EmptyLine: + break; + default: + break; + } + } +} + +static const int g_TimeOut = 1000; + +void +CPCDAPISession::defineProcess(Parser_t::Context & /* unused */, + const class Properties & args){ + + CPCD::Process * p = new CPCD::Process(args, &m_cpcd); + + CPCD::RequestStatus rs; + + bool ret = m_cpcd.defineProcess(&rs, p); + if(!m_cpcd.loadingProcessList) { + m_output->println("define process"); + m_output->println("status: %d", rs.getStatus()); + if(ret == true){ + m_output->println("id: %d", p->m_id); + if(p->m_processType == TEMPORARY){ + m_temporaryProcesses.push_back(p->m_id); + } + } else { + m_output->println("errormessage: %s", rs.getErrMsg()); + } + m_output->println(""); + } +} + +void +CPCDAPISession::undefineProcess(Parser_t::Context & /* unused */, + const class Properties & args){ + Uint32 id; + CPCD::RequestStatus rs; + + args.get("id", &id); + bool ret = m_cpcd.undefineProcess(&rs, id); + + m_output->println("undefine process"); + m_output->println("id: %d", id); + m_output->println("status: %d", rs.getStatus()); + if(!ret) + m_output->println("errormessage: %s", rs.getErrMsg()); + + m_output->println(""); +} + +void +CPCDAPISession::startProcess(Parser_t::Context & /* unused */, + const class Properties & args){ + Uint32 id; + CPCD::RequestStatus rs; + + args.get("id", &id); + const int ret = m_cpcd.startProcess(&rs, id); + + if(!m_cpcd.loadingProcessList) { + m_output->println("start process"); + m_output->println("id: %d", id); + m_output->println("status: %d", rs.getStatus()); + if(!ret) + m_output->println("errormessage: %s", rs.getErrMsg()); + m_output->println(""); + } +} + +void +CPCDAPISession::stopProcess(Parser_t::Context & /* unused */, + const class Properties & args){ + Uint32 id; + CPCD::RequestStatus rs; + + args.get("id", &id); + int ret = m_cpcd.stopProcess(&rs, id); + + m_output->println("stop process"); + m_output->println("id: %d", id); + m_output->println("status: %d", rs.getStatus()); + if(!ret) + m_output->println("errormessage: %s", rs.getErrMsg()); + + m_output->println(""); +} + +static const char * +propToString(Properties *prop, const char *key) { + static char buf[32]; + const char *retval = NULL; + PropertiesType pt; + + prop->getTypeOf(key, &pt); + switch(pt) { + case PropertiesType_Uint32: + Uint32 val; + prop->get(key, &val); + snprintf(buf, sizeof buf, "%d", val); + retval = buf; + break; + case PropertiesType_char: + const char *str; + prop->get(key, &str); + retval = str; + break; + default: + snprintf(buf, sizeof buf, "(unknown)"); + retval = buf; + } + return retval; +} + +void +CPCDAPISession::printProperty(Properties *prop, const char *key) { + m_output->println("%s: %s", key, propToString(prop, key)); +} + +void +CPCDAPISession::listProcesses(Parser_t::Context & /* unused */, + const class Properties & /* unused */){ + m_cpcd.m_processes.lock(); + MutexVector<CPCD::Process *> *proclist = m_cpcd.getProcessList(); + + m_output->println("start processes"); + m_output->println(""); + + + for(size_t i = 0; i < proclist->size(); i++) { + CPCD::Process *p = (*proclist)[i]; + + m_output->println("process"); + + m_output->println("id: %d", p->m_id); + m_output->println("name: %s", p->m_name.c_str()); + m_output->println("path: %s", p->m_path.c_str()); + m_output->println("args: %s", p->m_args.c_str()); + m_output->println("type: %s", p->m_type.c_str()); + m_output->println("cwd: %s", p->m_cwd.c_str()); + m_output->println("env: %s", p->m_env.c_str()); + m_output->println("owner: %s", p->m_owner.c_str()); + m_output->println("group: %s", p->m_group.c_str()); + m_output->println("runas: %s", p->m_runas.c_str()); + m_output->println("stdin: %s", p->m_stdin.c_str()); + m_output->println("stdout: %s", p->m_stdout.c_str()); + m_output->println("stderr: %s", p->m_stderr.c_str()); + m_output->println("ulimit: %s", p->m_ulimit.c_str()); + switch(p->m_status){ + case STOPPED: + m_output->println("status: stopped"); + break; + case STARTING: + m_output->println("status: starting"); + break; + case RUNNING: + m_output->println("status: running"); + break; + case STOPPING: + m_output->println("status: stopping"); + break; + } + + m_output->println(""); + + } + + m_output->println("end processes"); + m_output->println(""); + + m_cpcd.m_processes.unlock(); +} diff --git a/ndb/src/cw/cpcd/APIService.hpp b/ndb/src/cw/cpcd/APIService.hpp new file mode 100644 index 00000000000..ef988785f89 --- /dev/null +++ b/ndb/src/cw/cpcd/APIService.hpp @@ -0,0 +1,64 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CPCD_API_HPP +#define CPCD_API_HPP + +#include <Parser.hpp> +#include <InputStream.hpp> +#include <SocketServer.hpp> + +class CPCD; + +class CPCDAPISession : public SocketServer::Session { + typedef Parser<CPCDAPISession> Parser_t; + + class CPCD & m_cpcd; + InputStream *m_input; + OutputStream *m_output; + Parser_t *m_parser; + + Vector<int> m_temporaryProcesses; + + void printProperty(Properties *prop, const char *key); +public: + CPCDAPISession(NDB_SOCKET_TYPE, class CPCD &); + CPCDAPISession(FILE * f, CPCD & cpcd); + ~CPCDAPISession(); + + virtual void runSession(); + virtual void stopSession(); + void loadFile(); + + void defineProcess(Parser_t::Context & ctx, const class Properties & args); + void undefineProcess(Parser_t::Context & ctx, const class Properties & args); + void startProcess(Parser_t::Context & ctx, const class Properties & args); + void stopProcess(Parser_t::Context & ctx, const class Properties & args); + void showProcess(Parser_t::Context & ctx, const class Properties & args); + void listProcesses(Parser_t::Context & ctx, const class Properties & args); +}; + +class CPCDAPIService : public SocketServer::Service { + class CPCD & m_cpcd; +public: + CPCDAPIService(class CPCD & cpcd) : m_cpcd(cpcd) {} + + CPCDAPISession * newSession(NDB_SOCKET_TYPE theSock){ + return new CPCDAPISession(theSock, m_cpcd); + } +}; + +#endif diff --git a/ndb/src/cw/cpcd/CPCD.cpp b/ndb/src/cw/cpcd/CPCD.cpp new file mode 100644 index 00000000000..8864ccf6e4e --- /dev/null +++ b/ndb/src/cw/cpcd/CPCD.cpp @@ -0,0 +1,435 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <string.h> +#include <NdbOut.hpp> +#include <NdbUnistd.h> +#include <NdbStdio.h> +#include <errno.h> + +#include "APIService.hpp" +#include "CPCD.hpp" +#include <NdbMutex.h> + +#include "common.hpp" + +extern const ParserRow<CPCDAPISession> commands[]; + + +CPCD::CPCD() { + loadingProcessList = false; + m_processes.clear(); + m_monitor = NULL; + m_monitor = new Monitor(this); + m_procfile = "ndb_cpcd.db"; +} + +CPCD::~CPCD() { + if(m_monitor != NULL) { + delete m_monitor; + m_monitor = NULL; + } +} + +int +CPCD::findUniqueId() { + int id; + bool ok = false; + m_processes.lock(); + + while(!ok) { + ok = true; + id = random() % 8192; /* Don't want so big numbers */ + + if(id == 0) + ok = false; + + for(size_t i = 0; i<m_processes.size(); i++) { + if(m_processes[i]->m_id == id) + ok = false; + } + } + m_processes.unlock(); + return id; +} + +bool +CPCD::defineProcess(RequestStatus * rs, Process * arg){ + if(arg->m_id == -1) + arg->m_id = findUniqueId(); + + Guard tmp(m_processes); + + for(size_t i = 0; i<m_processes.size(); i++) { + Process * proc = m_processes[i]; + + if((strcmp(arg->m_name.c_str(), proc->m_name.c_str()) == 0) && + (strcmp(arg->m_group.c_str(), proc->m_group.c_str()) == 0)) { + /* Identical names in the same group */ + rs->err(AlreadyExists, "Name already exists"); + return false; + } + + if(arg->m_id == proc->m_id) { + /* Identical ID numbers */ + rs->err(AlreadyExists, "Id already exists"); + return false; + } + } + + m_processes.push_back(arg, false); + + notifyChanges(); + report(arg->m_id, CPCEvent::ET_PROC_USER_DEFINE); + + return true; +} + +bool +CPCD::undefineProcess(CPCD::RequestStatus *rs, int id) { + + Guard tmp(m_processes); + + Process * proc = 0; + size_t i; + for(i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case RUNNING: + case STOPPED: + case STOPPING: + case STARTING: + proc->stop(); + m_processes.erase(i, false /* Already locked */); + } + + + notifyChanges(); + + report(id, CPCEvent::ET_PROC_USER_UNDEFINE); + + return true; +} + +bool +CPCD::startProcess(CPCD::RequestStatus *rs, int id) { + + Process * proc = 0; + { + + Guard tmp(m_processes); + + for(size_t i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case STOPPED: + proc->m_status = STARTING; + if(proc->start() != 0){ + rs->err(Error, "Failed to start"); + return false; + } + break; + case STARTING: + rs->err(Error, "Already starting"); + return false; + case RUNNING: + rs->err(Error, "Already started"); + return false; + case STOPPING: + rs->err(Error, "Currently stopping"); + return false; + } + + notifyChanges(); + } + report(id, CPCEvent::ET_PROC_USER_START); + + return true; +} + +bool +CPCD::stopProcess(CPCD::RequestStatus *rs, int id) { + + Guard tmp(m_processes); + + Process * proc = 0; + for(size_t i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case STARTING: + case RUNNING: + proc->stop(); + break; + case STOPPED: + rs->err(AlreadyStopped, "Already stopped"); + return false; + break; + case STOPPING: + rs->err(Error, "Already stopping"); + return false; + } + + notifyChanges(); + + report(id, CPCEvent::ET_PROC_USER_START); + + return true; +} + +bool +CPCD::notifyChanges() { + bool ret = true; + if(!loadingProcessList) + ret = saveProcessList(); + + m_monitor->signal(); + + return ret; +} + +/* Must be called with m_processlist locked */ +bool +CPCD::saveProcessList(){ + char newfile[PATH_MAX+4]; + char oldfile[PATH_MAX+4]; + char curfile[PATH_MAX]; + FILE *f; + + /* Create the filenames that we will use later */ + snprintf(newfile, sizeof(newfile), "%s.new", m_procfile.c_str()); + snprintf(oldfile, sizeof(oldfile), "%s.old", m_procfile.c_str()); + snprintf(curfile, sizeof(curfile), "%s", m_procfile.c_str()); + + f = fopen(newfile, "w"); + + if(f == NULL) { + /* XXX What should be done here? */ + logger.critical("Cannot open `%s': %s\n", newfile, strerror(errno)); + return false; + } + + for(size_t i = 0; i<m_processes.size(); i++){ + m_processes[i]->print(f); + fprintf(f, "\n"); + + if(m_processes[i]->m_processType == TEMPORARY){ + /** + * Interactive process should never be "restarted" on cpcd restart + */ + continue; + } + + if(m_processes[i]->m_status == RUNNING || + m_processes[i]->m_status == STARTING){ + fprintf(f, "start process\nid: %d\n\n", m_processes[i]->m_id); + } + } + + fclose(f); + f = NULL; + + /* This will probably only work on reasonably Unix-like systems. You have + * been warned... + * + * The motivation behind all this link()ing is that the daemon might + * crash right in the middle of updating the configuration file, and in + * that case we want to be sure that the old file is around until we are + * guaranteed that there is always at least one copy of either the old or + * the new configuration file left. + */ + + /* Remove an old config file if it exists */ + unlink(oldfile); + + if(link(curfile, oldfile) != 0) /* make a backup of the running config */ + logger.error("Cannot rename '%s' -> '%s'", curfile, oldfile); + else { + if(unlink(curfile) != 0) { /* remove the running config file */ + logger.critical("Cannot remove file '%s'", curfile); + return false; + } + } + + if(link(newfile, curfile) != 0) { /* put the new config file in place */ + printf("-->%d\n", __LINE__); + + logger.critical("Cannot rename '%s' -> '%s': %s", + curfile, newfile, strerror(errno)); + return false; + } + + /* XXX Ideally we would fsync() the directory here, but I'm not sure if + * that actually works. + */ + + unlink(newfile); /* remove the temporary file */ + unlink(oldfile); /* remove the old file */ + + logger.info("Process list saved as '%s'", curfile); + + return true; +} + +bool +CPCD::loadProcessList(){ + BaseString secondfile; + FILE *f; + + loadingProcessList = true; + + secondfile.assfmt("%s.new", m_procfile.c_str()); + + /* Try to open the config file */ + f = fopen(m_procfile.c_str(), "r"); + + /* If it did not exist, try to open the backup. See the saveProcessList() + * method for an explanation why it is done this way. + */ + if(f == NULL) { + f = fopen(secondfile.c_str(), "r"); + + if(f == NULL) { + /* XXX What to do here? */ + logger.info("Configuration file `%s' not found", + m_procfile.c_str()); + logger.info("Starting with empty configuration"); + loadingProcessList = false; + return false; + } else { + logger.info("Configuration file `%s' missing", + m_procfile.c_str()); + logger.info("Backup configuration file `%s' is used", + secondfile.c_str()); + /* XXX Maybe we should just rename the backup file to the official + * name, and be done with it? + */ + } + } + + CPCDAPISession sess(f, *this); + sess.loadFile(); + loadingProcessList = false; + + Vector<int> temporary; + for(size_t i = 0; i<m_processes.size(); i++){ + Process * proc = m_processes[i]; + proc->readPid(); + if(proc->m_processType == TEMPORARY){ + temporary.push_back(proc->m_id); + } + } + + for(size_t i = 0; i<temporary.size(); i++){ + RequestStatus rs; + undefineProcess(&rs, temporary[i]); + } + + /* Don't call notifyChanges here, as that would save the file we just + loaded */ + m_monitor->signal(); + return true; +} + +MutexVector<CPCD::Process *> * +CPCD::getProcessList() { + return &m_processes; +} + +void +CPCD::RequestStatus::err(enum RequestStatusCode status, char *msg) { + m_status = status; + snprintf(m_errorstring, sizeof(m_errorstring), "%s", msg); +} + +#if 0 +void +CPCD::sigchild(int pid){ + m_processes.lock(); + for(size_t i = 0; i<m_processes.size(); i++){ + if(m_processes[i].m_pid == pid){ + } + } + wait(pid, 0, 0); +} +#endif + + /** Register event subscriber */ +void +CPCD::do_register(EventSubscriber * sub){ + m_subscribers.lock(); + m_subscribers.push_back(sub, false); + m_subscribers.unlock(); +} + +EventSubscriber* +CPCD::do_unregister(EventSubscriber * sub){ + m_subscribers.lock(); + + for(size_t i = 0; i<m_subscribers.size(); i++){ + if(m_subscribers[i] == sub){ + m_subscribers.erase(i); + m_subscribers.unlock(); + return sub; + } + } + + m_subscribers.unlock(); + return 0; +} + +void +CPCD::report(int id, CPCEvent::EventType t){ + CPCEvent e; + e.m_time = time(0); + e.m_proc = id; + e.m_type = t; + m_subscribers.lock(); + for(size_t i = 0; i<m_subscribers.size(); i++){ + (* m_subscribers[i]).report(e); + } + m_subscribers.unlock(); +} diff --git a/ndb/src/cw/cpcd/CPCD.hpp b/ndb/src/cw/cpcd/CPCD.hpp new file mode 100644 index 00000000000..4a7cab23bab --- /dev/null +++ b/ndb/src/cw/cpcd/CPCD.hpp @@ -0,0 +1,382 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CPCD_HPP +#define CPCD_HPP + +#include <Vector.hpp> +#include <Properties.hpp> +#include <NdbOut.hpp> +#include <NdbThread.h> +#include <NdbCondition.h> +#include <BaseString.hpp> + +/* XXX Need to figure out how to do this for non-Unix systems */ +#define CPCD_DEFAULT_WORK_DIR "/var/run/ndb_cpcd" +#define CPCD_DEFAULT_PROC_FILE "ndb_cpcd.conf" +#define CPCD_DEFAULT_TCP_PORT 1234 +#define CPCD_DEFAULT_POLLING_INTERVAL 5 /* seconds */ +#define CPCD_DEFAULT_CONFIG_FILE "/etc/ndb_cpcd.conf" + +enum ProcessStatus { + STOPPED = 0, + STARTING = 1, + RUNNING = 2, + STOPPING = 3 +}; + +enum ProcessType { + PERMANENT = 0, + TEMPORARY = 1 +}; + +struct CPCEvent { + enum EventType { + ET_USER_CONNECT, + ET_USER_DISCONNECT, + + ET_PROC_USER_DEFINE, // Defined proc + ET_PROC_USER_UNDEFINE, // Undefined proc + ET_PROC_USER_START, // Proc ordered to start + ET_PROC_USER_STOP, // Proc ordered to stop + ET_PROC_STATE_RUNNING, // exec returned(?) ok + ET_PROC_STATE_STOPPED // detected that proc is ! running + }; + + int m_proc; + time_t m_time; + EventType m_type; +}; + +struct EventSubscriber { + virtual void report(const CPCEvent &) = 0; +}; + +/** + * @brief Error codes for CPCD requests + */ +enum RequestStatusCode { + OK = 0, ///< Everything OK + Error = 1, ///< Generic error + AlreadyExists = 2, ///< Entry already exists in list + NotExists = 3, ///< Entry does not exist in list + AlreadyStopped = 4 +}; + +/** + * @class CPCD + * @brief Manages processes, letting them be controlled with a TCP connection. + * + * The class implementing the Cluster Process Control Daemon + */ +class CPCD { +public: + /** @brief Describes the status of a client request */ + class RequestStatus { + public: + /** @brief Constructs an empty RequestStatus */ + RequestStatus() { m_status = OK; m_errorstring[0] = '\0'; }; + + /** @brief Sets an errorcode and a printable message */ + void err(enum RequestStatusCode, char *); + + /** @brief Returns the error message */ + char *getErrMsg() { return m_errorstring; }; + + /** @brief Returns the error code */ + enum RequestStatusCode getStatus() { return m_status; }; + private: + enum RequestStatusCode m_status; + char m_errorstring[256]; + }; + /** + * @brief Manages a process + */ + class Process { + int m_pid; + public: + /** + * @brief Constructs and empty Process + */ + Process(const Properties & props, class CPCD *cpcd); + /** + * @brief Monitors the process + * + * The process is started or stopped as needed. + */ + void monitor(); + + /** + * @brief Checks if the process is running or not + * + * @return + * - true if the process is running, + * - false if the process is not running + */ + bool isRunning(); + + /** @brief Starts the process */ + int start(); + + /** @brief Stops the process */ + void stop(); + + /** + * @brief Reads the pid from stable storage + * + * @return The pid number + */ + int readPid(); + + /** + * @brief Writes the pid from stable storage + * + * @return + * - 0 if successful + - -1 and sets errno if an error occured + */ + int writePid(int pid); + + /** + * @brief Prints a textual description of the process on a file + */ + void print(FILE *); + + /** Id number of the Process. + * + * @note This is not the same as a pid. This number is used in the + * protocol, and will not be changed if a processes is restarted. + */ + int m_id; + + /** @brief The name shown to the user */ + BaseString m_name; + + /** @brief Used to group a number of processes */ + BaseString m_group; + + /** @brief Environment variables + * + * Environmentvariables to add for the process. + * + * @note + * - The environment cpcd started with is preserved + * - There is no way to delete variables + */ + BaseString m_env; + + /** @brief Path to the binary to run */ + BaseString m_path; + + /** @brief Arguments to the process. + * + * @note + * - This includes argv[0]. + * - If no argv[0] is given, argv[0] will be set to m_path. + */ + BaseString m_args; + + /** + * @brief Type of process + * + * Either set to "interactive" or "permanent". + */ + BaseString m_type; + ProcessType m_processType; + + /** + * @brief Working directory + * + * Working directory the process will start in. + */ + BaseString m_cwd; + + /** + * @brief Owner of the process. + * + * @note This will not affect the process' uid or gid; + * it is only used for managemental purposes. + * @see m_runas + */ + BaseString m_owner; + + /** + * @bried Run as + * @note This affects uid + * @see m_owner + */ + BaseString m_runas; + + /** + * @brief redirection for stdin + */ + BaseString m_stdin; + + /** + * @brief redirection for stdout + */ + BaseString m_stdout; + + /** + * @brief redirection for stderr + */ + BaseString m_stderr; + + /** @brief Status of the process */ + enum ProcessStatus m_status; + + /** + * @brief ulimits for process + * @desc Format c:unlimited d:0 ... + */ + BaseString m_ulimit; + private: + class CPCD *m_cpcd; + void do_exec(); + }; + + /** + * @brief Starts and stops processes as needed + * + * At a specified interval (default 5 seconds) calls the monitor function + * of all the processes in the CPCDs list, causing the to start or + * stop, depending on the configuration. + */ + class Monitor { + public: + /** Creates a new CPCD::Monitor object, connected to the specified + * CPCD. + * A new thread will be created, which will poll the processes of + * the CPCD at the specifed interval. + */ + Monitor(CPCD *cpcd, int poll = CPCD_DEFAULT_POLLING_INTERVAL); + + /** Stops the monitor, but does not stop the processes */ + ~Monitor(); + + /** Runs the monitor thread. */ + void run(); + + /** Signals configuration changes to the monitor thread, causing it to + * do the check without waiting for the timeout */ + void signal(); + private: + class CPCD *m_cpcd; + struct NdbThread *m_monitorThread; + bool m_monitorThreadQuitFlag; + struct NdbCondition *m_changeCondition; + NdbMutex *m_changeMutex; + int m_pollingInterval; /* seconds */ + }; + + /** @brief Constructs a CPCD object */ + CPCD(); + + /** + * @brief Destroys a CPCD object, + * but does not stop the processes it manages + */ + ~CPCD(); + + /** Adds a Process to the CPCDs list of managed Processes. + * + * @note The process will not be started until it is explicitly + * marked as running with CPCD::startProcess(). + * + * @return + * - true if the addition was successful, + * - false if not + * - RequestStatus will be filled in with a suitable error + * if an error occured. + */ + bool defineProcess(RequestStatus *rs, Process * arg); + + /** Removes a Process from the CPCD. + * + * @note A Process that is running cannot be removed. + * + * @return + * - true if the removal was successful, + * - false if not + * - The RequestStatus will be filled in with a suitable error + * if an error occured. + */ + bool undefineProcess(RequestStatus *rs, int id); + + /** Marks a Process for starting. + * + * @note The fact that a process has started does not mean it will actually + * start properly. This command only makes sure the CPCD will + * try to start it. + * + * @return + * - true if the marking was successful + * - false if not + * - RequestStatus will be filled in with a suitable error + * if an error occured. + */ + bool startProcess(RequestStatus *rs, int id); + + /** Marks a Process for stopping. + * + * @return + * - true if the marking was successful + * - false if not + * - The RequestStatus will be filled in with a suitable error + * if an error occured. + */ + bool stopProcess(RequestStatus *rs, int id); + + /** Generates a list of processes, and sends them to the CPCD client */ + bool listProcesses(RequestStatus *rs, MutexVector<const char *> &); + + /** Set to true while the CPCD is reading the configuration file */ + bool loadingProcessList; + + /** Saves the list of Processes and their status to the configuration file. + * Called whenever the configuration is changed. + */ + bool saveProcessList(); + + /** Loads the list of Processes and their status from the configuration + * file. + * @note This function should only be called when the CPCD is starting, + * calling it at other times will cause unspecified behaviour. + */ + bool loadProcessList(); + + /** Returns the list of processes */ + MutexVector<Process *> *getProcessList(); + + /** The list of processes. Should not be used directly */ + MutexVector<Process *> m_processes; + + /** Register event subscriber */ + void do_register(EventSubscriber * sub); + EventSubscriber* do_unregister(EventSubscriber * sub); + +private: + friend class Process; + bool notifyChanges(); + int findUniqueId(); + BaseString m_procfile; + Monitor *m_monitor; + + void report(int id, CPCEvent::EventType); + MutexVector<EventSubscriber *> m_subscribers; +}; + +#endif diff --git a/ndb/src/cw/cpcd/Makefile b/ndb/src/cw/cpcd/Makefile new file mode 100644 index 00000000000..f214fb087d2 --- /dev/null +++ b/ndb/src/cw/cpcd/Makefile @@ -0,0 +1,11 @@ +include .defs.mk + +TYPE := util +BIN_TARGET := ndb_cpcd + +# Source files of non-templated classes (.cpp files) +SOURCES = main.cpp CPCD.cpp Process.cpp APIService.cpp Monitor.cpp common.cpp + +BIN_TARGET_LIBS += logger + +include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/cw/cpcd/Monitor.cpp b/ndb/src/cw/cpcd/Monitor.cpp new file mode 100644 index 00000000000..a96f3509ee8 --- /dev/null +++ b/ndb/src/cw/cpcd/Monitor.cpp @@ -0,0 +1,76 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <NdbThread.h> +#include <NdbOut.hpp> +#include <NdbUnistd.h> +#include <NdbSleep.h> + +#include "CPCD.hpp" +#include "common.hpp" + +static void * +monitor_thread_create_wrapper(void * arg) { + CPCD::Monitor *mon = (CPCD::Monitor *)arg; + mon->run(); + return NULL; +} + +CPCD::Monitor::Monitor(CPCD *cpcd, int poll) { + m_cpcd = cpcd; + m_pollingInterval = poll; + m_changeCondition = NdbCondition_Create(); + m_changeMutex = NdbMutex_Create(); + m_monitorThread = NdbThread_Create(monitor_thread_create_wrapper, + (NDB_THREAD_ARG*) this, + 32768, + "ndb_cpcd_monitor", + NDB_THREAD_PRIO_MEAN); + m_monitorThreadQuitFlag = false; +} + +CPCD::Monitor::~Monitor() { + NdbThread_Destroy(&m_monitorThread); + NdbCondition_Destroy(m_changeCondition); + NdbMutex_Destroy(m_changeMutex); +} + +void +CPCD::Monitor::run() { + while(1) { + NdbMutex_Lock(m_changeMutex); + NdbCondition_WaitTimeout(m_changeCondition, + m_changeMutex, + m_pollingInterval * 1000); + + MutexVector<CPCD::Process *> &proc = *m_cpcd->getProcessList(); + + proc.lock(); + + for(size_t i = 0; i < proc.size(); i++) { + proc[i]->monitor(); + } + + proc.unlock(); + + NdbMutex_Unlock(m_changeMutex); + } +} + +void +CPCD::Monitor::signal() { + NdbCondition_Signal(m_changeCondition); +} diff --git a/ndb/src/cw/cpcd/Process.cpp b/ndb/src/cw/cpcd/Process.cpp new file mode 100644 index 00000000000..01a63a5c653 --- /dev/null +++ b/ndb/src/cw/cpcd/Process.cpp @@ -0,0 +1,482 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <sys/types.h> +#include <signal.h> + +#include <assert.h> +#include <stdlib.h> + +#include <NdbUnistd.h> +#include <BaseString.hpp> +#include <InputStream.hpp> + +#include "common.hpp" +#include "CPCD.hpp" + +#include <pwd.h> +#include <sys/types.h> +#include <unistd.h> +#include <sys/stat.h> +#include <sys/resource.h> + +void +CPCD::Process::print(FILE * f){ + fprintf(f, "define process\n"); + fprintf(f, "id: %d\n", m_id); + fprintf(f, "name: %s\n", m_name.c_str() ? m_name.c_str() : ""); + fprintf(f, "group: %s\n", m_group.c_str() ? m_group.c_str() : ""); + fprintf(f, "env: %s\n", m_env.c_str() ? m_env.c_str() : ""); + fprintf(f, "path: %s\n", m_path.c_str() ? m_path.c_str() : ""); + fprintf(f, "args: %s\n", m_args.c_str() ? m_args.c_str() : ""); + fprintf(f, "type: %s\n", m_type.c_str() ? m_type.c_str() : ""); + fprintf(f, "cwd: %s\n", m_cwd.c_str() ? m_cwd.c_str() : ""); + fprintf(f, "owner: %s\n", m_owner.c_str() ? m_owner.c_str() : ""); + fprintf(f, "runas: %s\n", m_runas.c_str() ? m_runas.c_str() : ""); + fprintf(f, "stdin: %s\n", m_stdin.c_str() ? m_stdin.c_str() : ""); + fprintf(f, "stdout: %s\n", m_stdout.c_str() ? m_stdout.c_str() : ""); + fprintf(f, "stderr: %s\n", m_stderr.c_str() ? m_stderr.c_str() : ""); + fprintf(f, "ulimit: %s\n", m_ulimit.c_str() ? m_ulimit.c_str() : ""); +} + +CPCD::Process::Process(const Properties & props, class CPCD *cpcd) { + m_id = -1; + m_pid = -1; + props.get("id", (Uint32 *) &m_id); + props.get("name", m_name); + props.get("group", m_group); + props.get("env", m_env); + props.get("path", m_path); + props.get("args", m_args); + props.get("cwd", m_cwd); + props.get("owner", m_owner); + props.get("type", m_type); + props.get("runas", m_runas); + + props.get("stdin", m_stdin); + props.get("stdout", m_stdout); + props.get("stderr", m_stderr); + props.get("ulimit", m_ulimit); + m_status = STOPPED; + + if(strcasecmp(m_type.c_str(), "temporary") == 0){ + m_processType = TEMPORARY; + } else { + m_processType = PERMANENT; + } + + m_cpcd = cpcd; +} + +void +CPCD::Process::monitor() { + switch(m_status) { + case STARTING: + break; + case RUNNING: + if(!isRunning()){ + m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_STOPPED); + if(m_processType == TEMPORARY){ + m_status = STOPPED; + } else { + start(); + } + } + break; + case STOPPED: + assert(!isRunning()); + break; + case STOPPING: + break; + } +} + +bool +CPCD::Process::isRunning() { + + if(m_pid <= 1){ + logger.critical("isRunning(%d) invalid pid: %d", m_id, m_pid); + return false; + } + /* Check if there actually exists a process with such a pid */ + errno = 0; + int s = kill((pid_t) m_pid, 0); /* Sending "signal" 0 to a process only + * checkes if the process actually exists */ + if(s != 0) { + switch(errno) { + case EPERM: + logger.critical("Not enough privileges to control pid %d\n", m_pid); + break; + case ESRCH: + /* The pid in the file does not exist, which probably means that it + has died, or the file contains garbage for some other reason */ + break; + default: + logger.critical("Cannot not control pid %d: %s\n", m_pid, strerror(errno)); + break; + } + return false; + } + + return true; +} + +int +CPCD::Process::readPid() { + if(m_pid != -1){ + logger.critical("Reading pid while != -1(%d)", m_pid); + return m_pid; + } + + char filename[PATH_MAX*2+1]; + char buf[1024]; + FILE *f; + + memset(buf, 0, sizeof(buf)); + + snprintf(filename, sizeof(filename), "%d", m_id); + + f = fopen(filename, "r"); + + if(f == NULL){ + logger.debug("readPid - %s not found", filename); + return -1; /* File didn't exist */ + } + + errno = 0; + size_t r = fread(buf, 1, sizeof(buf), f); + fclose(f); + if(r > 0) + m_pid = strtol(buf, (char **)NULL, 0); + + if(errno == 0){ + return m_pid; + } + + return -1; +} + +int +CPCD::Process::writePid(int pid) { + char tmpfilename[PATH_MAX+1+4+8]; + char filename[PATH_MAX*2+1]; + FILE *f; + + snprintf(tmpfilename, sizeof(tmpfilename), "tmp.XXXXXX"); + snprintf(filename, sizeof(filename), "%d", m_id); + + int fd = mkstemp(tmpfilename); + if(fd < 0) { + logger.error("Cannot open `%s': %s\n", tmpfilename, strerror(errno)); + return -1; /* Couldn't open file */ + } + + f = fdopen(fd, "w"); + + if(f == NULL) { + logger.error("Cannot open `%s': %s\n", tmpfilename, strerror(errno)); + return -1; /* Couldn't open file */ + } + + fprintf(f, "%d", pid); + fclose(f); + + if(rename(tmpfilename, filename) == -1){ + logger.error("Unable to rename from %s to %s", tmpfilename, filename); + return -1; + } + return 0; +} + +static void +setup_environment(const char *env) { + char **p; + p = BaseString::argify("", env); + for(int i = 0; p[i] != NULL; i++){ + /*int res = */ putenv(p[i]); + } +} + +static +int +set_ulimit(const BaseString & pair){ + errno = 0; + do { + Vector<BaseString> list; + pair.split(list, ":"); + if(list.size() != 2){ + break; + } + + int resource = 0; + rlim_t value = RLIM_INFINITY; + if(!(list[1].trim() == "unlimited")){ + value = atoi(list[1].c_str()); + } + if(list[0].trim() == "c"){ + resource = RLIMIT_CORE; + } else if(list[0] == "d"){ + resource = RLIMIT_DATA; + } else if(list[0] == "f"){ + resource = RLIMIT_FSIZE; + } else if(list[0] == "n"){ + resource = RLIMIT_NOFILE; + } else if(list[0] == "s"){ + resource = RLIMIT_STACK; + } else if(list[0] == "t"){ + resource = RLIMIT_CPU; + } else { + errno = EINVAL; + break; + } + struct rlimit rlp; + if(getrlimit(resource, &rlp) != 0){ + break; + } + + rlp.rlim_cur = value; + if(setrlimit(resource, &rlp) != 0){ + break; + } + return 0; + } while(false); + logger.error("Unable to process ulimit: %s(%s)", + pair.c_str(), strerror(errno)); + return -1; +} + +void +CPCD::Process::do_exec() { + + setup_environment(m_env.c_str()); + + char **argv = BaseString::argify(m_path.c_str(), m_args.c_str()); + + if(strlen(m_cwd.c_str()) > 0) { + int err = chdir(m_cwd.c_str()); + if(err == -1) { + BaseString err; + logger.error("%s: %s\n", m_cwd.c_str(), strerror(errno)); + _exit(1); + } + } + + Vector<BaseString> ulimit; + m_ulimit.split(ulimit); + for(size_t i = 0; i<ulimit.size(); i++){ + if(ulimit[i].trim().length() > 0 && set_ulimit(ulimit[i]) != 0){ + _exit(1); + } + } + + int fd = open("/dev/null", O_RDWR, 0); + if(fd == -1) { + logger.error("Cannot open `/dev/null': %s\n", strerror(errno)); + _exit(1); + } + + BaseString * redirects[] = { &m_stdin, &m_stdout, &m_stderr }; + int fds[3]; + for(int i = 0; i<3; i++){ + if(redirects[i]->empty()){ +#ifndef DEBUG + dup2(fd, i); +#endif + continue; + } + + if((* redirects[i]) == "2>&1" && i == 2){ + dup2(fds[1], 2); + continue; + } + + /** + * Make file + */ + int flags = 0; + int mode = S_IRUSR | S_IWUSR ; + if(i == 0){ + flags |= O_RDONLY; + } else { + flags |= O_WRONLY | O_CREAT | O_APPEND; + } + int f = fds[i]= open(redirects[i]->c_str(), flags, mode); + if(f == -1){ + logger.error("Cannot redirect %d to/from '%s' : %s\n", i, + redirects[i]->c_str(), strerror(errno)); + _exit(1); + } + dup2(f, i); + } + + /* Close all filedescriptors */ + for(int i = STDERR_FILENO+1; i < getdtablesize(); i++) + close(i); + + execv(m_path.c_str(), argv); + /* XXX If we reach this point, an error has occurred, but it's kind of hard + * to report it, because we've closed all files... So we should probably + * create a new logger here */ + logger.error("Exec failed: %s\n", strerror(errno)); + /* NOTREACHED */ +} + +int +CPCD::Process::start() { + /* We need to fork() twice, so that the second child (grandchild?) can + * become a daemon. The original child then writes the pid file, + * so that the monitor knows the pid of the new process, and then + * exit()s. That way, the monitor process can pickup the pid, and + * the running process is a daemon. + * + * This is a bit tricky but has the following advantages: + * - the cpcd can die, and "reconnect" to the monitored clients + * without restarting them. + * - the cpcd does not have to wait() for the childs. init(1) will + * take care of that. + */ + logger.info("Starting %d: %s", m_id, m_name.c_str()); + m_status = STARTING; + + int pid = -1; + switch(m_processType){ + case TEMPORARY:{ + /** + * Simple fork + * don't ignore child + */ + switch(pid = fork()) { + case 0: /* Child */ + + if(runas(m_runas.c_str()) == 0){ + writePid(getpid()); + do_exec(); + } + _exit(1); + break; + case -1: /* Error */ + logger.error("Cannot fork: %s\n", strerror(errno)); + m_status = STOPPED; + return -1; + break; + default: /* Parent */ + logger.debug("Started temporary %d : pid=%d", m_id, pid); + m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_RUNNING); + break; + } + break; + } + case PERMANENT:{ + /** + * PERMANENT + */ + switch(fork()) { + case 0: /* Child */ + if(runas(m_runas.c_str()) != 0){ + writePid(-1); + _exit(1); + } + signal(SIGCHLD, SIG_IGN); + pid_t pid; + switch(pid = fork()) { + case 0: /* Child */ + writePid(getpid()); + setsid(); + do_exec(); + _exit(1); + /* NOTREACHED */ + break; + case -1: /* Error */ + logger.error("Cannot fork: %s\n", strerror(errno)); + writePid(-1); + _exit(1); + break; + default: /* Parent */ + logger.debug("Started permanent %d : pid=%d", m_id, pid); + _exit(0); + break; + } + break; + case -1: /* Error */ + logger.error("Cannot fork: %s\n", strerror(errno)); + m_status = STOPPED; + return -1; + break; + default: /* Parent */ + m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_RUNNING); + break; + } + break; + } + default: + logger.critical("Unknown process type"); + return -1; + } + + while(readPid() < 0){ + sched_yield(); + } + + if(pid != -1 && pid != m_pid){ + logger.error("pid and m_pid don't match: %d %d", pid, m_pid); + } + + if(isRunning()){ + m_status = RUNNING; + return 0; + } + m_status = STOPPED; + return -1; +} + +void +CPCD::Process::stop() { + + char filename[PATH_MAX*2+1]; + snprintf(filename, sizeof(filename), "%d", m_id); + unlink(filename); + + if(m_pid <= 1){ + logger.critical("Stopping process with bogus pid: %d", m_pid); + return; + } + m_status = STOPPING; + + int ret = kill((pid_t)m_pid, SIGTERM); + switch(ret) { + case 0: + logger.debug("Sent SIGTERM to pid %d", (int)m_pid); + break; + default: + logger.debug("kill pid: %d : %s", (int)m_pid, strerror(errno)); + break; + } + + if(isRunning()){ + ret = kill((pid_t)m_pid, SIGKILL); + switch(ret) { + case 0: + logger.debug("Sent SIGKILL to pid %d", (int)m_pid); + break; + default: + logger.debug("kill pid: %d : %s\n", (int)m_pid, strerror(errno)); + break; + } + } + + m_pid = -1; + m_status = STOPPED; +} diff --git a/ndb/src/cw/cpcd/common.cpp b/ndb/src/cw/cpcd/common.cpp new file mode 100644 index 00000000000..731866b22fd --- /dev/null +++ b/ndb/src/cw/cpcd/common.cpp @@ -0,0 +1,158 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "common.hpp" +#include <logger/Logger.hpp> +#include <pwd.h> +#include <sys/types.h> +#include <unistd.h> + +#include <Properties.hpp> +#include <BaseString.hpp> + +int debug = 0; + +Logger logger; + +int +runas(const char * user){ + if(user == 0 || strlen(user) == 0){ + return 0; + } + struct passwd * pw = getpwnam(user); + if(pw == 0){ + logger.error("Can't find user to %s", user); + return -1; + } + uid_t uid = pw->pw_uid; + gid_t gid = pw->pw_gid; + int res = setgid(gid); + if(res != 0){ + logger.error("Can't change group to %s(%d)", user, gid); + return res; + } + + res = setuid(uid); + if(res != 0){ + logger.error("Can't change user to %s(%d)", user, uid); + } + return res; +} + +int +insert(const char * pair, Properties & p){ + BaseString tmp(pair); + + tmp.trim(" \t\n\r"); + + Vector<BaseString> split; + tmp.split(split, ":=", 2); + + if(split.size() != 2) + return -1; + + p.put(split[0].trim().c_str(), split[1].trim().c_str()); + + return 0; +} + +int +insert_file(FILE * f, class Properties& p, bool break_on_empty){ + if(f == 0) + return -1; + + while(!feof(f)){ + char buf[1024]; + fgets(buf, 1024, f); + BaseString tmp = buf; + + if(tmp.length() > 0 && tmp.c_str()[0] == '#') + continue; + + if(insert(tmp.c_str(), p) != 0 && break_on_empty) + break; + } + + return 0; +} + +int +insert_file(const char * filename, class Properties& p){ + FILE * f = fopen(filename, "r"); + int res = insert_file(f, p); + if(f) fclose(f); + return res; +} + +int +parse_config_file(struct getargs args[], int num_arg, const Properties& p){ + Properties::Iterator it(&p); + for(const char * name = it.first(); name != 0; name = it.next()){ + bool found = false; + for(int i = 0; i<num_arg; i++){ + if(strcmp(name, args[i].long_name) != 0) + continue; + + found = true; + + const char * tmp; + p.get(name, &tmp); + + int t = 1; + + switch(args[i].type){ + case arg_integer:{ + int val = atoi(tmp); + if(args[i].value){ + *((int*)args[i].value) = val; + } + } + break; + case arg_string: + if(args[i].value){ + *((const char**)args[i].value) = tmp; + } + break; + case arg_negative_flag: + t = 0; + case arg_flag: + if(args[i].value){ + if(!strcasecmp(tmp, "y") || + !strcasecmp(tmp, "on") || + !strcasecmp(tmp, "true") || + !strcasecmp(tmp, "1")){ + *((int*)args[i].value) = t; + } + if(!strcasecmp(tmp, "n") || + !strcasecmp(tmp, "off") || + !strcasecmp(tmp, "false") || + !strcasecmp(tmp, "0")){ + *((int*)args[i].value) = t; + } + } + t = 1; + break; + case arg_strings: + case arg_double: + case arg_collect: + case arg_counter: + break; + } + } + if(!found) + printf("Unknown parameter: %s\n", name); + } +} diff --git a/ndb/src/cw/cpcd/common.hpp b/ndb/src/cw/cpcd/common.hpp new file mode 100644 index 00000000000..65fcce05f66 --- /dev/null +++ b/ndb/src/cw/cpcd/common.hpp @@ -0,0 +1,35 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef __CPCD_COMMON_HPP_INCLUDED__ +#define __CPCD_COMMON_HPP_INCLUDED__ + +#include <stdio.h> +#include <logger/Logger.hpp> +#include <getarg.h> + +extern int debug; + +extern Logger logger; + +int runas(const char * user); +int insert(const char * pair, class Properties & p); + +int insert_file(const char * filename, class Properties&); +int insert_file(FILE *, class Properties&, bool break_on_empty = false); +int parse_config_file(struct getargs args[], int num_arg, const Properties& p); + +#endif /* ! __CPCD_COMMON_HPP_INCLUDED__ */ diff --git a/ndb/src/cw/cpcd/main.cpp b/ndb/src/cw/cpcd/main.cpp new file mode 100644 index 00000000000..8dd4f2b4608 --- /dev/null +++ b/ndb/src/cw/cpcd/main.cpp @@ -0,0 +1,178 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <sys/types.h> /* Needed for mkdir(2) */ +#include <sys/stat.h> /* Needed for mkdir(2) */ +#include <errno.h> +#include <signal.h> + +#include "CPCD.hpp" +#include "APIService.hpp" +#include <NdbMain.h> +#include <NdbSleep.h> +#include <BaseString.hpp> +#include <getarg.h> +#include <logger/Logger.hpp> +#include <logger/FileLogHandler.hpp> +#include <logger/SysLogHandler.hpp> + +#include "common.hpp" + +static char *work_dir = CPCD_DEFAULT_WORK_DIR; +static int port = CPCD_DEFAULT_TCP_PORT; +static int use_syslog = 0; +static char *logfile = NULL; +static char *config_file = CPCD_DEFAULT_CONFIG_FILE; +static char *user = 0; + +static struct getargs args[] = { + { "work-dir", 'w', arg_string, &work_dir, + "Work directory", "directory" }, + { "port", 'p', arg_integer, &port, + "TCP port to listen on", "port" }, + { "syslog", 'S', arg_flag, &use_syslog, + "Log events to syslog", NULL}, + { "logfile", 'L', arg_string, &logfile, + "File to log events to", "file"}, + { "debug", 'D', arg_flag, &debug, + "Enable debug mode", NULL}, + { "config", 'c', arg_string, &config_file, "Config file", NULL }, + { "user", 'u', arg_string, &user, "Run as user", NULL } +}; + +static const int num_args = sizeof(args) / sizeof(args[0]); + +static CPCD * g_cpcd = 0; +#if 0 +extern "C" static void sig_child(int signo, siginfo_t*, void*); +#endif + +const char *progname = "ndb_cpcd"; + +NDB_MAIN(ndb_cpcd){ + int optind = 0; + + if(getarg(args, num_args, argc, argv, &optind)) { + arg_printusage(args, num_args, progname, ""); + exit(1); + } + + Properties p; + insert_file(config_file, p); + if(parse_config_file(args, num_args, p)){ + ndbout_c("Invalid config file: %s", config_file); + exit(1); + } + + if(getarg(args, num_args, argc, argv, &optind)) { + arg_printusage(args, num_args, progname, ""); + exit(1); + } + + logger.setCategory(progname); + logger.enable(Logger::LL_ALL); + + if(debug) + logger.createConsoleHandler(); + + if(user && runas(user) != 0){ + logger.critical("Unable to change user: %s", user); + _exit(1); + } + + if(logfile != NULL){ + BaseString tmp; + if(logfile[0] != '/') + tmp.append(work_dir); + tmp.append(logfile); + logger.addHandler(new FileLogHandler(tmp.c_str())); + } + + if(use_syslog) + logger.addHandler(new SysLogHandler()); + + logger.info("Starting"); + + CPCD cpcd; + g_cpcd = &cpcd; + + /* XXX This will probably not work on !unix */ + int err = mkdir(work_dir, S_IRWXU | S_IRGRP | S_IROTH); + if(err != 0) { + switch(errno) { + case EEXIST: + break; + default: + fprintf(stderr, "Cannot mkdir %s: %s\n", work_dir, strerror(errno)); + exit(1); + } + } + + if(strlen(work_dir) > 0){ + logger.debug("Changing dir to '%s'", work_dir); + if((err = chdir(work_dir)) != 0){ + fprintf(stderr, "Cannot chdir %s: %s\n", work_dir, strerror(errno)); + exit(1); + } + } + + cpcd.loadProcessList(); + + SocketServer * ss = new SocketServer(); + CPCDAPIService * serv = new CPCDAPIService(cpcd); + if(!ss->setup(serv, port)){ + logger.critical("Cannot setup server: %s", strerror(errno)); + sleep(1); + delete ss; + delete serv; + return 1; + } + + ss->startServer(); + + { + signal(SIGPIPE, SIG_IGN); + signal(SIGCHLD, SIG_IGN); +#if 0 + struct sigaction act; + act.sa_handler = 0; + act.sa_sigaction = sig_child; + sigemptyset(&act.sa_mask); + act.sa_flags = SA_SIGINFO; + sigaction(SIGCHLD, &act, 0); +#endif + } + + logger.debug("Start completed"); + while(true) NdbSleep_MilliSleep(1000); + + delete ss; + return 0; +} + +#if 0 +extern "C" +void +sig_child(int signo, siginfo_t* info, void*){ + printf("signo: %d si_signo: %d si_errno: %d si_code: %d si_pid: %d\n", + signo, + info->si_signo, + info->si_errno, + info->si_code, + info->si_pid); + +} +#endif |