diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-09-11 19:00:49 -0600 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-09-11 19:00:49 -0600 |
commit | 9e4206b990df875e063a07784c77cc39dc394654 (patch) | |
tree | 5310cc673d4a7957df83a048a5f2a712dacc8279 /tools | |
parent | 34925f8d823c700c939f0622b6af85001458a226 (diff) | |
parent | 301cd96334f9f9abad8a2b216562593bdeb8d7cb (diff) | |
download | mariadb-git-9e4206b990df875e063a07784c77cc39dc394654.tar.gz |
Merge work:/home/bk/mysql-4.0
into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
Diffstat (limited to 'tools')
-rw-r--r-- | tools/managertest1.nc | 12 | ||||
-rw-r--r-- | tools/mysqlmanager.c | 974 |
2 files changed, 956 insertions, 30 deletions
diff --git a/tools/managertest1.nc b/tools/managertest1.nc new file mode 100644 index 00000000000..1125e141588 --- /dev/null +++ b/tools/managertest1.nc @@ -0,0 +1,12 @@ +root secret +def_exec server /usr/sbin/mysqld --socket=/tmp/temp.sock --skip-grant --skip-net --datadir=/tmp +set_exec_con server root localhost /tmp/temp.sock +start_exec server 3 +show_exec +stop_exec server 3 +show_exec +start_exec server 3 +show_exec +stop_exec server 3 +show_exec +shutdown diff --git a/tools/mysqlmanager.c b/tools/mysqlmanager.c index 10c3f6065b0..3572dde67d3 100644 --- a/tools/mysqlmanager.c +++ b/tools/mysqlmanager.c @@ -35,60 +35,93 @@ #include <getopt.h> #include <stdarg.h> #include <sys/stat.h> +#include <sys/types.h> +#include <sys/wait.h> #include <unistd.h> #include <errno.h> #include <violite.h> #include <my_pthread.h> -#define MNGD_VERSION "1.0" -#define MNGD_GREETING "MySQL Server Management Daemon v.1.0" +#define MANAGER_VERSION "1.0" +#define MANAGER_GREETING "MySQL Server Management Daemon v." ## \ + MANAGER_VERSION #define LOG_ERR 1 #define LOG_WARN 2 #define LOG_INFO 3 #define LOG_DEBUG 4 -#ifndef MNGD_PORT -#define MNGD_PORT 23546 +#define CHILD_START 1 +#define CHILD_STOP 2 + +#ifndef MANAGER_PORT +#define MANAGER_PORT 23546 +#endif + +#ifndef MANAGER_CONNECT_RETRIES +#define MANAGER_CONNECT_RETRIES 5 #endif -#ifndef MNGD_MAX_CMD_LEN -#define MNGD_MAX_CMD_LEN 16384 +#ifndef MANAGER_MAX_CMD_LEN +#define MANAGER_MAX_CMD_LEN 16384 #endif -#ifndef MNGD_LOG_FILE -#define MNGD_LOG_FILE "/var/log/mysqlmanager.log" +#ifndef MANAGER_LOG_FILE +#define MANAGER_LOG_FILE "/var/log/mysqlmanager.log" #endif -#ifndef MNGD_BACK_LOG -#define MNGD_BACK_LOG 50 +#ifndef MANAGER_BACK_LOG +#define MANAGER_BACK_LOG 50 #endif #ifndef MAX_USER_NAME #define MAX_USER_NAME 16 #endif +#ifndef MANAGER_PW_FILE +#define MANAGER_PW_FILE "/etc/mysqlmanager.passwd" +#endif + +#ifndef MAX_HOST +#define MAX_HOST 128 +#endif + +#ifndef MAX_LAUNCHER_MSG +#define MAX_LAUNCHER_MSG 256 +#endif + /* Variable naming convention - if starts with manager_, either is set directly by the user, or used closely in ocnjunction with a variable set by the user */ -uint manager_port = MNGD_PORT; +#if defined(__i386__) && defined(HAVE_LINUXTHREADS) +#define DO_STACKTRACE 1 +#endif + +uint manager_port = MANAGER_PORT; FILE* errfp; -const char* manager_log_file = MNGD_LOG_FILE; -pthread_mutex_t lock_log, lock_shutdown; +const char* manager_log_file = MANAGER_LOG_FILE; +pthread_mutex_t lock_log,lock_shutdown,lock_exec_hash,lock_launch_thd; +pthread_cond_t cond_launch_thd; +pthread_t loop_th,launch_msg_th; int manager_sock = -1; +uchar* stack_bottom=0; struct sockaddr_in manager_addr; ulong manager_bind_addr = INADDR_ANY; -int manager_back_log = MNGD_BACK_LOG; +int manager_back_log = MANAGER_BACK_LOG; int in_shutdown = 0, shutdown_requested=0; -const char* manager_greeting = MNGD_GREETING; -uint manager_max_cmd_len = MNGD_MAX_CMD_LEN; +int manager_connect_retries=MANAGER_CONNECT_RETRIES; +const char* manager_greeting = MANAGER_GREETING; +uint manager_max_cmd_len = MANAGER_MAX_CMD_LEN; +const char* manager_pw_file=MANAGER_PW_FILE; +int one_thread = 0; /* for debugging */ /* messages */ #define MAX_CLIENT_MSG_LEN 256 #define NET_BLOCK 2048 +#define MD5_LEN 32 #define ESCAPE_CHAR '\\' #define EOL_CHAR '\n' @@ -98,7 +131,6 @@ uint manager_max_cmd_len = MNGD_MAX_CMD_LEN; #define MSG_CLIENT_ERR 450 #define MSG_INTERNAL_ERR 500 - /* access flags */ #define PRIV_SHUTDOWN 1 @@ -112,11 +144,52 @@ struct manager_thd int fatal,finished; }; -struct manager_thd* manager_thd_new(Vio* vio); -void manager_thd_free(struct manager_thd* thd); +struct manager_user +{ + char user[MAX_USER_NAME]; + char md5_pass[MD5_LEN]; + int user_len; + const char* error; +}; + +HASH exec_hash,user_hash; +struct manager_exec* cur_launch_exec=0; + +static struct manager_thd* manager_thd_new(Vio* vio); + +static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end); +static void manager_exec_print(Vio* vio,struct manager_exec* e); +static void manager_thd_free(struct manager_thd* thd); +static void manager_exec_free(void* e); +static void manager_exec_connect(struct manager_exec* e); +static int manager_exec_launch(struct manager_exec* e); +static struct manager_exec* manager_exec_by_pid(pid_t pid); + +static struct manager_user* manager_user_new(char* buf); +static void manager_user_free(void* u); + +static char* arg_strmov(char* dest, const char* src, int n); +static byte* get_exec_key(const byte* e, uint* len, + my_bool __attribute__((unused)) t); +static byte* get_user_key(const byte* u, uint* len, + my_bool __attribute__((unused)) t); +static uint tokenize_args(char* arg_start,char** arg_end); +static void init_arg_array(char* arg_str,char** args,uint arg_count); typedef int (*manager_cmd_handler)(struct manager_thd*,char*,char*); +static void handle_child(int __attribute__((unused)) sig); +static void handle_sigpipe(int __attribute__((unused)) sig); + +/* exec() in a threaded application is full of problems + to solve this, we fork off a launcher at the very start + and communicate with it through a pipe +*/ +static void fork_launcher(); +static void run_launcher_loop(); +int to_launcher_pipe[2],from_launcher_pipe[2]; +pid_t launcher_pid; +int in_segfault=0; struct manager_cmd { @@ -126,6 +199,29 @@ struct manager_cmd int len; }; +struct manager_exec +{ + char* ident; + int ident_len; + const char* error; + char* bin_path; + char** args; + char con_user[16]; + char con_pass[16]; + int con_port; + pid_t pid; + int exit_code; + pthread_mutex_t lock; + pthread_cond_t cond; + pthread_t th; + char con_sock[FN_REFLEN]; + char con_host[MAX_HOST]; + MYSQL mysql; + char* data_buf; + int req_len; + int num_args; +}; + #define HANDLE_DECL(com) static int handle_ ## com (struct manager_thd* thd,\ char* args_start,char* args_end) @@ -138,12 +234,25 @@ HANDLE_NOARG_DECL(ping); HANDLE_NOARG_DECL(quit); HANDLE_NOARG_DECL(help); HANDLE_NOARG_DECL(shutdown); +HANDLE_DECL(def_exec); +HANDLE_DECL(start_exec); +HANDLE_DECL(stop_exec); +HANDLE_DECL(set_exec_con); +HANDLE_NOARG_DECL(show_exec); struct manager_cmd commands[] = { {"ping", "Check if this server is alive", handle_ping,4}, {"quit", "Finish session", handle_quit,4}, {"shutdown", "Shutdown this server", handle_shutdown,8}, + {"def_exec", "Define executable entry", handle_def_exec,8}, + {"start_exec", "Launch process defined by executable entry", + handle_start_exec,10}, + {"stop_exec", "Stop process defined by executable entry", + handle_stop_exec,9}, + {"set_exec_con", "Set connection parameters for executable entry", + handle_set_exec_con,12}, + {"show_exec","Show defined executable entries",handle_show_exec,9}, {"help", "Print this message", handle_help,4}, {0,0,0,0} }; @@ -158,6 +267,9 @@ struct option long_options[] = {"tcp-backlog", required_argument, 0, 'B'}, {"greeting", required_argument, 0, 'g'}, {"max-command-len",required_argument,0,'m'}, + {"one-thread",no_argument,0,'d'}, + {"connect-retries",required_argument,0,'C'}, + {"password-file",required_argument,0,'p'}, {"version", no_argument, 0, 'V'}, {0, 0, 0, 0} }; @@ -175,8 +287,70 @@ static char* read_line(struct manager_thd* thd); /* returns pointer to end of line */ static pthread_handler_decl(process_connection,arg); +static pthread_handler_decl(process_launcher_messages, + __attribute__((unused)) arg); static int exec_line(struct manager_thd* thd,char* buf,char* buf_end); +#ifdef DO_STACKTRACE +void print_stacktrace(); +#endif + +static void handle_segfault(int sig) +{ + if (in_segfault) + exit(1); + in_segfault=1; + fprintf(errfp,"Got fatal signal %d\n",sig); +#ifdef DO_STACKTRACE + print_stacktrace(); +#endif + exit(1); +} + +static void handle_sigpipe(int __attribute__((unused)) sig) +{ + signal(SIGPIPE,handle_sigpipe); +} + +#ifdef DO_STACKTRACE + +#define MAX_DEPTH 25 +#define SIGRETURN_FRAME_COUNT 1 + +void print_stacktrace() +{ + uchar** fp; + int i; + LINT_INIT(fp); + fprintf(errfp,"Fatal errror, stacktrace follows:\n"); +#ifdef __i386__ + __asm__ __volatile__("movl %%ebp,%0" :"=r"(fp) :"r"(fp)); +#endif + if (!fp) + { + fprintf(errfp,"frame points is NULL, cannot trace stack\n"); + return; + } + for(i=0;i<MAX_DEPTH && fp<(uchar**)stack_bottom;i++) + { +#ifdef __i386__ + uchar** new_fp = (uchar**)*fp; + fprintf(errfp, "%p\n", i == SIGRETURN_FRAME_COUNT ? + *(fp+17) : *(fp+1)); +#endif /* __386__ */ + if (new_fp <= fp ) + { + fprintf(errfp, "New value of fp=%p failed sanity check,\ + terminating stack trace!\n", new_fp); + return; + } + fp = new_fp; + } + fprintf(errfp,"Stack trace successful\n"); + fflush(errfp); +} +#endif + static int exec_line(struct manager_thd* thd,char* buf,char* buf_end) { char* p=buf; @@ -190,6 +364,7 @@ static int exec_line(struct manager_thd* thd,char* buf,char* buf_end) commands"); return 1; } + for (;p<buf_end && isspace(*p);p++); return cmd->handler_func(thd,p,buf_end); } @@ -235,9 +410,337 @@ HANDLE_NOARG_DECL(shutdown) client_msg(thd->vio,MSG_OK,"Shutdown started, goodbye"); thd->finished=1; shutdown_requested = 1; + if (!one_thread) + { + kill(launcher_pid,SIGTERM); + pthread_kill(loop_th,SIGTERM); + } return 0; } +HANDLE_DECL(set_exec_con) +{ + int num_args; + const char* error=0; + struct manager_exec* e; + char* arg_p; + if ((num_args=tokenize_args(args_start,&args_end))<2) + { + error="Too few arguments"; + goto err; + } + arg_p=args_start; + pthread_mutex_lock(&lock_exec_hash); + if (!(e=(struct manager_exec*)hash_search(&exec_hash,arg_p, + strlen(arg_p)))) + { + pthread_mutex_unlock(&lock_exec_hash); + error="Exec definition entry does not exist"; + goto err; + } + arg_p+=strlen(arg_p)+1; + arg_p+=(strnmov(e->con_user,arg_p,sizeof(e->con_user))-e->con_user)+1; + if (num_args >= 3) + { + arg_p+=(strnmov(e->con_host,arg_p,sizeof(e->con_host))-e->con_host)+1; + if (num_args == 4) + { + if (!(e->con_port=atoi(arg_p))) + strnmov(e->con_sock,arg_p,sizeof(e->con_sock)); + else + e->con_sock[0]=0; + } + else + { + pthread_mutex_unlock(&lock_exec_hash); + error="Too many arguments"; + goto err; + } + } + pthread_mutex_unlock(&lock_exec_hash); + client_msg(thd->vio,MSG_OK,"Entry updated"); + return 0; +err: + client_msg(thd->vio,MSG_CLIENT_ERR,error); + return 1; +} + +HANDLE_DECL(start_exec) +{ + int num_args; + struct manager_exec* e; + int ident_len; + const char* error=0; + struct timespec t; + if ((num_args=tokenize_args(args_start,&args_end))<1) + { + error="Too few arguments"; + goto err; + } + ident_len=strlen(args_start); + pthread_mutex_lock(&lock_exec_hash); + if (!(e=(struct manager_exec*)hash_search(&exec_hash,args_start, + ident_len))) + { + pthread_mutex_unlock(&lock_exec_hash); + error="Exec definition entry does not exist"; + goto err; + } + pthread_mutex_unlock(&lock_exec_hash); + manager_exec_launch(e); + if ((error=e->error)) + goto err; + pthread_mutex_lock(&e->lock); + t.tv_sec=time(0)+atoi(args_start+ident_len+1); + t.tv_nsec=0; + pthread_cond_timedwait(&e->cond,&e->lock,&t); + if (!e->pid) + { + pthread_mutex_unlock(&e->lock); + error="Process failed to start withing alotted time"; + goto err; + } + mysql_close(&e->mysql); + manager_exec_connect(e); + error=e->error; + pthread_mutex_unlock(&e->lock); + if (error) + goto err; + client_msg(thd->vio,MSG_OK,"'%s' started",e->ident); + return 0; +err: + client_msg(thd->vio,MSG_CLIENT_ERR,error); + return 1; +} + +HANDLE_DECL(stop_exec) +{ + int num_args; + struct timespec abstime; + struct manager_exec* e; + int ident_len; + const char* error=0; + if ((num_args=tokenize_args(args_start,&args_end))<2) + { + error="Too few arguments"; + goto err; + } + ident_len=strlen(args_start); + abstime.tv_sec=time(0)+atoi(args_start+1+ident_len); + abstime.tv_nsec=0; + pthread_mutex_lock(&lock_exec_hash); + if (!(e=(struct manager_exec*)hash_search(&exec_hash,args_start, + ident_len))) + { + pthread_mutex_unlock(&lock_exec_hash); + error="Exec definition entry does not exist"; + goto err; + } + pthread_mutex_unlock(&lock_exec_hash); + pthread_mutex_lock(&e->lock); + e->th=pthread_self(); + if (!e->pid) + { + e->th=0; + pthread_mutex_unlock(&e->lock); + error="Process not running"; + goto err; + } + if (mysql_shutdown(&e->mysql)) + { + e->th=0; + pthread_mutex_unlock(&e->lock); + error="Could not send shutdown command"; + goto err; + } + pthread_cond_timedwait(&e->cond,&e->lock,&abstime); + if (e->pid) + error="Process failed to terminate within alotted time"; + e->th=0; + pthread_mutex_unlock(&e->lock); + if (!error) + { + client_msg(thd->vio,MSG_OK,"'%s' terminated",e->ident); + return 0; + } +err: + client_msg(thd->vio,MSG_CLIENT_ERR,error); + return 1; +} + +HANDLE_DECL(def_exec) +{ + struct manager_exec* e=0,*old_e; + const char* error=0; + if (!(e=manager_exec_new(args_start,args_end))) + { + error="Out of memory"; + goto err; + } + if (e->error) + { + error=e->error; + goto err; + } + pthread_mutex_lock(&lock_exec_hash); + if ((old_e=(struct manager_exec*)hash_search(&exec_hash,(byte*)e->ident, + e->ident_len))) + { + pthread_mutex_unlock(&lock_exec_hash); + error="Exec definition already exists"; + goto err; + } + hash_insert(&exec_hash,(byte*)e); + pthread_mutex_unlock(&lock_exec_hash); + client_msg(thd->vio,MSG_OK,"Exec definition created"); + return 0; +err: + client_msg(thd->vio,MSG_CLIENT_ERR,error); + if (e) + manager_exec_free(e); + return 1; +} + +HANDLE_NOARG_DECL(show_exec) +{ + uint i; + client_msg_pre(thd->vio,MSG_INFO,"Exec_def\tPid\tExit_status\tCon_info\ +\tArguments"); + pthread_mutex_lock(&lock_exec_hash); + for (i=0;i<exec_hash.records;i++) + { + struct manager_exec* e=(struct manager_exec*)hash_element(&exec_hash,i); + manager_exec_print(thd->vio,e); + } + pthread_mutex_unlock(&lock_exec_hash); + client_msg(thd->vio,MSG_INFO,"End"); + return 0; +} + +static struct manager_exec* manager_exec_by_pid(pid_t pid) +{ + struct manager_exec* e; + uint i; + pthread_mutex_lock(&lock_exec_hash); + for (i=0;i<exec_hash.records;i++) + { + e=(struct manager_exec*)hash_element(&exec_hash,i); + if (e->pid==pid) + { + pthread_mutex_unlock(&lock_exec_hash); + return e; + } + } + pthread_mutex_unlock(&lock_exec_hash); + return 0; +} + +static void manager_exec_connect(struct manager_exec* e) +{ + int i; + for (i=0;i<manager_connect_retries;i++) + { + if (mysql_real_connect(&e->mysql,e->con_host,e->con_user,e->con_pass,0, + e->con_port,e->con_sock,0)) + return; + sleep(1); + } + e->error="Could not connect to MySQL server withing the number of tries"; +} + +static int manager_exec_launch(struct manager_exec* e) +{ + if (one_thread) + { + pid_t tmp_pid; + switch ((tmp_pid=fork())) + { + case -1: + e->error="Cannot fork"; + return 1; + case 0: + { + int err_code; + close(manager_sock); + err_code=execv(e->bin_path,e->args); + exit(err_code); + } + default: + e->pid=tmp_pid; + manager_exec_connect(e); + return 0; + } + } + else + { + if (write(to_launcher_pipe[1],&e->req_len,sizeof(int))!=sizeof(int) || + write(to_launcher_pipe[1],&e->num_args,sizeof(int))!=sizeof(int) || + write(to_launcher_pipe[1],e->data_buf,e->req_len)!=e->req_len) + { + e->error="Failed write request to launcher"; + return 1; + } + } + return 0; +} + +static char* arg_strmov(char* dest, const char* src, int n) +{ + char* dest_end = dest+n-1; + char c; + for (;dest<dest_end && (c=*src++);) + { + if (c=='%') + *dest++='%'; + *dest++=c; + } + return dest; +} + +static void manager_exec_print(Vio* vio,struct manager_exec* e) +{ + char buf[MAX_CLIENT_MSG_LEN]; + char* p=buf,*buf_end=buf+sizeof(buf)-1; + char** args=e->args; + + p=arg_strmov(p,e->ident,(int)(buf_end-p)-1); + *p++='\t'; + if (p>buf_end-15) + goto end; + p=int10_to_str(e->pid,p,10); + *p++='\t'; + p=int10_to_str(e->exit_code,p,10); + *p++='\t'; + + p=arg_strmov(p,e->con_user,(int)(buf_end-p)-1); + *p++='@'; + if (p==buf_end) + goto end; + p=arg_strmov(p,e->con_host,(int)(buf_end-p)-11); + *p++=':'; + if (p==buf_end-10) + goto end; + if (e->con_sock[0]) + { + p=arg_strmov(p,e->con_sock,(int)(buf_end-p)-1); + } + else + { + p=int10_to_str(e->con_port,p,10); + } + *p++='\t'; + + for(;p<buf_end && *args;args++) + { + p=arg_strmov(p,*args,(int)(buf_end-p)-1); + *p++='\t'; + } +end: + *p=0; + client_msg_pre(vio,MSG_INFO,buf); + return; +} + static int authenticate(struct manager_thd* thd) { char* buf_end; @@ -323,8 +826,68 @@ LOG_MSG_FUNC(info,INFO) #ifndef DBUG_OFF LOG_MSG_FUNC(debug,DEBUG) +#else +inline void log_debug(char* __attribute__((unused)) fmt,...) {} #endif +static pthread_handler_decl(process_launcher_messages, + __attribute__((unused)) arg) +{ + my_thread_init(); + for (;!in_shutdown;) + { + pid_t pid; + struct manager_exec* e; + char buf[MAX_LAUNCHER_MSG]; + if (read(from_launcher_pipe[0],buf,MAX_LAUNCHER_MSG)<0) + { + log_err("error reading launcher message"); + sleep(1); + continue; + } + switch (buf[0]) + { + case CHILD_START: + { + char* ident=buf+1; + int ident_len=strlen(ident); + memcpy(&pid,ident+ident_len+1,sizeof(pid)); + log_debug("process message - ident=%s,ident_len=%d,pid=%d",ident, + ident_len,pid); + pthread_mutex_lock(&lock_exec_hash); + log_debug("hash has %d records",exec_hash.records); + e=(struct manager_exec*)hash_search(&exec_hash,ident,ident_len); + if (e) + { + pthread_mutex_lock(&e->lock); + e->pid=pid; + pthread_cond_broadcast(&e->cond); + pthread_mutex_unlock(&e->lock); + } + pthread_mutex_unlock(&lock_exec_hash); + log_debug("unlocked mutex"); + break; + } + case CHILD_STOP: + memcpy(&pid,buf+1,sizeof(pid)); + e=manager_exec_by_pid(pid); + if (e) + { + pthread_mutex_lock(&e->lock); + e->pid=0; + memcpy(&e->exit_code,buf+1+sizeof(pid),sizeof(int)); + pthread_cond_broadcast(&e->cond); + pthread_mutex_unlock(&e->lock); + } + break; + default: + log_err("Got invalid launcher message"); + break; + } + } + return 0; +} + static pthread_handler_decl(process_connection,arg) { struct manager_thd* thd = (struct manager_thd*)arg; @@ -422,6 +985,23 @@ static char* read_line(struct manager_thd* thd) return 0; } +static void handle_child(int __attribute__((unused)) sig) +{ + pid_t child; + int child_status; + + for(;(child=waitpid(-1,&child_status,WNOHANG))>0;) + { + char msg_buf[1+sizeof(int)+sizeof(int)]; + msg_buf[0]=CHILD_STOP; + memcpy(msg_buf+1,&child,sizeof(int)); + memcpy(msg_buf+1+sizeof(int),&child_status,sizeof(int)); + if (write(from_launcher_pipe[1],msg_buf,sizeof(msg_buf))!=sizeof(msg_buf)) + log_err("launcher: error writing message on child exit"); + } + signal(SIGCHLD,handle_child); +} + struct manager_thd* manager_thd_new(Vio* vio) { struct manager_thd* tmp; @@ -439,7 +1019,7 @@ struct manager_thd* manager_thd_new(Vio* vio) return tmp; } -void manager_thd_free(struct manager_thd* thd) +static void manager_thd_free(struct manager_thd* thd) { if (thd->vio) vio_close(thd->vio); @@ -462,15 +1042,16 @@ static void clean_up() log_info("Ended"); if (errfp != stderr) fclose(errfp); + hash_free(&exec_hash); } static void print_version(void) { - printf("%s Ver %s Distrib %s, for %s (%s)\n",my_progname,MNGD_VERSION, + printf("%s Ver %s Distrib %s, for %s (%s)\n",my_progname,MANAGER_VERSION, MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE); } -void usage() +static void usage() { print_version(); printf("MySQL AB, by Sasha\n"); @@ -490,13 +1071,16 @@ void usage() -B, --tcp-backlog==... Size of TCP/IP listen queue.\n\ -g, --greeting= Set greeting on connect \n\ -m, --max-command-len Maximum command length \n\ + -d, --one-thread Use one thread ( for debugging) \n\ + -C, --connect-retries Number of attempts to establish MySQL connection \n\ + -m, --max-command-len Maximum command length \n\ -V, --version Output version information and exit.\n\n"); } -int parse_args(int argc, char **argv) +static int parse_args(int argc, char **argv) { int c, option_index = 0; - while ((c=getopt_long(argc,argv,"P:?#:Vl:b:B:g:m:", + while ((c=getopt_long(argc,argv,"P:?#:Vl:b:B:g:m:dC:p:", long_options,&option_index)) != EOF) { switch (c) @@ -504,6 +1088,15 @@ int parse_args(int argc, char **argv) case '#': DBUG_PUSH(optarg ? optarg : "d:t:O,/tmp/mysqlmgrd.trace"); break; + case 'd': + one_thread=1; + break; + case 'p': + manager_pw_file=MANAGER_PW_FILE; + break; + case 'C': + manager_connect_retries=atoi(optarg); + break; case 'P': manager_port=atoi(optarg); break; @@ -535,7 +1128,7 @@ int parse_args(int argc, char **argv) return 0; } -int init_server() +static int init_server() { int arg=1; log_info("Started"); @@ -554,7 +1147,7 @@ int init_server() return 0; } -int run_server_loop() +static int run_server_loop() { pthread_t th; struct manager_thd *thd; @@ -598,7 +1191,13 @@ int run_server_loop() } if (shutdown_requested) break; - if (pthread_create(&th,0,process_connection,(void*)thd)) + if (one_thread) + { + process_connection((void*)thd); + manager_thd_free(thd); + continue; + } + else if (pthread_create(&th,0,process_connection,(void*)thd)) { client_msg(vio,MSG_INTERNAL_ERR,"Could not create thread, errno=%d", errno); @@ -609,7 +1208,7 @@ int run_server_loop() return 0; } -FILE* open_log_stream() +static FILE* open_log_stream() { FILE* fp; if (!(fp=fopen(manager_log_file,"a"))) @@ -617,7 +1216,297 @@ FILE* open_log_stream() return fp; } -int daemonize() +static byte* get_user_key(const byte* u, uint* len, + my_bool __attribute__((unused)) t) +{ + register const char* key; + key = ((struct manager_user*)u)->user; + *len = ((struct manager_user*)u)->user_len; + return (byte*)key; +} + +static byte* get_exec_key(const byte* e, uint* len, + my_bool __attribute__((unused)) t) +{ + register const char* key; + key = ((struct manager_exec*)e)->ident; + *len = ((struct manager_exec*)e)->ident_len; + return (byte*)key; +} + +static void init_arg_array(char* arg_str,char** args,uint arg_count) +{ + char* p = arg_str; + for (;arg_count>0;arg_count--) + { + *args++=p; + p += strlen(p)+1; + } + *args=0; +} + +static uint tokenize_args(char* arg_start,char** arg_end) +{ + char* p, *p_write,*p_end; + uint arg_count=0; + int quoted=0,escaped=0,last_space=0; + p_end=*arg_end; + p_write=p=arg_start; + for(;p<p_end;p++) + { + char c = *p; + switch (c) + { + case ' ': + case '\r': + case '\n': + if (!quoted) + { + if (!last_space) + { + *p_write++=0; + arg_count++; + last_space=1; + } + } + else + *p_write++=c; + escaped=0; + break; + case '"': + if (!escaped) + quoted=!quoted; + else + *p_write++=c; + last_space=0; + escaped=0; + break; + case '\\': + if (!escaped) + escaped=1; + else + { + *p_write++=c; + escaped=0; + } + last_space=0; + break; + default: + escaped=last_space=0; + *p_write++=c; + break; + } + } + if (!last_space && p_write>arg_start) + arg_count++; + *p_write=0; + *arg_end=p_write; + log_debug("arg_count=%d,arg_start='%s'",arg_count,arg_start); + return arg_count; +} + + +static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end) +{ + struct manager_exec* tmp; + char* first_arg; + uint arg_len,num_args; + num_args=tokenize_args(arg_start,&arg_end); + arg_len=(uint)(arg_end-arg_start)+1; /* include \0 terminator*/ + if (!(tmp=(struct manager_exec*)my_malloc(sizeof(*tmp)+arg_len+ + sizeof(char*)*num_args,MYF(0)))) + return 0; + if (num_args<2) + { + tmp->error="Too few arguments"; + return tmp; + } + tmp->data_buf=(char*)tmp+sizeof(*tmp); + memcpy(tmp->data_buf,arg_start,arg_len); + tmp->req_len=arg_len; + tmp->args=(char**)(tmp->data_buf+arg_len); + tmp->num_args=num_args; + tmp->ident=tmp->data_buf; + tmp->ident_len=strlen(tmp->ident); + first_arg=tmp->ident+tmp->ident_len+1; + init_arg_array(first_arg,tmp->args,num_args-1); + strmov(tmp->con_user,"root"); + tmp->con_pass[0]=0; + tmp->con_sock[0]=0; + tmp->con_port=MYSQL_PORT; + memcpy(tmp->con_host,"localhost",10); + tmp->bin_path=tmp->args[0]; + tmp->pid=0; + tmp->exit_code=0; + tmp->th=0; + pthread_mutex_init(&tmp->lock,0); + pthread_cond_init(&tmp->cond,0); + mysql_init(&tmp->mysql); + tmp->error=0; + return tmp; +} + +static void manager_exec_free(void* e) +{ + mysql_close(&((struct manager_exec*)e)->mysql); + my_free(e,MYF(0)); +} + +static struct manager_user* manager_user_new(char* buf) +{ + struct manager_user* tmp; + char* p,*user_end; + char c; + if (!(tmp=(struct manager_user*)my_malloc(sizeof(*tmp),MYF(0)))) + return 0; + p=tmp->user; + user_end=p+MAX_USER_NAME-1; + for (;(c=*buf) && p<user_end;buf++) + { + if (c == ':') + { + *p=0; + tmp->user_len=p-tmp->user; + buf++; + break; + } + else + *p++=c; + } + if (!c) + tmp->error="Missing ':'"; + if (p == user_end) + tmp->error="Username too long"; + if (tmp->error) + return tmp; + if (strlen(buf) < MD5_LEN) + { + tmp->error="Invalid MD5 sum, too short"; + return tmp; + } + memcpy(tmp->md5_pass,buf,MD5_LEN); + tmp->error=0; + return tmp; +} + +static void manager_user_free(void* u) +{ + my_free((gptr)u,MYF(0)); +} + +static void init_user_hash() +{ + FILE* f; + char buf[80]; + int line_num=1; + if (hash_init(&user_hash,1024,0,0,get_user_key,manager_user_free,MYF(0))) + die("Could not initialize user hash"); + if (!(f=fopen(manager_pw_file,"r"))) + die("Could not open password file '%s'", manager_pw_file); + for (;;line_num++) + { + struct manager_user* u; + if (!fgets(buf,sizeof(buf),f) || feof(f)) + break; + if (buf[0] == '#') + continue; + if (!(u=manager_user_new(buf))) + die("Out of memory while reading user line"); + if (u->error) + { + die("Error on line %d of '%s': %s",line_num,manager_pw_file, u->error); + } + else + { + hash_insert(&user_hash,(gptr)u); + } + } + fclose(f); +} + +static void init_globals() +{ + if (hash_init(&exec_hash,1024,0,0,get_exec_key,manager_exec_free,MYF(0))) + die("Exec hash initialization failed"); + if (!one_thread) + { + fork_launcher(); + if (pthread_create(&launch_msg_th,0,process_launcher_messages,0)) + die("Could not start launcher message handler thread"); + } + init_user_hash(); + loop_th=pthread_self(); + signal(SIGPIPE,handle_sigpipe); +} + +static void run_launcher_loop() +{ + for (;;) + { + int req_len,ident_len,num_args; + char* request_buf=0; + pid_t pid; + char* exec_path,*ident; + char** args=0; + + if (read(to_launcher_pipe[0],&req_len,sizeof(int))!=sizeof(int) || + read(to_launcher_pipe[0],&num_args,sizeof(int))!=sizeof(int) || + !(request_buf=(char*)my_malloc(req_len+sizeof(pid)+2,MYF(0))) || + !(args=(char**)my_malloc(num_args*sizeof(char*),MYF(0))) || + read(to_launcher_pipe[0],request_buf+1,req_len)!=req_len) + { + log_err("launcher: Error reading request"); + my_free((gptr)request_buf,MYF(MY_ALLOW_ZERO_PTR)); + my_free((gptr)args,MYF(MY_ALLOW_ZERO_PTR)); + sleep(1); + continue; + } + ident=request_buf+1; + ident_len=strlen(ident); + exec_path=ident+ident_len+1; + log_debug("num_args=%d,req_len=%d,ident=%s,ident_len=%d,exec_path=%s", + num_args, + req_len,ident,ident_len,exec_path); + init_arg_array(exec_path,args,num_args-1); + + switch ((pid=fork())) + { + case -1: + log_err("launcher: cannot fork"); + sleep(1); + break; + case 0: + if (execv(exec_path,args)) + log_err("launcher: cannot exec %s",exec_path); + exit(1); + default: + request_buf[0]=CHILD_START; + memcpy(request_buf+ident_len+2,&pid,sizeof(pid)); + if (write(from_launcher_pipe[1],request_buf,ident_len+2+sizeof(pid))<0) + log_err("launcher: error sending launch status report"); + break; + } + my_free((gptr)request_buf,MYF(0)); + my_free((gptr)args,MYF(0)); + } +} + +static void fork_launcher() +{ + if (pipe(to_launcher_pipe) || pipe(from_launcher_pipe)) + die("Could not create launcher pipes"); + switch ((launcher_pid=fork())) + { + case 0: + signal(SIGCHLD,handle_child); + run_launcher_loop(); + exit(0); + case -1: die("Could not fork the launcher"); + default: return; + } +} + +static int daemonize() { switch (fork()) { @@ -625,8 +1514,10 @@ int daemonize() die("Cannot fork"); case 0: errfp = open_log_stream(); + init_globals(); close(0); close(1); + close(2); init_server(); run_server_loop(); clean_up(); @@ -639,13 +1530,36 @@ int daemonize() int main(int argc, char** argv) { + char c; + stack_bottom=&c; MY_INIT(argv[0]); errfp = stderr; parse_args(argc,argv); pthread_mutex_init(&lock_log,0); pthread_mutex_init(&lock_shutdown,0); - return daemonize(); + pthread_mutex_init(&lock_exec_hash,0); + pthread_mutex_init(&lock_launch_thd,0); + pthread_cond_init(&cond_launch_thd,0); +#ifdef DO_STACKTRACE + signal(SIGSEGV,handle_segfault); +#endif + if (one_thread) + { + init_globals(); + init_server(); + run_server_loop(); + clean_up(); + return 0; + } + else + return daemonize(); } + + + + + + |