diff options
-rw-r--r-- | server-tools/instance-manager/commands.cc | 1 | ||||
-rw-r--r-- | server-tools/instance-manager/guardian.cc | 63 | ||||
-rw-r--r-- | server-tools/instance-manager/guardian.h | 43 | ||||
-rw-r--r-- | server-tools/instance-manager/instance.cc | 60 | ||||
-rw-r--r-- | server-tools/instance-manager/listener.cc | 128 | ||||
-rw-r--r-- | server-tools/instance-manager/listener.h | 42 | ||||
-rw-r--r-- | server-tools/instance-manager/manager.cc | 74 | ||||
-rw-r--r-- | server-tools/instance-manager/mysql_connection.cc | 117 | ||||
-rw-r--r-- | server-tools/instance-manager/mysql_connection.h | 59 | ||||
-rw-r--r-- | server-tools/instance-manager/priv.cc | 38 | ||||
-rw-r--r-- | server-tools/instance-manager/priv.h | 4 | ||||
-rw-r--r-- | server-tools/instance-manager/thread_registry.cc | 96 | ||||
-rw-r--r-- | server-tools/instance-manager/thread_registry.h | 31 |
13 files changed, 336 insertions, 420 deletions
diff --git a/server-tools/instance-manager/commands.cc b/server-tools/instance-manager/commands.cc index 50ab15f1ed3..1ecd4b38fcd 100644 --- a/server-tools/instance-manager/commands.cc +++ b/server-tools/instance-manager/commands.cc @@ -29,7 +29,6 @@ #include "guardian.h" #include "instance_map.h" #include "log.h" -#include "manager.h" #include "messages.h" #include "mysqld_error.h" #include "mysql_manager_error.h" diff --git a/server-tools/instance-manager/guardian.cc b/server-tools/instance-manager/guardian.cc index 71ad0760b06..1cb9aed559b 100644 --- a/server-tools/instance-manager/guardian.cc +++ b/server-tools/instance-manager/guardian.cc @@ -20,7 +20,6 @@ #endif #include "guardian.h" - #include <string.h> #include <sys/types.h> #include <signal.h> @@ -30,15 +29,6 @@ #include "log.h" #include "mysql_manager_error.h" - -pthread_handler_t guardian_thread_func(void *arg) -{ - Guardian *guardian= (Guardian *) arg; - guardian->run(); - return 0; -} - - const char * Guardian::get_instance_state_name(enum_instance_state state) { @@ -68,18 +58,19 @@ Guardian::get_instance_state_name(enum_instance_state state) return NULL; /* just to ignore compiler warning. */ } +/* {{{ Constructor & destructor. */ -Guardian::Guardian(Thread_registry &thread_registry_arg, - Instance_map *instance_map_arg, - uint monitoring_interval_arg) : - Guardian_args(thread_registry_arg, instance_map_arg, - monitoring_interval_arg), - thread_info(pthread_self(), TRUE), guarded_instances(0) +Guardian::Guardian(Thread_registry *thread_registry_arg, + Instance_map *instance_map_arg, + uint monitoring_interval_arg) + :monitoring_interval(monitoring_interval_arg), + shutdown_requested(FALSE), + stopped(FALSE), + thread_registry(thread_registry_arg), + instance_map(instance_map_arg) { pthread_mutex_init(&LOCK_guardian, 0); pthread_cond_init(&COND_guardian, 0); - shutdown_requested= FALSE; - stopped= FALSE; init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0); } @@ -94,6 +85,8 @@ Guardian::~Guardian() pthread_cond_destroy(&COND_guardian); } +/* }}} */ + void Guardian::request_shutdown() { @@ -106,9 +99,9 @@ void Guardian::request_shutdown() void Guardian::process_instance(Instance *instance, - GUARD_NODE *current_node, - LIST **guarded_instances, - LIST *node) + GUARD_NODE *current_node, + LIST **guarded_instances, + LIST *node) { uint waitchild= (uint) Instance::DEFAULT_SHUTDOWN_DELAY; /* The amount of times, Guardian attempts to restart an instance */ @@ -117,7 +110,7 @@ void Guardian::process_instance(Instance *instance, if (current_node->state == STOPPING) { - /* this brach is executed during shutdown */ + /* this branch is executed during shutdown */ if (instance->options.shutdown_delay) { /* @@ -235,7 +228,7 @@ void Guardian::process_instance(Instance *instance, /* Run guardian thread - SYNOPSYS + SYNOPSIS run() DESCRIPTION @@ -252,9 +245,8 @@ void Guardian::run() log_info("Guardian: started."); - thread_registry.register_thread(&thread_info); + thread_registry->register_thread(&thread_info); - my_thread_init(); pthread_mutex_lock(&LOCK_guardian); /* loop, until all instances were shut down at the end */ @@ -275,8 +267,8 @@ void Guardian::run() /* check the loop predicate before sleeping */ if (!(shutdown_requested && (!(guarded_instances)))) - thread_registry.cond_timedwait(&thread_info, &COND_guardian, - &LOCK_guardian, &timeout); + thread_registry->cond_timedwait(&thread_info, &COND_guardian, + &LOCK_guardian, &timeout); } log_info("Guardian: stopped."); @@ -284,9 +276,8 @@ void Guardian::run() stopped= TRUE; pthread_mutex_unlock(&LOCK_guardian); /* now, when the Guardian is stopped we can stop the IM */ - thread_registry.unregister_thread(&thread_info); - thread_registry.request_shutdown(); - my_thread_end(); + thread_registry->unregister_thread(&thread_info); + thread_registry->request_shutdown(); log_info("Guardian: finished."); } @@ -306,7 +297,7 @@ int Guardian::is_stopped() Initialize the list of guarded instances: loop through the Instance_map and add all of the instances, which don't have 'nonguarded' option specified. - SYNOPSYS + SYNOPSIS Guardian::init() NOTE: The operation should be invoked with the following locks acquired: @@ -315,7 +306,7 @@ int Guardian::is_stopped() RETURN 0 - ok - 1 - error occured + 1 - error occurred */ int Guardian::init() @@ -344,7 +335,7 @@ int Guardian::init() /* Add instance to the Guardian list - SYNOPSYS + SYNOPSIS guard() instance the instance to be guarded nolock whether we prefer do not lock Guardian here, @@ -357,7 +348,7 @@ int Guardian::init() RETURN 0 - ok - 1 - error occured + 1 - error occurred */ int Guardian::guard(Instance *instance, bool nolock) @@ -418,7 +409,7 @@ int Guardian::stop_guard(Instance *instance) An internal method which is called at shutdown to unregister instances and attempt to stop them if requested. - SYNOPSYS + SYNOPSIS stop_instances() DESCRIPTION @@ -431,7 +422,7 @@ int Guardian::stop_guard(Instance *instance) RETURN 0 - ok - 1 - error occured + 1 - error occurred */ int Guardian::stop_instances() diff --git a/server-tools/instance-manager/guardian.h b/server-tools/instance-manager/guardian.h index 4c518dddf23..0eee1dc631d 100644 --- a/server-tools/instance-manager/guardian.h +++ b/server-tools/instance-manager/guardian.h @@ -16,11 +16,10 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <my_global.h> -#include <my_sys.h> -#include <my_list.h> #include "thread_registry.h" +#include <my_sys.h> +#include <my_list.h> #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #pragma interface @@ -31,30 +30,12 @@ class Instance_map; class Thread_registry; struct GUARD_NODE; -pthread_handler_t guardian_thread_func(void *arg); - -struct Guardian_args -{ - Thread_registry &thread_registry; - Instance_map *instance_map; - int monitoring_interval; - - Guardian_args(Thread_registry &thread_registry_arg, - Instance_map *instance_map_arg, - uint monitoring_interval_arg) : - thread_registry(thread_registry_arg), - instance_map(instance_map_arg), - monitoring_interval(monitoring_interval_arg) - {} -}; - - -/* +/** The guardian thread is responsible for monitoring and restarting of guarded instances. */ -class Guardian: public Guardian_args +class Guardian: public Thread { public: /* states of an instance */ @@ -82,12 +63,10 @@ public: /* Return client state name. */ static const char *get_instance_state_name(enum_instance_state state); - Guardian(Thread_registry &thread_registry_arg, - Instance_map *instance_map_arg, - uint monitoring_interval_arg); - ~Guardian(); - /* Main funtion of the thread */ - void run(); + Guardian(Thread_registry *thread_registry_arg, + Instance_map *instance_map_arg, + uint monitoring_interval_arg); + virtual ~Guardian(); /* Initialize or refresh the list of guarded instances */ int init(); /* Request guardian shutdown. Stop instances if needed */ @@ -117,6 +96,9 @@ public: a valid list node. */ inline enum_instance_state get_instance_state(LIST *instance_node); +protected: + /* Main funtion of the thread */ + virtual void run(); public: pthread_cond_t COND_guardian; @@ -133,6 +115,9 @@ private: private: pthread_mutex_t LOCK_guardian; Thread_info thread_info; + int monitoring_interval; + Thread_registry *thread_registry; + Instance_map *instance_map; LIST *guarded_instances; MEM_ROOT alloc; /* this variable is set to TRUE when we want to stop Guardian thread */ diff --git a/server-tools/instance-manager/instance.cc b/server-tools/instance-manager/instance.cc index 3927363a3e5..f170332e132 100644 --- a/server-tools/instance-manager/instance.cc +++ b/server-tools/instance-manager/instance.cc @@ -44,9 +44,6 @@ static const char * const INSTANCE_NAME_PREFIX= Instance::DFLT_INSTANCE_NAME.str static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length; -static void start_and_monitor_instance(Instance_options *old_instance_options, - Instance_map *instance_map, - Thread_registry *thread_registry); #ifndef __WIN__ typedef pid_t My_process_info; @@ -61,13 +58,24 @@ typedef PROCESS_INFORMATION My_process_info; to do it in a portable way. */ -pthread_handler_t proxy(void *arg) +class Instance_monitor: public Thread { - Instance *instance= (Instance *) arg; - start_and_monitor_instance(&instance->options, - instance->get_map(), +public: + Instance_monitor(Instance *instance_arg) :instance(instance_arg) {} +protected: + virtual void run(); + void start_and_monitor_instance(Instance_options *old_instance_options, + Instance_map *instance_map, + Thread_registry *thread_registry); +private: + Instance *instance; +}; + +void Instance_monitor::run() +{ + start_and_monitor_instance(&instance->options, instance->get_map(), &instance->thread_registry); - return 0; + delete this; } /* @@ -242,14 +250,16 @@ static int start_process(Instance_options *instance_options, Function returns no value */ -static void start_and_monitor_instance(Instance_options *old_instance_options, - Instance_map *instance_map, - Thread_registry *thread_registry) +void +Instance_monitor:: +start_and_monitor_instance(Instance_options *old_instance_options, + Instance_map *instance_map, + Thread_registry *thread_registry) { Instance_name instance_name(&old_instance_options->instance_name); Instance *current_instance; My_process_info process_info; - Thread_info thread_info(pthread_self(), FALSE); + Thread_info thread_info; log_info("Monitoring thread (instance: '%s'): started.", (const char *) instance_name.get_c_str()); @@ -258,12 +268,10 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, { /* Register thread in Thread_registry to wait for it to stop on shutdown - only if instance is nuarded. If instance is guarded, the thread will not + only if instance is guarded. If instance is guarded, the thread will not finish, because nonguarded instances are not stopped on shutdown. */ - - thread_registry->register_thread(&thread_info); - my_thread_init(); + thread_registry->register_thread(&thread_info, FALSE); } /* @@ -302,10 +310,7 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, instance_map->unlock(); if (!old_instance_options->nonguarded) - { thread_registry->unregister_thread(&thread_info); - my_thread_end(); - } log_info("Monitoring thread (instance: '%s'): finished.", (const char *) instance_name.get_c_str()); @@ -369,22 +374,19 @@ int Instance::start() if (configured && !is_running()) { + Instance_monitor *instance_monitor; remove_pid(); - pthread_t proxy_thd_id; - pthread_attr_t proxy_thd_attr; - int rc; + instance_monitor= new Instance_monitor(this); - pthread_attr_init(&proxy_thd_attr); - pthread_attr_setdetachstate(&proxy_thd_attr, PTHREAD_CREATE_DETACHED); - rc= pthread_create(&proxy_thd_id, &proxy_thd_attr, proxy, - this); - pthread_attr_destroy(&proxy_thd_attr); - if (rc) + if (instance_monitor == NULL || instance_monitor->start_detached()) { - log_error("Instance::start(): pthread_create(proxy) failed"); + delete instance_monitor; + log_error("Instance::start(): failed to create the monitoring thread" + " to start an instance"); return ER_CANNOT_START_INSTANCE; } + /* The monitoring thread will delete itself when it's finished. */ return 0; } diff --git a/server-tools/instance-manager/listener.cc b/server-tools/instance-manager/listener.cc index 62962c00957..b749f234560 100644 --- a/server-tools/instance-manager/listener.cc +++ b/server-tools/instance-manager/listener.cc @@ -29,7 +29,6 @@ #include <sys/un.h> #endif -#include "instance_map.h" #include "log.h" #include "mysql_connection.h" #include "options.h" @@ -59,47 +58,18 @@ static void set_no_inherit(int socket) } -/* - Listener_thread - incapsulates listening functionality -*/ - -class Listener_thread: public Listener_thread_args -{ -public: - Listener_thread(const Listener_thread_args &args); - ~Listener_thread(); - void run(); -private: - static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */ - ulong total_connection_count; - Thread_info thread_info; - - int sockets[2]; - int num_sockets; - fd_set read_fds; -private: - void handle_new_mysql_connection(Vio *vio); - int create_tcp_socket(); - int create_unix_socket(struct sockaddr_un &unix_socket_address); -}; - - -Listener_thread::Listener_thread(const Listener_thread_args &args) : - Listener_thread_args(args.thread_registry, args.user_map, args.instance_map) - ,total_connection_count(0) - ,thread_info(pthread_self(), TRUE) - ,num_sockets(0) -{ -} - - -Listener_thread::~Listener_thread() +Listener::Listener(Thread_registry *thread_registry_arg, + User_map *user_map_arg) + :thread_registry(thread_registry_arg), + user_map(user_map_arg), + total_connection_count(0), + num_sockets(0) { } /* - Listener_thread::run() - listen all supported sockets and spawn a thread + Listener::run() - listen all supported sockets and spawn a thread to handle incoming connection. Using 'die' in case of syscall failure is OK now - we don't hold any resources and 'die' kills the signal thread automatically. To be rewritten @@ -108,11 +78,11 @@ Listener_thread::~Listener_thread() architecture. */ -void Listener_thread::run() +void Listener::run() { int i, n= 0; - log_info("Listener_thread: started."); + log_info("Listener: started."); #ifndef __WIN__ /* we use this var to check whether we are running on LinuxThreads */ @@ -125,9 +95,7 @@ void Listener_thread::run() linuxthreads= (thread_pid != manager_pid); #endif - thread_registry.register_thread(&thread_info); - - my_thread_init(); + thread_registry->register_thread(&thread_info); FD_ZERO(&read_fds); @@ -146,7 +114,7 @@ void Listener_thread::run() n++; timeval tv; - while (!thread_registry.is_shutdown()) + while (!thread_registry->is_shutdown()) { fd_set read_fds_arg= read_fds; /* @@ -166,7 +134,7 @@ void Listener_thread::run() if (rc == 0 || rc == -1) { if (rc == -1 && errno != EINTR) - log_error("Listener_thread: select() failed, %s", + log_error("Listener: select() failed, %s", strerror(errno)); continue; } @@ -200,7 +168,7 @@ void Listener_thread::run() /* III. Release all resources and exit */ - log_info("Listener_thread: shutdown requested, exiting..."); + log_info("Listener: shutdown requested, exiting..."); for (i= 0; i < num_sockets; i++) close(sockets[i]); @@ -209,10 +177,9 @@ void Listener_thread::run() unlink(unix_socket_address.sun_path); #endif - thread_registry.unregister_thread(&thread_info); - my_thread_end(); + thread_registry->unregister_thread(&thread_info); - log_info("Listener_thread: finished."); + log_info("Listener: finished."); return; err: @@ -220,13 +187,12 @@ err: for (i= 0; i < num_sockets; i++) close(sockets[i]); - thread_registry.unregister_thread(&thread_info); - thread_registry.request_shutdown(); - my_thread_end(); + thread_registry->unregister_thread(&thread_info); + thread_registry->request_shutdown(); return; } -int Listener_thread::create_tcp_socket() +int Listener::create_tcp_socket() { /* value to be set by setsockopt */ int arg= 1; @@ -265,7 +231,7 @@ int Listener_thread::create_tcp_socket() if (bind(ip_socket, (struct sockaddr *) &ip_socket_address, sizeof(ip_socket_address))) { - log_error("Listener_thread: bind(ip socket) failed, '%s'", + log_error("Listener: bind(ip socket) failed, '%s'", strerror(errno)); close(ip_socket); return -1; @@ -273,7 +239,7 @@ int Listener_thread::create_tcp_socket() if (listen(ip_socket, LISTEN_BACK_LOG_SIZE)) { - log_error("Listener_thread: listen(ip socket) failed, %s", + log_error("Listener: listen(ip socket) failed, %s", strerror(errno)); close(ip_socket); return -1; @@ -292,7 +258,7 @@ int Listener_thread::create_tcp_socket() } #ifndef __WIN__ -int Listener_thread:: +int Listener:: create_unix_socket(struct sockaddr_un &unix_socket_address) { int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0); @@ -318,7 +284,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) if (bind(unix_socket, (struct sockaddr *) &unix_socket_address, sizeof(unix_socket_address))) { - log_error("Listener_thread: bind(unix socket) failed, " + log_error("Listener: bind(unix socket) failed, " "socket file name is '%s', error '%s'", unix_socket_address.sun_path, strerror(errno)); close(unix_socket); @@ -329,7 +295,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) if (listen(unix_socket, LISTEN_BACK_LOG_SIZE)) { - log_error("Listener_thread: listen(unix socket) failed, %s", + log_error("Listener: listen(unix socket) failed, %s", strerror(errno)); close(unix_socket); return -1; @@ -357,46 +323,16 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) handle_new_mysql_connection() */ -void Listener_thread::handle_new_mysql_connection(Vio *vio) +void Listener::handle_new_mysql_connection(Vio *vio) { - if (Mysql_connection_thread_args *mysql_thread_args= - new Mysql_connection_thread_args(vio, thread_registry, user_map, - ++total_connection_count, - instance_map) - ) + Mysql_connection *mysql_connection= + new Mysql_connection(thread_registry, user_map, + vio, ++total_connection_count); + if (mysql_connection == NULL || mysql_connection->start_detached()) { - /* - Initialize thread attributes to create detached thread; it seems - easier to do it ad-hoc than have a global variable for attributes. - */ - pthread_t mysql_thd_id; - pthread_attr_t mysql_thd_attr; - pthread_attr_init(&mysql_thd_attr); - pthread_attr_setdetachstate(&mysql_thd_attr, PTHREAD_CREATE_DETACHED); - if (set_stacksize_n_create_thread(&mysql_thd_id, &mysql_thd_attr, - mysql_connection, mysql_thread_args)) - { - delete mysql_thread_args; - vio_delete(vio); - log_error("handle_one_mysql_connection():" - "set_stacksize_n_create_thread(mysql) failed"); - } - pthread_attr_destroy(&mysql_thd_attr); - } - else + log_error("handle_one_mysql_connection() failed"); + delete mysql_connection; vio_delete(vio); + } + /* The connection will delete itself when the thread is finished */ } - - -pthread_handler_t listener(void *arg) -{ - Listener_thread_args *args= (Listener_thread_args *) arg; - Listener_thread listener(*args); - listener.run(); - /* - args is a stack variable because listener thread lives as long as the - manager process itself - */ - return 0; -} - diff --git a/server-tools/instance-manager/listener.h b/server-tools/instance-manager/listener.h index c28ab0649d7..7758c2dc13d 100644 --- a/server-tools/instance-manager/listener.h +++ b/server-tools/instance-manager/listener.h @@ -16,33 +16,39 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <my_global.h> -#include <my_pthread.h> +#include "thread_registry.h" #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #pragma interface #endif - -pthread_handler_t listener(void *arg); - class Thread_registry; class User_map; -class Instance_map; -struct Listener_thread_args +/** + Listener - a thread listening on sockets and spawning + connection threads. +*/ + +class Listener: public Thread { - Thread_registry &thread_registry; - const User_map &user_map; - Instance_map &instance_map; - - Listener_thread_args(Thread_registry &thread_registry_arg, - const User_map &user_map_arg, - Instance_map &instance_map_arg) : - thread_registry(thread_registry_arg) - ,user_map(user_map_arg) - ,instance_map(instance_map_arg) - {} +public: + Listener(Thread_registry *thread_registry_arg, User_map *user_map_arg); +protected: + virtual void run(); +private: + Thread_info thread_info; + Thread_registry *thread_registry; + User_map *user_map; + static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */ + ulong total_connection_count; + + int sockets[2]; + int num_sockets; + fd_set read_fds; + void handle_new_mysql_connection(struct st_vio *vio); + int create_tcp_socket(); + int create_unix_socket(struct sockaddr_un &unix_socket_address); }; #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H diff --git a/server-tools/instance-manager/manager.cc b/server-tools/instance-manager/manager.cc index 55d7137c97a..8c6bffa8d97 100644 --- a/server-tools/instance-manager/manager.cc +++ b/server-tools/instance-manager/manager.cc @@ -139,10 +139,10 @@ int Manager::main() User_map user_map; Instance_map instance_map(Options::Main::default_mysqld_path, thread_registry); - Guardian guardian(thread_registry, &instance_map, + Guardian guardian(&thread_registry, &instance_map, Options::Main::monitoring_interval); - Listener_thread_args listener_args(thread_registry, user_map, instance_map); + Listener listener(&thread_registry, &user_map); manager_pid= getpid(); p_instance_map= &instance_map; @@ -212,40 +212,29 @@ int Manager::main() sigset_t mask; set_signals(&mask); - /* create guardian thread */ + /* + Create the guardian thread. The newly started thread will block until + we actually load instances. + + NOTE: Guardian should be shutdown first. Only then all other threads + can be stopped. This should be done in this order because the guardian + is responsible for shutting down all the guarded instances, and this + is a long operation. + + NOTE: Guardian uses thr_alarm() when detects the current state of an + instance (is_running()), but this does not interfere with + flush_instances() call later in the code, because until + flush_instances() completes in the main thread, Guardian thread is not + permitted to process instances. And before flush_instances() has + completed, there are no instances to guard. + */ + if (guardian.start_detached()) { - pthread_t guardian_thd_id; - pthread_attr_t guardian_thd_attr; - int rc; - - /* - NOTE: Guardian should be shutdown first. Only then all other threads - need to be stopped. This should be done, as guardian is responsible - for shutting down the instances, and this is a long operation. - - NOTE: Guardian uses thr_alarm() when detects current state of - instances (is_running()), but it is not interfere with - flush_instances() later in the code, because until flush_instances() - complete in the main thread, Guardian thread is not permitted to - process instances. And before flush_instances() there is no instances - to proceed. - */ - - pthread_attr_init(&guardian_thd_attr); - pthread_attr_setdetachstate(&guardian_thd_attr, PTHREAD_CREATE_DETACHED); - rc= set_stacksize_n_create_thread(&guardian_thd_id, &guardian_thd_attr, - guardian_thread_func, &guardian); - pthread_attr_destroy(&guardian_thd_attr); - if (rc) - { - log_error("manager(): set_stacksize_n_create_thread(guardian) failed"); - goto err; - } - + log_error("manager(): Failed to create the guardian thread"); + goto err; } /* Load instances. */ - { instance_map.guardian->lock(); instance_map.lock(); @@ -265,23 +254,12 @@ int Manager::main() } } - /* create the listener */ + /* start the listener */ + if (listener.start_detached()) { - pthread_t listener_thd_id; - pthread_attr_t listener_thd_attr; - int rc; - - pthread_attr_init(&listener_thd_attr); - pthread_attr_setdetachstate(&listener_thd_attr, PTHREAD_CREATE_DETACHED); - rc= set_stacksize_n_create_thread(&listener_thd_id, &listener_thd_attr, - listener, &listener_args); - pthread_attr_destroy(&listener_thd_attr); - if (rc) - { - log_error("manager(): set_stacksize_n_create_thread(listener) failed"); - stop_all(&guardian, &thread_registry); - goto err; - } + log_error("manager(): set_stacksize_n_create_thread(listener) failed"); + stop_all(&guardian, &thread_registry); + goto err; } /* diff --git a/server-tools/instance-manager/mysql_connection.cc b/server-tools/instance-manager/mysql_connection.cc index 72081234c94..b6f62d0f6eb 100644 --- a/server-tools/instance-manager/mysql_connection.cc +++ b/server-tools/instance-manager/mysql_connection.cc @@ -23,7 +23,6 @@ #include <m_string.h> #include <m_string.h> #include <my_global.h> -#include <mysql_com.h> #include <mysql.h> #include <my_sys.h> #include <violite.h> @@ -40,66 +39,15 @@ #include "user_map.h" -Mysql_connection_thread_args::Mysql_connection_thread_args( - struct st_vio *vio_arg, - Thread_registry &thread_registry_arg, - const User_map &user_map_arg, - ulong connection_id_arg, - Instance_map &instance_map_arg) : - vio(vio_arg) - ,thread_registry(thread_registry_arg) - ,user_map(user_map_arg) - ,connection_id(connection_id_arg) - ,instance_map(instance_map_arg) - {} - -/* - MySQL connection - handle one connection with mysql command line client - See also comments in mysqlmanager.cc to picture general Instance Manager - architecture. - We use conventional technique to work with classes without exceptions: - class acquires all vital resource in init(); Thus if init() succeed, - a user must call cleanup(). All other methods are valid only between - init() and cleanup(). -*/ - -class Mysql_connection_thread: public Mysql_connection_thread_args -{ -public: - Mysql_connection_thread(const Mysql_connection_thread_args &args); - - int init(); - void cleanup(); - - void run(); - - ~Mysql_connection_thread(); -private: - Thread_info thread_info; - NET net; - struct rand_struct rand_st; - char scramble[SCRAMBLE_LENGTH + 1]; - uint status; - ulong client_capabilities; -private: - /* Names are conventionally the same as in mysqld */ - int check_connection(); - int do_command(); - int dispatch_command(enum enum_server_command command, - const char *text, uint len); -}; - - -Mysql_connection_thread::Mysql_connection_thread( - const Mysql_connection_thread_args &args) : - Mysql_connection_thread_args(args.vio, - args.thread_registry, - args.user_map, - args.connection_id, - args.instance_map) - ,thread_info(pthread_self(), TRUE) +Mysql_connection::Mysql_connection(Thread_registry *thread_registry_arg, + User_map *user_map_arg, + struct st_vio *vio_arg, ulong + connection_id_arg) + :vio(vio_arg), + connection_id(connection_id_arg), + thread_registry(thread_registry_arg), + user_map(user_map_arg) { - thread_registry.register_thread(&thread_info); } @@ -129,7 +77,7 @@ C_MODE_END This function is complementary to cleanup(). */ -int Mysql_connection_thread::init() +int Mysql_connection::init() { /* Allocate buffers for network I/O */ if (my_net_init(&net, vio)) @@ -145,52 +93,46 @@ int Mysql_connection_thread::init() create_random_string(scramble, SCRAMBLE_LENGTH, &rand_st); /* We don't support transactions, every query is atomic */ status= SERVER_STATUS_AUTOCOMMIT; + thread_registry->register_thread(&thread_info); return 0; } -void Mysql_connection_thread::cleanup() +void Mysql_connection::cleanup() { net_end(&net); + thread_registry->unregister_thread(&thread_info); } -Mysql_connection_thread::~Mysql_connection_thread() +Mysql_connection::~Mysql_connection() { /* vio_delete closes the socket if necessary */ vio_delete(vio); - thread_registry.unregister_thread(&thread_info); } -void Mysql_connection_thread::run() +void Mysql_connection::main() { log_info("accepted mysql connection %lu", (unsigned long) connection_id); - my_thread_init(); - if (check_connection()) - { - my_thread_end(); return; - } log_info("connection %lu is checked successfully", (unsigned long) connection_id); vio_keepalive(vio, TRUE); - while (!net.error && net.vio && !thread_registry.is_shutdown()) + while (!net.error && net.vio && !thread_registry->is_shutdown()) { if (do_command()) break; } - - my_thread_end(); } -int Mysql_connection_thread::check_connection() +int Mysql_connection::check_connection() { ulong pkt_len=0; // to hold client reply length @@ -279,7 +221,7 @@ int Mysql_connection_thread::check_connection() net_send_error(&net, ER_ACCESS_DENIED_ERROR); return 1; } - if (user_map.authenticate(&user_name, password, scramble)) + if (user_map->authenticate(&user_name, password, scramble)) { net_send_error(&net, ER_ACCESS_DENIED_ERROR); return 1; @@ -289,7 +231,7 @@ int Mysql_connection_thread::check_connection() } -int Mysql_connection_thread::do_command() +int Mysql_connection::do_command() { char *packet; ulong packet_length; @@ -302,7 +244,7 @@ int Mysql_connection_thread::do_command() /* Check if we can continue without closing the connection */ if (net.error != 3) // what is 3 - find out return 1; - if (thread_registry.is_shutdown()) + if (thread_registry->is_shutdown()) return 1; net_send_error(&net, net.last_errno); net.error= 0; @@ -310,7 +252,7 @@ int Mysql_connection_thread::do_command() } else { - if (thread_registry.is_shutdown()) + if (thread_registry->is_shutdown()) return 1; packet= (char*) net.read_pos; enum enum_server_command command= (enum enum_server_command) @@ -321,8 +263,8 @@ int Mysql_connection_thread::do_command() } } -int Mysql_connection_thread::dispatch_command(enum enum_server_command command, - const char *packet, uint len) +int Mysql_connection::dispatch_command(enum enum_server_command command, + const char *packet, uint len) { switch (command) { case COM_QUIT: // client exit @@ -374,19 +316,16 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, } -pthread_handler_t mysql_connection(void *arg) +void Mysql_connection::run() { - Mysql_connection_thread_args *args= (Mysql_connection_thread_args *) arg; - Mysql_connection_thread mysql_connection_thread(*args); - delete args; - if (mysql_connection_thread.init()) - log_info("mysql_connection(): error initializing thread"); + if (init()) + log_info("Mysql_connection::run(): error initializing thread"); else { - mysql_connection_thread.run(); - mysql_connection_thread.cleanup(); + main(); + cleanup(); } - return 0; + delete this; } /* diff --git a/server-tools/instance-manager/mysql_connection.h b/server-tools/instance-manager/mysql_connection.h index 3496cc05815..aceb2d62736 100644 --- a/server-tools/instance-manager/mysql_connection.h +++ b/server-tools/instance-manager/mysql_connection.h @@ -16,33 +16,60 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <my_global.h> -#include <my_pthread.h> +#include "thread_registry.h" +#include <mysql_com.h> #if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE) #pragma interface #endif -pthread_handler_t mysql_connection(void *arg); - -class Thread_registry; -class User_map; -class Instance_map; struct st_vio; +class User_map; -struct Mysql_connection_thread_args +/* + MySQL connection - handle one connection with mysql command line client + See also comments in mysqlmanager.cc to picture general Instance Manager + architecture. + We use conventional technique to work with classes without exceptions: + class acquires all vital resource in init(); Thus if init() succeed, + a user must call cleanup(). All other methods are valid only between + init() and cleanup(). +*/ + +class Mysql_connection: public Thread { +public: + Mysql_connection(Thread_registry *thread_registry_arg, + User_map *user_map_arg, + struct st_vio *vio_arg, + ulong connection_id_arg); + virtual ~Mysql_connection(); + +protected: + virtual void run(); + +private: struct st_vio *vio; - Thread_registry &thread_registry; - const User_map &user_map; ulong connection_id; - Instance_map &instance_map; + Thread_info thread_info; + Thread_registry *thread_registry; + User_map *user_map; + NET net; + struct rand_struct rand_st; + char scramble[SCRAMBLE_LENGTH + 1]; + uint status; + ulong client_capabilities; +private: + /* The main loop implementation triad */ + int init(); + void main(); + void cleanup(); - Mysql_connection_thread_args(struct st_vio *vio_arg, - Thread_registry &thread_registry_arg, - const User_map &user_map_arg, - ulong connection_id_arg, - Instance_map &instance_map_arg); + /* Names are conventionally the same as in mysqld */ + int check_connection(); + int do_command(); + int dispatch_command(enum enum_server_command command, + const char *text, uint len); }; #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MYSQL_CONNECTION_H diff --git a/server-tools/instance-manager/priv.cc b/server-tools/instance-manager/priv.cc index 7e3553444ac..934684a1a06 100644 --- a/server-tools/instance-manager/priv.cc +++ b/server-tools/instance-manager/priv.cc @@ -22,17 +22,6 @@ #include "log.h" -#if defined(__ia64__) || defined(__ia64) -/* - We can live with 32K, but reserve 64K. Just to be safe. - On ia64 we need to reserve double of the size. -*/ -#define IM_THREAD_STACK_SIZE (128*1024L) -#else -#define IM_THREAD_STACK_SIZE (64*1024) -#endif - - /* the pid of the manager process (of the signal thread on the LinuxThreads) */ pid_t manager_pid; @@ -66,33 +55,6 @@ unsigned long bytes_sent = 0L, bytes_received = 0L; unsigned long mysqld_net_retry_count = 10L; unsigned long open_files_limit; -/* - Change the stack size and start a thread. Return an error if either - pthread_attr_setstacksize or pthread_create fails. - Arguments are the same as for pthread_create(). -*/ - -int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr, - void *(*start_routine)(void *), void *arg) -{ - int rc= 0; - -#ifndef __WIN__ -#ifndef PTHREAD_STACK_MIN -#define PTHREAD_STACK_MIN 32768 -#endif - /* - Set stack size to be safe on the platforms with too small - default thread stack. - */ - rc= pthread_attr_setstacksize(attr, - (size_t) (PTHREAD_STACK_MIN + - IM_THREAD_STACK_SIZE)); -#endif - if (!rc) - rc= pthread_create(thread, attr, start_routine, arg); - return rc; -} int create_pid_file(const char *pid_file_name, int pid) diff --git a/server-tools/instance-manager/priv.h b/server-tools/instance-manager/priv.h index 33be81bc3eb..7ac8ac73c6f 100644 --- a/server-tools/instance-manager/priv.h +++ b/server-tools/instance-manager/priv.h @@ -105,10 +105,6 @@ extern unsigned long bytes_sent, bytes_received; extern unsigned long mysqld_net_retry_count; extern unsigned long open_files_limit; - -int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr, - void *(*start_routine)(void *), void *arg); - int create_pid_file(const char *pid_file_name, int pid); #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_PRIV_H diff --git a/server-tools/instance-manager/thread_registry.cc b/server-tools/instance-manager/thread_registry.cc index 10370e0981e..037f104f7bd 100644 --- a/server-tools/instance-manager/thread_registry.cc +++ b/server-tools/instance-manager/thread_registry.cc @@ -38,15 +38,13 @@ static void handle_signal(int __attribute__((unused)) sig_no) } #endif -/* - Thread_info initializer methods -*/ +/* Thread_info initializer methods */ -Thread_info::Thread_info() {} -Thread_info::Thread_info(pthread_t thread_id_arg, - bool send_signal_on_shutdown_arg) : - thread_id(thread_id_arg), - send_signal_on_shutdown(send_signal_on_shutdown_arg) {} +void Thread_info::init(bool send_signal_on_shutdown_arg) +{ + thread_id= pthread_self(); + send_signal_on_shutdown= send_signal_on_shutdown_arg; +} /* TODO: think about moving signal information (now it's shutdown_in_progress) @@ -86,11 +84,14 @@ Thread_registry::~Thread_registry() points to the last node. */ -void Thread_registry::register_thread(Thread_info *info) +void Thread_registry::register_thread(Thread_info *info, + bool send_signal_on_shutdown) { log_info("Thread_registry: registering thread %d...", (int) info->thread_id); + info->init(send_signal_on_shutdown); + #ifndef __WIN__ struct sigaction sa; sa.sa_handler= handle_signal; @@ -298,3 +299,80 @@ void Thread_registry::wait_for_threads_to_unregister() } } } + + +/********************************************************************* + class Thread +*********************************************************************/ + +#if defined(__ia64__) || defined(__ia64) +/* + We can live with 32K, but reserve 64K. Just to be safe. + On ia64 we need to reserve double of the size. +*/ +#define IM_THREAD_STACK_SIZE (128*1024L) +#else +#define IM_THREAD_STACK_SIZE (64*1024) +#endif + +/* + Change the stack size and start a thread. Return an error if either + pthread_attr_setstacksize or pthread_create fails. + Arguments are the same as for pthread_create(). +*/ + +static +int set_stacksize_and_create_thread(pthread_t *thread, pthread_attr_t *attr, + void *(*start_routine)(void *), void *arg) +{ + int rc= 0; + +#ifndef __WIN__ +#ifndef PTHREAD_STACK_MIN +#define PTHREAD_STACK_MIN 32768 +#endif + /* + Set stack size to be safe on the platforms with too small + default thread stack. + */ + rc= pthread_attr_setstacksize(attr, + (size_t) (PTHREAD_STACK_MIN + + IM_THREAD_STACK_SIZE)); +#endif + if (!rc) + rc= pthread_create(thread, attr, start_routine, arg); + return rc; +} + + +Thread::~Thread() +{ +} + + +void *Thread::thread_func(void *arg) +{ + Thread *thread= (Thread *) arg; + my_thread_init(); + + thread->run(); + + my_thread_end(); + return NULL; +} + + +bool Thread::start_detached() +{ + pthread_t thd_id; + pthread_attr_t attr; + int rc; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + rc= set_stacksize_and_create_thread(&thd_id, &attr, + Thread::thread_func, this); + pthread_attr_destroy(&attr); + + return rc != 0; +} diff --git a/server-tools/instance-manager/thread_registry.h b/server-tools/instance-manager/thread_registry.h index 3af7c8e0240..034ac1b0ca8 100644 --- a/server-tools/instance-manager/thread_registry.h +++ b/server-tools/instance-manager/thread_registry.h @@ -57,7 +57,7 @@ #pragma interface #endif -/* +/** Thread_info - repository entry for each worker thread All entries comprise double-linked list like: 0 -- entry -- entry -- entry - 0 @@ -67,12 +67,10 @@ class Thread_info { public: - Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg); + Thread_info() {} friend class Thread_registry; - private: - Thread_info(); - + void init(bool send_signal_on_shutdown); private: pthread_cond_t *current_cond; Thread_info *prev, *next; @@ -81,7 +79,26 @@ private: }; -/* +/** + A base class for a detached thread. +*/ + +class Thread +{ +public: + Thread() {} + bool start_detached(); +protected: + virtual void run()= 0; + virtual ~Thread(); +private: + static void *thread_func(void *arg); + Thread(const Thread & /* rhs */); /* not implemented */ + Thread &operator=(const Thread & /* rhs */); /* not implemented */ +}; + + +/** Thread_registry - contains handles for each worker thread to deliver signal information to workers. */ @@ -92,7 +109,7 @@ public: Thread_registry(); ~Thread_registry(); - void register_thread(Thread_info *info); + void register_thread(Thread_info *info, bool send_signal_on_shutdown= TRUE); void unregister_thread(Thread_info *info); void deliver_shutdown(); void request_shutdown(); |