summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server-tools/instance-manager/guardian.cc8
-rw-r--r--server-tools/instance-manager/instance.cc43
-rw-r--r--server-tools/instance-manager/instance.h4
-rw-r--r--server-tools/instance-manager/instance_map.cc10
-rw-r--r--server-tools/instance-manager/instance_map.h6
-rw-r--r--server-tools/instance-manager/listener.cc22
-rw-r--r--server-tools/instance-manager/manager.cc23
-rw-r--r--server-tools/instance-manager/mysql_connection.cc29
-rw-r--r--server-tools/instance-manager/thread_registry.cc117
-rw-r--r--server-tools/instance-manager/thread_registry.h12
10 files changed, 203 insertions, 71 deletions
diff --git a/server-tools/instance-manager/guardian.cc b/server-tools/instance-manager/guardian.cc
index af57f1decbc..03bfadd8571 100644
--- a/server-tools/instance-manager/guardian.cc
+++ b/server-tools/instance-manager/guardian.cc
@@ -74,7 +74,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg,
uint monitoring_interval_arg) :
Guardian_thread_args(thread_registry_arg, instance_map_arg,
monitoring_interval_arg),
- thread_info(pthread_self()), guarded_instances(0)
+ thread_info(pthread_self(), TRUE), guarded_instances(0)
{
pthread_mutex_init(&LOCK_guardian, 0);
pthread_cond_init(&COND_guardian, 0);
@@ -250,6 +250,8 @@ void Guardian_thread::run()
LIST *node;
struct timespec timeout;
+ log_info("Guardian: started.");
+
thread_registry.register_thread(&thread_info);
my_thread_init();
@@ -277,12 +279,16 @@ void Guardian_thread::run()
&LOCK_guardian, &timeout);
}
+ log_info("Guardian: stopped.");
+
stopped= TRUE;
pthread_mutex_unlock(&LOCK_guardian);
/* now, when the Guardian is stopped we can stop the IM */
thread_registry.unregister_thread(&thread_info);
thread_registry.request_shutdown();
my_thread_end();
+
+ log_info("Guardian: finished.");
}
diff --git a/server-tools/instance-manager/instance.cc b/server-tools/instance-manager/instance.cc
index 1dfe6167020..3927363a3e5 100644
--- a/server-tools/instance-manager/instance.cc
+++ b/server-tools/instance-manager/instance.cc
@@ -34,6 +34,7 @@
#include "mysql_manager_error.h"
#include "portability.h"
#include "priv.h"
+#include "thread_registry.h"
const LEX_STRING
@@ -44,7 +45,8 @@ static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
static void start_and_monitor_instance(Instance_options *old_instance_options,
- Instance_map *instance_map);
+ Instance_map *instance_map,
+ Thread_registry *thread_registry);
#ifndef __WIN__
typedef pid_t My_process_info;
@@ -63,7 +65,8 @@ pthread_handler_t proxy(void *arg)
{
Instance *instance= (Instance *) arg;
start_and_monitor_instance(&instance->options,
- instance->get_map());
+ instance->get_map(),
+ &instance->thread_registry);
return 0;
}
@@ -99,6 +102,7 @@ static int wait_process(My_process_info *pi)
thread, but we don't know this one). Or we could use waitpid(), but
couldn't use wait(), because it could return in any wait() in the program.
*/
+
if (linuxthreads)
wait(NULL); /* LinuxThreads were detected */
else
@@ -239,11 +243,28 @@ static int start_process(Instance_options *instance_options,
*/
static void start_and_monitor_instance(Instance_options *old_instance_options,
- Instance_map *instance_map)
+ Instance_map *instance_map,
+ Thread_registry *thread_registry)
{
Instance_name instance_name(&old_instance_options->instance_name);
Instance *current_instance;
My_process_info process_info;
+ Thread_info thread_info(pthread_self(), FALSE);
+
+ log_info("Monitoring thread (instance: '%s'): started.",
+ (const char *) instance_name.get_c_str());
+
+ if (!old_instance_options->nonguarded)
+ {
+ /*
+ Register thread in Thread_registry to wait for it to stop on shutdown
+ only if instance is nuarded. If instance is guarded, the thread will not
+ finish, because nonguarded instances are not stopped on shutdown.
+ */
+
+ thread_registry->register_thread(&thread_info);
+ my_thread_init();
+ }
/*
Lock instance map to guarantee that no instances are deleted during
@@ -280,7 +301,14 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
instance_map->unlock();
- return;
+ if (!old_instance_options->nonguarded)
+ {
+ thread_registry->unregister_thread(&thread_info);
+ my_thread_end();
+ }
+
+ log_info("Monitoring thread (instance: '%s'): finished.",
+ (const char *) instance_name.get_c_str());
}
@@ -343,10 +371,6 @@ int Instance::start()
{
remove_pid();
- /*
- No need to monitor this thread in the Thread_registry, as all
- instances are to be stopped during shutdown.
- */
pthread_t proxy_thd_id;
pthread_attr_t proxy_thd_attr;
int rc;
@@ -404,7 +428,8 @@ void Instance::set_crash_flag_n_wake_all()
-Instance::Instance(): crashed(FALSE), configured(FALSE)
+Instance::Instance(Thread_registry &thread_registry_arg):
+ crashed(FALSE), configured(FALSE), thread_registry(thread_registry_arg)
{
pthread_mutex_init(&LOCK_instance, 0);
pthread_cond_init(&COND_instance_stopped, 0);
diff --git a/server-tools/instance-manager/instance.h b/server-tools/instance-manager/instance.h
index 1f06cabebf7..329eaa68b1a 100644
--- a/server-tools/instance-manager/instance.h
+++ b/server-tools/instance-manager/instance.h
@@ -27,6 +27,7 @@
#endif
class Instance_map;
+class Thread_registry;
/*
@@ -87,7 +88,7 @@ public:
static bool is_mysqld_compatible_name(const LEX_STRING *name);
public:
- Instance();
+ Instance(Thread_registry &thread_registry_arg);
~Instance();
int init(const LEX_STRING *name_arg);
@@ -120,6 +121,7 @@ public:
public:
enum { DEFAULT_SHUTDOWN_DELAY= 35 };
Instance_options options;
+ Thread_registry &thread_registry;
private:
/* This attributes is a flag, specifies if the instance has been crashed. */
diff --git a/server-tools/instance-manager/instance_map.cc b/server-tools/instance-manager/instance_map.cc
index b7704c027f1..2f830e616c4 100644
--- a/server-tools/instance-manager/instance_map.cc
+++ b/server-tools/instance-manager/instance_map.cc
@@ -169,7 +169,7 @@ int Instance_map::process_one_option(const LEX_STRING *group,
if (!(instance= (Instance *) hash_search(&hash, (byte *) group->str,
group->length)))
{
- if (!(instance= new Instance()))
+ if (!(instance= new Instance(thread_registry)))
return 1;
if (instance->init(group) || add_instance(instance))
@@ -213,8 +213,10 @@ int Instance_map::process_one_option(const LEX_STRING *group,
}
-Instance_map::Instance_map(const char *default_mysqld_path_arg):
-mysqld_path(default_mysqld_path_arg)
+Instance_map::Instance_map(const char *default_mysqld_path_arg,
+ Thread_registry &thread_registry_arg):
+ mysqld_path(default_mysqld_path_arg),
+ thread_registry(thread_registry_arg)
{
pthread_mutex_init(&LOCK_instance_map, 0);
}
@@ -333,7 +335,7 @@ int Instance_map::remove_instance(Instance *instance)
int Instance_map::create_instance(const LEX_STRING *instance_name,
const Named_value_arr *options)
{
- Instance *instance= new Instance();
+ Instance *instance= new Instance(thread_registry);
if (!instance)
{
diff --git a/server-tools/instance-manager/instance_map.h b/server-tools/instance-manager/instance_map.h
index 8e6d2360652..9de40e35e0f 100644
--- a/server-tools/instance-manager/instance_map.h
+++ b/server-tools/instance-manager/instance_map.h
@@ -28,6 +28,7 @@
class Guardian_thread;
class Instance;
class Named_value_arr;
+class Thread_registry;
extern int load_all_groups(char ***groups, const char *filename);
extern void free_groups(char **groups);
@@ -104,7 +105,8 @@ public:
int create_instance(const LEX_STRING *instance_name,
const Named_value_arr *options);
- Instance_map(const char *default_mysqld_path_arg);
+ Instance_map(const char *default_mysqld_path_arg,
+ Thread_registry &thread_registry_arg);
~Instance_map();
/*
@@ -130,6 +132,8 @@ private:
enum { START_HASH_SIZE = 16 };
pthread_mutex_t LOCK_instance_map;
HASH hash;
+
+ Thread_registry &thread_registry;
};
#endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */
diff --git a/server-tools/instance-manager/listener.cc b/server-tools/instance-manager/listener.cc
index 0ab85c0e7aa..62962c00957 100644
--- a/server-tools/instance-manager/listener.cc
+++ b/server-tools/instance-manager/listener.cc
@@ -87,7 +87,7 @@ private:
Listener_thread::Listener_thread(const Listener_thread_args &args) :
Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
,total_connection_count(0)
- ,thread_info(pthread_self())
+ ,thread_info(pthread_self(), TRUE)
,num_sockets(0)
{
}
@@ -112,6 +112,8 @@ void Listener_thread::run()
{
int i, n= 0;
+ log_info("Listener_thread: started.");
+
#ifndef __WIN__
/* we use this var to check whether we are running on LinuxThreads */
pid_t thread_pid;
@@ -164,7 +166,7 @@ void Listener_thread::run()
if (rc == 0 || rc == -1)
{
if (rc == -1 && errno != EINTR)
- log_error("Listener_thread::run(): select() failed, %s",
+ log_error("Listener_thread: select() failed, %s",
strerror(errno));
continue;
}
@@ -198,7 +200,7 @@ void Listener_thread::run()
/* III. Release all resources and exit */
- log_info("Listener_thread::run(): shutdown requested, exiting...");
+ log_info("Listener_thread: shutdown requested, exiting...");
for (i= 0; i < num_sockets; i++)
close(sockets[i]);
@@ -209,6 +211,8 @@ void Listener_thread::run()
thread_registry.unregister_thread(&thread_info);
my_thread_end();
+
+ log_info("Listener_thread: finished.");
return;
err:
@@ -230,7 +234,7 @@ int Listener_thread::create_tcp_socket()
int ip_socket= socket(AF_INET, SOCK_STREAM, 0);
if (ip_socket == INVALID_SOCKET)
{
- log_error("Listener_thead::run(): socket(AF_INET) failed, %s",
+ log_error("Listener_thead: socket(AF_INET) failed, %s",
strerror(errno));
return -1;
}
@@ -261,7 +265,7 @@ int Listener_thread::create_tcp_socket()
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
sizeof(ip_socket_address)))
{
- log_error("Listener_thread::run(): bind(ip socket) failed, '%s'",
+ log_error("Listener_thread: bind(ip socket) failed, '%s'",
strerror(errno));
close(ip_socket);
return -1;
@@ -269,7 +273,7 @@ int Listener_thread::create_tcp_socket()
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
{
- log_error("Listener_thread::run(): listen(ip socket) failed, %s",
+ log_error("Listener_thread: listen(ip socket) failed, %s",
strerror(errno));
close(ip_socket);
return -1;
@@ -294,7 +298,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
if (unix_socket == INVALID_SOCKET)
{
- log_error("Listener_thead::run(): socket(AF_UNIX) failed, %s",
+ log_error("Listener_thead: socket(AF_UNIX) failed, %s",
strerror(errno));
return -1;
}
@@ -314,7 +318,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
sizeof(unix_socket_address)))
{
- log_error("Listener_thread::run(): bind(unix socket) failed, "
+ log_error("Listener_thread: bind(unix socket) failed, "
"socket file name is '%s', error '%s'",
unix_socket_address.sun_path, strerror(errno));
close(unix_socket);
@@ -325,7 +329,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
{
- log_error("Listener_thread::run(): listen(unix socket) failed, %s",
+ log_error("Listener_thread: listen(unix socket) failed, %s",
strerror(errno));
close(unix_socket);
return -1;
diff --git a/server-tools/instance-manager/manager.cc b/server-tools/instance-manager/manager.cc
index 3fb967fc352..4bd298eedec 100644
--- a/server-tools/instance-manager/manager.cc
+++ b/server-tools/instance-manager/manager.cc
@@ -156,7 +156,8 @@ void manager()
*/
User_map user_map;
- Instance_map instance_map(Options::Main::default_mysqld_path);
+ Instance_map instance_map(Options::Main::default_mysqld_path,
+ thread_registry);
Guardian_thread guardian_thread(thread_registry,
&instance_map,
Options::Main::monitoring_interval);
@@ -308,6 +309,8 @@ void manager()
*/
pthread_cond_signal(&guardian_thread.COND_guardian);
+ log_info("Main loop: started.");
+
while (!shutdown_complete)
{
int signo;
@@ -320,6 +323,20 @@ void manager()
goto err;
}
+ /*
+ The general idea in this loop is the following:
+ - we are waiting for SIGINT, SIGTERM -- signals that mean we should
+ shutdown;
+ - as shutdown signal is caught, we stop Guardian thread (by calling
+ Guardian_thread::request_shutdown());
+ - as Guardian_thread is stopped, it sends SIGTERM to this thread
+ (by calling Thread_registry::request_shutdown()), so that the
+ my_sigwait() above returns;
+ - as we catch the second SIGTERM, we send signals to all threads
+ registered in Thread_registry (by calling
+ Thread_registry::deliver_shutdown()) and waiting for threads to stop;
+ */
+
#ifndef __WIN__
/*
On some Darwin kernels SIGHUP is delivered along with most
@@ -336,6 +353,8 @@ void manager()
else
#endif
{
+ log_info("Main loop: got shutdown signal.");
+
if (!guardian_thread.is_stopped())
{
guardian_thread.request_shutdown();
@@ -349,6 +368,8 @@ void manager()
}
}
+ log_info("Main loop: finished.");
+
err:
/* delete the pid file */
my_delete(Options::Main::pid_file_name, MYF(0));
diff --git a/server-tools/instance-manager/mysql_connection.cc b/server-tools/instance-manager/mysql_connection.cc
index 17cda3af704..1c49d3cd83b 100644
--- a/server-tools/instance-manager/mysql_connection.cc
+++ b/server-tools/instance-manager/mysql_connection.cc
@@ -97,7 +97,7 @@ Mysql_connection_thread::Mysql_connection_thread(
args.user_map,
args.connection_id,
args.instance_map)
- ,thread_info(pthread_self())
+ ,thread_info(pthread_self(), TRUE)
{
thread_registry.register_thread(&thread_info);
}
@@ -165,7 +165,7 @@ Mysql_connection_thread::~Mysql_connection_thread()
void Mysql_connection_thread::run()
{
- log_info("accepted mysql connection %d", connection_id);
+ log_info("accepted mysql connection %d", (int) connection_id);
my_thread_init();
@@ -175,7 +175,7 @@ void Mysql_connection_thread::run()
return;
}
- log_info("connection %d is checked successfully", connection_id);
+ log_info("connection %d is checked successfully", (int) connection_id);
vio_keepalive(vio, TRUE);
@@ -315,7 +315,7 @@ int Mysql_connection_thread::do_command()
enum enum_server_command command= (enum enum_server_command)
(uchar) *packet;
log_info("connection %d: packet_length=%d, command=%d",
- connection_id, packet_length, command);
+ (int) connection_id, (int) packet_length, (int) command);
return dispatch_command(command, packet + 1, packet_length - 1);
}
}
@@ -325,27 +325,33 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
{
switch (command) {
case COM_QUIT: // client exit
- log_info("query for connection %d received quit command", connection_id);
+ log_info("query for connection %d received quit command",
+ (int) connection_id);
return 1;
case COM_PING:
- log_info("query for connection %d received ping command", connection_id);
+ log_info("query for connection %d received ping command",
+ (int) connection_id);
net_send_ok(&net, connection_id, NULL);
break;
case COM_QUERY:
{
log_info("query for connection %d : ----\n%s\n-------------------------",
- connection_id,packet);
+ (int) connection_id,
+ (const char *) packet);
if (Command *command= parse_command(&instance_map, packet))
{
int res= 0;
- log_info("query for connection %d successfully parsed",connection_id);
+ log_info("query for connection %d successfully parsed",
+ (int) connection_id);
res= command->execute(&net, connection_id);
delete command;
if (!res)
- log_info("query for connection %d executed ok",connection_id);
+ log_info("query for connection %d executed ok",
+ (int) connection_id);
else
{
- log_info("query for connection %d executed err=%d",connection_id,res);
+ log_info("query for connection %d executed err=%d",
+ (int) connection_id, (int) res);
net_send_error(&net, res);
return 0;
}
@@ -358,7 +364,8 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
break;
}
default:
- log_info("query for connection %d received unknown command",connection_id);
+ log_info("query for connection %d received unknown command",
+ (int) connection_id);
net_send_error(&net, ER_UNKNOWN_COM_ERROR);
break;
}
diff --git a/server-tools/instance-manager/thread_registry.cc b/server-tools/instance-manager/thread_registry.cc
index a424860548d..10370e0981e 100644
--- a/server-tools/instance-manager/thread_registry.cc
+++ b/server-tools/instance-manager/thread_registry.cc
@@ -43,8 +43,10 @@ static void handle_signal(int __attribute__((unused)) sig_no)
*/
Thread_info::Thread_info() {}
-Thread_info::Thread_info(pthread_t thread_id_arg) :
- thread_id(thread_id_arg) {}
+Thread_info::Thread_info(pthread_t thread_id_arg,
+ bool send_signal_on_shutdown_arg) :
+ thread_id(thread_id_arg),
+ send_signal_on_shutdown(send_signal_on_shutdown_arg) {}
/*
TODO: think about moving signal information (now it's shutdown_in_progress)
@@ -86,6 +88,9 @@ Thread_registry::~Thread_registry()
void Thread_registry::register_thread(Thread_info *info)
{
+ log_info("Thread_registry: registering thread %d...",
+ (int) info->thread_id);
+
#ifndef __WIN__
struct sigaction sa;
sa.sa_handler= handle_signal;
@@ -112,11 +117,19 @@ void Thread_registry::register_thread(Thread_info *info)
void Thread_registry::unregister_thread(Thread_info *info)
{
+ log_info("Thread_registry: unregistering thread %d...",
+ (int) info->thread_id);
+
pthread_mutex_lock(&LOCK_thread_registry);
info->prev->next= info->next;
info->next->prev= info->prev;
+
if (head.next == &head)
+ {
+ log_info("Thread_registry: thread registry is empty!");
pthread_cond_signal(&COND_thread_registry_is_empty);
+ }
+
pthread_mutex_unlock(&LOCK_thread_registry);
}
@@ -181,11 +194,6 @@ int Thread_registry::cond_timedwait(Thread_info *info, pthread_cond_t *cond,
void Thread_registry::deliver_shutdown()
{
- Thread_info *info;
- struct timespec shutdown_time;
- int error;
- set_timespec(shutdown_time, 1);
-
pthread_mutex_lock(&LOCK_thread_registry);
shutdown_in_progress= TRUE;
@@ -199,29 +207,14 @@ void Thread_registry::deliver_shutdown()
process_alarm(THR_SERVER_ALARM);
#endif
- for (info= head.next; info != &head; info= info->next)
- {
- pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
- /*
- sic: race condition here, the thread may not yet fall into
- pthread_cond_wait.
- */
- if (info->current_cond)
- pthread_cond_signal(info->current_cond);
- }
/*
- The common practice is to test predicate before pthread_cond_wait.
- I don't do that here because the predicate is practically always false
- before wait - is_shutdown's been just set, and the lock's still not
- released - the only case when the predicate is false is when no other
- threads exist.
+ sic: race condition here, the thread may not yet fall into
+ pthread_cond_wait.
*/
- while (((error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
- &LOCK_thread_registry,
- &shutdown_time)) != ETIMEDOUT &&
- error != ETIME) &&
- head.next != &head)
- ;
+
+ interrupt_threads();
+
+ wait_for_threads_to_unregister();
/*
If previous signals did not reach some threads, they must be sleeping
@@ -230,11 +223,28 @@ void Thread_registry::deliver_shutdown()
so this time everybody should be informed (presumably each worker can
get CPU during shutdown_time.)
*/
- for (info= head.next; info != &head; info= info->next)
+
+ interrupt_threads();
+
+ /* Get the last chance to threads to stop. */
+
+ wait_for_threads_to_unregister();
+
+ /*
+ Print out threads, that didn't stopped. Thread_registry destructor will
+ probably abort the program if there is still any alive thread.
+ */
+
+ if (head.next != &head)
{
- pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
- if (info->current_cond)
- pthread_cond_signal(info->current_cond);
+ log_info("Thread_registry: non-stopped threads:");
+
+ for (Thread_info *info= head.next; info != &head; info= info->next)
+ log_info(" - %ld", (long int) info->thread_id);
+ }
+ else
+ {
+ log_info("Thread_registry: all threads stopped.");
}
pthread_mutex_unlock(&LOCK_thread_registry);
@@ -245,3 +255,46 @@ void Thread_registry::request_shutdown()
{
pthread_kill(sigwait_thread_pid, SIGTERM);
}
+
+
+void Thread_registry::interrupt_threads()
+{
+ for (Thread_info *info= head.next; info != &head; info= info->next)
+ {
+ if (!info->send_signal_on_shutdown)
+ continue;
+
+ pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
+ if (info->current_cond)
+ pthread_cond_signal(info->current_cond);
+ }
+}
+
+
+void Thread_registry::wait_for_threads_to_unregister()
+{
+ struct timespec shutdown_time;
+
+ set_timespec(shutdown_time, 1);
+
+ log_info("Thread_registry: joining threads...");
+
+ while (true)
+ {
+ if (head.next == &head)
+ {
+ log_info("Thread_registry: emptied.");
+ return;
+ }
+
+ int error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
+ &LOCK_thread_registry,
+ &shutdown_time);
+
+ if (error == ETIMEDOUT || error == ETIME)
+ {
+ log_info("Thread_registry: threads shutdown timed out.");
+ return;
+ }
+ }
+}
diff --git a/server-tools/instance-manager/thread_registry.h b/server-tools/instance-manager/thread_registry.h
index 6dc320a8533..503d24e5fb0 100644
--- a/server-tools/instance-manager/thread_registry.h
+++ b/server-tools/instance-manager/thread_registry.h
@@ -67,13 +67,17 @@
class Thread_info
{
public:
- Thread_info();
- Thread_info(pthread_t thread_id_arg);
+ Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg);
friend class Thread_registry;
+
+private:
+ Thread_info();
+
private:
pthread_cond_t *current_cond;
Thread_info *prev, *next;
pthread_t thread_id;
+ bool send_signal_on_shutdown;
};
@@ -98,6 +102,10 @@ public:
int cond_timedwait(Thread_info *info, pthread_cond_t *cond,
pthread_mutex_t *mutex, struct timespec *wait_time);
private:
+ void interrupt_threads();
+ void wait_for_threads_to_unregister();
+
+private:
Thread_info head;
bool shutdown_in_progress;
pthread_mutex_t LOCK_thread_registry;