diff options
-rw-r--r-- | server-tools/instance-manager/guardian.cc | 8 | ||||
-rw-r--r-- | server-tools/instance-manager/instance.cc | 43 | ||||
-rw-r--r-- | server-tools/instance-manager/instance.h | 4 | ||||
-rw-r--r-- | server-tools/instance-manager/instance_map.cc | 10 | ||||
-rw-r--r-- | server-tools/instance-manager/instance_map.h | 6 | ||||
-rw-r--r-- | server-tools/instance-manager/listener.cc | 22 | ||||
-rw-r--r-- | server-tools/instance-manager/manager.cc | 23 | ||||
-rw-r--r-- | server-tools/instance-manager/mysql_connection.cc | 29 | ||||
-rw-r--r-- | server-tools/instance-manager/thread_registry.cc | 117 | ||||
-rw-r--r-- | server-tools/instance-manager/thread_registry.h | 12 |
10 files changed, 203 insertions, 71 deletions
diff --git a/server-tools/instance-manager/guardian.cc b/server-tools/instance-manager/guardian.cc index af57f1decbc..03bfadd8571 100644 --- a/server-tools/instance-manager/guardian.cc +++ b/server-tools/instance-manager/guardian.cc @@ -74,7 +74,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg, uint monitoring_interval_arg) : Guardian_thread_args(thread_registry_arg, instance_map_arg, monitoring_interval_arg), - thread_info(pthread_self()), guarded_instances(0) + thread_info(pthread_self(), TRUE), guarded_instances(0) { pthread_mutex_init(&LOCK_guardian, 0); pthread_cond_init(&COND_guardian, 0); @@ -250,6 +250,8 @@ void Guardian_thread::run() LIST *node; struct timespec timeout; + log_info("Guardian: started."); + thread_registry.register_thread(&thread_info); my_thread_init(); @@ -277,12 +279,16 @@ void Guardian_thread::run() &LOCK_guardian, &timeout); } + log_info("Guardian: stopped."); + stopped= TRUE; pthread_mutex_unlock(&LOCK_guardian); /* now, when the Guardian is stopped we can stop the IM */ thread_registry.unregister_thread(&thread_info); thread_registry.request_shutdown(); my_thread_end(); + + log_info("Guardian: finished."); } diff --git a/server-tools/instance-manager/instance.cc b/server-tools/instance-manager/instance.cc index 1dfe6167020..3927363a3e5 100644 --- a/server-tools/instance-manager/instance.cc +++ b/server-tools/instance-manager/instance.cc @@ -34,6 +34,7 @@ #include "mysql_manager_error.h" #include "portability.h" #include "priv.h" +#include "thread_registry.h" const LEX_STRING @@ -44,7 +45,8 @@ static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length; static void start_and_monitor_instance(Instance_options *old_instance_options, - Instance_map *instance_map); + Instance_map *instance_map, + Thread_registry *thread_registry); #ifndef __WIN__ typedef pid_t My_process_info; @@ -63,7 +65,8 @@ pthread_handler_t proxy(void *arg) { Instance *instance= (Instance *) arg; start_and_monitor_instance(&instance->options, - instance->get_map()); + instance->get_map(), + &instance->thread_registry); return 0; } @@ -99,6 +102,7 @@ static int wait_process(My_process_info *pi) thread, but we don't know this one). Or we could use waitpid(), but couldn't use wait(), because it could return in any wait() in the program. */ + if (linuxthreads) wait(NULL); /* LinuxThreads were detected */ else @@ -239,11 +243,28 @@ static int start_process(Instance_options *instance_options, */ static void start_and_monitor_instance(Instance_options *old_instance_options, - Instance_map *instance_map) + Instance_map *instance_map, + Thread_registry *thread_registry) { Instance_name instance_name(&old_instance_options->instance_name); Instance *current_instance; My_process_info process_info; + Thread_info thread_info(pthread_self(), FALSE); + + log_info("Monitoring thread (instance: '%s'): started.", + (const char *) instance_name.get_c_str()); + + if (!old_instance_options->nonguarded) + { + /* + Register thread in Thread_registry to wait for it to stop on shutdown + only if instance is nuarded. If instance is guarded, the thread will not + finish, because nonguarded instances are not stopped on shutdown. + */ + + thread_registry->register_thread(&thread_info); + my_thread_init(); + } /* Lock instance map to guarantee that no instances are deleted during @@ -280,7 +301,14 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, instance_map->unlock(); - return; + if (!old_instance_options->nonguarded) + { + thread_registry->unregister_thread(&thread_info); + my_thread_end(); + } + + log_info("Monitoring thread (instance: '%s'): finished.", + (const char *) instance_name.get_c_str()); } @@ -343,10 +371,6 @@ int Instance::start() { remove_pid(); - /* - No need to monitor this thread in the Thread_registry, as all - instances are to be stopped during shutdown. - */ pthread_t proxy_thd_id; pthread_attr_t proxy_thd_attr; int rc; @@ -404,7 +428,8 @@ void Instance::set_crash_flag_n_wake_all() -Instance::Instance(): crashed(FALSE), configured(FALSE) +Instance::Instance(Thread_registry &thread_registry_arg): + crashed(FALSE), configured(FALSE), thread_registry(thread_registry_arg) { pthread_mutex_init(&LOCK_instance, 0); pthread_cond_init(&COND_instance_stopped, 0); diff --git a/server-tools/instance-manager/instance.h b/server-tools/instance-manager/instance.h index 1f06cabebf7..329eaa68b1a 100644 --- a/server-tools/instance-manager/instance.h +++ b/server-tools/instance-manager/instance.h @@ -27,6 +27,7 @@ #endif class Instance_map; +class Thread_registry; /* @@ -87,7 +88,7 @@ public: static bool is_mysqld_compatible_name(const LEX_STRING *name); public: - Instance(); + Instance(Thread_registry &thread_registry_arg); ~Instance(); int init(const LEX_STRING *name_arg); @@ -120,6 +121,7 @@ public: public: enum { DEFAULT_SHUTDOWN_DELAY= 35 }; Instance_options options; + Thread_registry &thread_registry; private: /* This attributes is a flag, specifies if the instance has been crashed. */ diff --git a/server-tools/instance-manager/instance_map.cc b/server-tools/instance-manager/instance_map.cc index b7704c027f1..2f830e616c4 100644 --- a/server-tools/instance-manager/instance_map.cc +++ b/server-tools/instance-manager/instance_map.cc @@ -169,7 +169,7 @@ int Instance_map::process_one_option(const LEX_STRING *group, if (!(instance= (Instance *) hash_search(&hash, (byte *) group->str, group->length))) { - if (!(instance= new Instance())) + if (!(instance= new Instance(thread_registry))) return 1; if (instance->init(group) || add_instance(instance)) @@ -213,8 +213,10 @@ int Instance_map::process_one_option(const LEX_STRING *group, } -Instance_map::Instance_map(const char *default_mysqld_path_arg): -mysqld_path(default_mysqld_path_arg) +Instance_map::Instance_map(const char *default_mysqld_path_arg, + Thread_registry &thread_registry_arg): + mysqld_path(default_mysqld_path_arg), + thread_registry(thread_registry_arg) { pthread_mutex_init(&LOCK_instance_map, 0); } @@ -333,7 +335,7 @@ int Instance_map::remove_instance(Instance *instance) int Instance_map::create_instance(const LEX_STRING *instance_name, const Named_value_arr *options) { - Instance *instance= new Instance(); + Instance *instance= new Instance(thread_registry); if (!instance) { diff --git a/server-tools/instance-manager/instance_map.h b/server-tools/instance-manager/instance_map.h index 8e6d2360652..9de40e35e0f 100644 --- a/server-tools/instance-manager/instance_map.h +++ b/server-tools/instance-manager/instance_map.h @@ -28,6 +28,7 @@ class Guardian_thread; class Instance; class Named_value_arr; +class Thread_registry; extern int load_all_groups(char ***groups, const char *filename); extern void free_groups(char **groups); @@ -104,7 +105,8 @@ public: int create_instance(const LEX_STRING *instance_name, const Named_value_arr *options); - Instance_map(const char *default_mysqld_path_arg); + Instance_map(const char *default_mysqld_path_arg, + Thread_registry &thread_registry_arg); ~Instance_map(); /* @@ -130,6 +132,8 @@ private: enum { START_HASH_SIZE = 16 }; pthread_mutex_t LOCK_instance_map; HASH hash; + + Thread_registry &thread_registry; }; #endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */ diff --git a/server-tools/instance-manager/listener.cc b/server-tools/instance-manager/listener.cc index 0ab85c0e7aa..62962c00957 100644 --- a/server-tools/instance-manager/listener.cc +++ b/server-tools/instance-manager/listener.cc @@ -87,7 +87,7 @@ private: Listener_thread::Listener_thread(const Listener_thread_args &args) : Listener_thread_args(args.thread_registry, args.user_map, args.instance_map) ,total_connection_count(0) - ,thread_info(pthread_self()) + ,thread_info(pthread_self(), TRUE) ,num_sockets(0) { } @@ -112,6 +112,8 @@ void Listener_thread::run() { int i, n= 0; + log_info("Listener_thread: started."); + #ifndef __WIN__ /* we use this var to check whether we are running on LinuxThreads */ pid_t thread_pid; @@ -164,7 +166,7 @@ void Listener_thread::run() if (rc == 0 || rc == -1) { if (rc == -1 && errno != EINTR) - log_error("Listener_thread::run(): select() failed, %s", + log_error("Listener_thread: select() failed, %s", strerror(errno)); continue; } @@ -198,7 +200,7 @@ void Listener_thread::run() /* III. Release all resources and exit */ - log_info("Listener_thread::run(): shutdown requested, exiting..."); + log_info("Listener_thread: shutdown requested, exiting..."); for (i= 0; i < num_sockets; i++) close(sockets[i]); @@ -209,6 +211,8 @@ void Listener_thread::run() thread_registry.unregister_thread(&thread_info); my_thread_end(); + + log_info("Listener_thread: finished."); return; err: @@ -230,7 +234,7 @@ int Listener_thread::create_tcp_socket() int ip_socket= socket(AF_INET, SOCK_STREAM, 0); if (ip_socket == INVALID_SOCKET) { - log_error("Listener_thead::run(): socket(AF_INET) failed, %s", + log_error("Listener_thead: socket(AF_INET) failed, %s", strerror(errno)); return -1; } @@ -261,7 +265,7 @@ int Listener_thread::create_tcp_socket() if (bind(ip_socket, (struct sockaddr *) &ip_socket_address, sizeof(ip_socket_address))) { - log_error("Listener_thread::run(): bind(ip socket) failed, '%s'", + log_error("Listener_thread: bind(ip socket) failed, '%s'", strerror(errno)); close(ip_socket); return -1; @@ -269,7 +273,7 @@ int Listener_thread::create_tcp_socket() if (listen(ip_socket, LISTEN_BACK_LOG_SIZE)) { - log_error("Listener_thread::run(): listen(ip socket) failed, %s", + log_error("Listener_thread: listen(ip socket) failed, %s", strerror(errno)); close(ip_socket); return -1; @@ -294,7 +298,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0); if (unix_socket == INVALID_SOCKET) { - log_error("Listener_thead::run(): socket(AF_UNIX) failed, %s", + log_error("Listener_thead: socket(AF_UNIX) failed, %s", strerror(errno)); return -1; } @@ -314,7 +318,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) if (bind(unix_socket, (struct sockaddr *) &unix_socket_address, sizeof(unix_socket_address))) { - log_error("Listener_thread::run(): bind(unix socket) failed, " + log_error("Listener_thread: bind(unix socket) failed, " "socket file name is '%s', error '%s'", unix_socket_address.sun_path, strerror(errno)); close(unix_socket); @@ -325,7 +329,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) if (listen(unix_socket, LISTEN_BACK_LOG_SIZE)) { - log_error("Listener_thread::run(): listen(unix socket) failed, %s", + log_error("Listener_thread: listen(unix socket) failed, %s", strerror(errno)); close(unix_socket); return -1; diff --git a/server-tools/instance-manager/manager.cc b/server-tools/instance-manager/manager.cc index 3fb967fc352..4bd298eedec 100644 --- a/server-tools/instance-manager/manager.cc +++ b/server-tools/instance-manager/manager.cc @@ -156,7 +156,8 @@ void manager() */ User_map user_map; - Instance_map instance_map(Options::Main::default_mysqld_path); + Instance_map instance_map(Options::Main::default_mysqld_path, + thread_registry); Guardian_thread guardian_thread(thread_registry, &instance_map, Options::Main::monitoring_interval); @@ -308,6 +309,8 @@ void manager() */ pthread_cond_signal(&guardian_thread.COND_guardian); + log_info("Main loop: started."); + while (!shutdown_complete) { int signo; @@ -320,6 +323,20 @@ void manager() goto err; } + /* + The general idea in this loop is the following: + - we are waiting for SIGINT, SIGTERM -- signals that mean we should + shutdown; + - as shutdown signal is caught, we stop Guardian thread (by calling + Guardian_thread::request_shutdown()); + - as Guardian_thread is stopped, it sends SIGTERM to this thread + (by calling Thread_registry::request_shutdown()), so that the + my_sigwait() above returns; + - as we catch the second SIGTERM, we send signals to all threads + registered in Thread_registry (by calling + Thread_registry::deliver_shutdown()) and waiting for threads to stop; + */ + #ifndef __WIN__ /* On some Darwin kernels SIGHUP is delivered along with most @@ -336,6 +353,8 @@ void manager() else #endif { + log_info("Main loop: got shutdown signal."); + if (!guardian_thread.is_stopped()) { guardian_thread.request_shutdown(); @@ -349,6 +368,8 @@ void manager() } } + log_info("Main loop: finished."); + err: /* delete the pid file */ my_delete(Options::Main::pid_file_name, MYF(0)); diff --git a/server-tools/instance-manager/mysql_connection.cc b/server-tools/instance-manager/mysql_connection.cc index 17cda3af704..1c49d3cd83b 100644 --- a/server-tools/instance-manager/mysql_connection.cc +++ b/server-tools/instance-manager/mysql_connection.cc @@ -97,7 +97,7 @@ Mysql_connection_thread::Mysql_connection_thread( args.user_map, args.connection_id, args.instance_map) - ,thread_info(pthread_self()) + ,thread_info(pthread_self(), TRUE) { thread_registry.register_thread(&thread_info); } @@ -165,7 +165,7 @@ Mysql_connection_thread::~Mysql_connection_thread() void Mysql_connection_thread::run() { - log_info("accepted mysql connection %d", connection_id); + log_info("accepted mysql connection %d", (int) connection_id); my_thread_init(); @@ -175,7 +175,7 @@ void Mysql_connection_thread::run() return; } - log_info("connection %d is checked successfully", connection_id); + log_info("connection %d is checked successfully", (int) connection_id); vio_keepalive(vio, TRUE); @@ -315,7 +315,7 @@ int Mysql_connection_thread::do_command() enum enum_server_command command= (enum enum_server_command) (uchar) *packet; log_info("connection %d: packet_length=%d, command=%d", - connection_id, packet_length, command); + (int) connection_id, (int) packet_length, (int) command); return dispatch_command(command, packet + 1, packet_length - 1); } } @@ -325,27 +325,33 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, { switch (command) { case COM_QUIT: // client exit - log_info("query for connection %d received quit command", connection_id); + log_info("query for connection %d received quit command", + (int) connection_id); return 1; case COM_PING: - log_info("query for connection %d received ping command", connection_id); + log_info("query for connection %d received ping command", + (int) connection_id); net_send_ok(&net, connection_id, NULL); break; case COM_QUERY: { log_info("query for connection %d : ----\n%s\n-------------------------", - connection_id,packet); + (int) connection_id, + (const char *) packet); if (Command *command= parse_command(&instance_map, packet)) { int res= 0; - log_info("query for connection %d successfully parsed",connection_id); + log_info("query for connection %d successfully parsed", + (int) connection_id); res= command->execute(&net, connection_id); delete command; if (!res) - log_info("query for connection %d executed ok",connection_id); + log_info("query for connection %d executed ok", + (int) connection_id); else { - log_info("query for connection %d executed err=%d",connection_id,res); + log_info("query for connection %d executed err=%d", + (int) connection_id, (int) res); net_send_error(&net, res); return 0; } @@ -358,7 +364,8 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, break; } default: - log_info("query for connection %d received unknown command",connection_id); + log_info("query for connection %d received unknown command", + (int) connection_id); net_send_error(&net, ER_UNKNOWN_COM_ERROR); break; } diff --git a/server-tools/instance-manager/thread_registry.cc b/server-tools/instance-manager/thread_registry.cc index a424860548d..10370e0981e 100644 --- a/server-tools/instance-manager/thread_registry.cc +++ b/server-tools/instance-manager/thread_registry.cc @@ -43,8 +43,10 @@ static void handle_signal(int __attribute__((unused)) sig_no) */ Thread_info::Thread_info() {} -Thread_info::Thread_info(pthread_t thread_id_arg) : - thread_id(thread_id_arg) {} +Thread_info::Thread_info(pthread_t thread_id_arg, + bool send_signal_on_shutdown_arg) : + thread_id(thread_id_arg), + send_signal_on_shutdown(send_signal_on_shutdown_arg) {} /* TODO: think about moving signal information (now it's shutdown_in_progress) @@ -86,6 +88,9 @@ Thread_registry::~Thread_registry() void Thread_registry::register_thread(Thread_info *info) { + log_info("Thread_registry: registering thread %d...", + (int) info->thread_id); + #ifndef __WIN__ struct sigaction sa; sa.sa_handler= handle_signal; @@ -112,11 +117,19 @@ void Thread_registry::register_thread(Thread_info *info) void Thread_registry::unregister_thread(Thread_info *info) { + log_info("Thread_registry: unregistering thread %d...", + (int) info->thread_id); + pthread_mutex_lock(&LOCK_thread_registry); info->prev->next= info->next; info->next->prev= info->prev; + if (head.next == &head) + { + log_info("Thread_registry: thread registry is empty!"); pthread_cond_signal(&COND_thread_registry_is_empty); + } + pthread_mutex_unlock(&LOCK_thread_registry); } @@ -181,11 +194,6 @@ int Thread_registry::cond_timedwait(Thread_info *info, pthread_cond_t *cond, void Thread_registry::deliver_shutdown() { - Thread_info *info; - struct timespec shutdown_time; - int error; - set_timespec(shutdown_time, 1); - pthread_mutex_lock(&LOCK_thread_registry); shutdown_in_progress= TRUE; @@ -199,29 +207,14 @@ void Thread_registry::deliver_shutdown() process_alarm(THR_SERVER_ALARM); #endif - for (info= head.next; info != &head; info= info->next) - { - pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL); - /* - sic: race condition here, the thread may not yet fall into - pthread_cond_wait. - */ - if (info->current_cond) - pthread_cond_signal(info->current_cond); - } /* - The common practice is to test predicate before pthread_cond_wait. - I don't do that here because the predicate is practically always false - before wait - is_shutdown's been just set, and the lock's still not - released - the only case when the predicate is false is when no other - threads exist. + sic: race condition here, the thread may not yet fall into + pthread_cond_wait. */ - while (((error= pthread_cond_timedwait(&COND_thread_registry_is_empty, - &LOCK_thread_registry, - &shutdown_time)) != ETIMEDOUT && - error != ETIME) && - head.next != &head) - ; + + interrupt_threads(); + + wait_for_threads_to_unregister(); /* If previous signals did not reach some threads, they must be sleeping @@ -230,11 +223,28 @@ void Thread_registry::deliver_shutdown() so this time everybody should be informed (presumably each worker can get CPU during shutdown_time.) */ - for (info= head.next; info != &head; info= info->next) + + interrupt_threads(); + + /* Get the last chance to threads to stop. */ + + wait_for_threads_to_unregister(); + + /* + Print out threads, that didn't stopped. Thread_registry destructor will + probably abort the program if there is still any alive thread. + */ + + if (head.next != &head) { - pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL); - if (info->current_cond) - pthread_cond_signal(info->current_cond); + log_info("Thread_registry: non-stopped threads:"); + + for (Thread_info *info= head.next; info != &head; info= info->next) + log_info(" - %ld", (long int) info->thread_id); + } + else + { + log_info("Thread_registry: all threads stopped."); } pthread_mutex_unlock(&LOCK_thread_registry); @@ -245,3 +255,46 @@ void Thread_registry::request_shutdown() { pthread_kill(sigwait_thread_pid, SIGTERM); } + + +void Thread_registry::interrupt_threads() +{ + for (Thread_info *info= head.next; info != &head; info= info->next) + { + if (!info->send_signal_on_shutdown) + continue; + + pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL); + if (info->current_cond) + pthread_cond_signal(info->current_cond); + } +} + + +void Thread_registry::wait_for_threads_to_unregister() +{ + struct timespec shutdown_time; + + set_timespec(shutdown_time, 1); + + log_info("Thread_registry: joining threads..."); + + while (true) + { + if (head.next == &head) + { + log_info("Thread_registry: emptied."); + return; + } + + int error= pthread_cond_timedwait(&COND_thread_registry_is_empty, + &LOCK_thread_registry, + &shutdown_time); + + if (error == ETIMEDOUT || error == ETIME) + { + log_info("Thread_registry: threads shutdown timed out."); + return; + } + } +} diff --git a/server-tools/instance-manager/thread_registry.h b/server-tools/instance-manager/thread_registry.h index 6dc320a8533..503d24e5fb0 100644 --- a/server-tools/instance-manager/thread_registry.h +++ b/server-tools/instance-manager/thread_registry.h @@ -67,13 +67,17 @@ class Thread_info { public: - Thread_info(); - Thread_info(pthread_t thread_id_arg); + Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg); friend class Thread_registry; + +private: + Thread_info(); + private: pthread_cond_t *current_cond; Thread_info *prev, *next; pthread_t thread_id; + bool send_signal_on_shutdown; }; @@ -98,6 +102,10 @@ public: int cond_timedwait(Thread_info *info, pthread_cond_t *cond, pthread_mutex_t *mutex, struct timespec *wait_time); private: + void interrupt_threads(); + void wait_for_threads_to_unregister(); + +private: Thread_info head; bool shutdown_in_progress; pthread_mutex_t LOCK_thread_registry; |