summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <kostja@bodhi.local>2006-11-17 16:11:04 +0300
committerunknown <kostja@bodhi.local>2006-11-17 16:11:04 +0300
commita163ae30f23239e1f7a9efafc620e1bc74e009ba (patch)
tree5ecdad467c796454f72a94e294ff4958563441fb
parent211b2bc92a25609e0c7323b5fee6f646abb3c748 (diff)
downloadmariadb-git-a163ae30f23239e1f7a9efafc620e1bc74e009ba.tar.gz
Replace the approach using Foo_thread_args + Foo_thread and manually
spawned threads with a reusable class Thread. This is the second idea implemented in the Alik's patch for BUG#22306: STOP INSTANCE can not be applied for instances in Crashed, Failed and Abandoned. Commiting separately to ease review process. server-tools/instance-manager/commands.cc: Remove an unused header. server-tools/instance-manager/guardian.cc: Use Thread framework instead of manually spawning the Guardian thread. Tidy up. server-tools/instance-manager/guardian.h: Use Thread framework instead of manually spawning the Guardian thread. server-tools/instance-manager/instance.cc: Use Thread framework instead of manually spawning the instance monitoring thread. server-tools/instance-manager/listener.cc: Use Thread framework instead of manually spawning the mysql connection thread. server-tools/instance-manager/listener.h: Use Thread framework instead of manually spawning the mysql connection thread. Rename Listener_thread to Listener for brevity. server-tools/instance-manager/manager.cc: Change references to pointers, as per the coding style. Use Thread framework instead of manually spawning threads. server-tools/instance-manager/mysql_connection.cc: Get rid of Mysql_connection_thread_args. Use class Thread framework instead. Rename Mysql_connection_thread to Mysql_connection for brevity. server-tools/instance-manager/mysql_connection.h: Get rid of Mysql_connection_thread_args. Use class Thread framework instead. Rename Mysql_connection_thread to Mysql_connection for brevity. server-tools/instance-manager/priv.cc: Move set_stacksize_and_create_thread to thread_registry.cc and make it static: it is not used anywhere else now. server-tools/instance-manager/priv.h: No public set_stacksize_n_create_thread server-tools/instance-manager/thread_registry.cc: Implement a base Thread class to be used for all Instance Manager threads. server-tools/instance-manager/thread_registry.h: Implement a base Thread class to be used for all Instance Manager threads.
-rw-r--r--server-tools/instance-manager/commands.cc1
-rw-r--r--server-tools/instance-manager/guardian.cc63
-rw-r--r--server-tools/instance-manager/guardian.h43
-rw-r--r--server-tools/instance-manager/instance.cc60
-rw-r--r--server-tools/instance-manager/listener.cc128
-rw-r--r--server-tools/instance-manager/listener.h42
-rw-r--r--server-tools/instance-manager/manager.cc74
-rw-r--r--server-tools/instance-manager/mysql_connection.cc117
-rw-r--r--server-tools/instance-manager/mysql_connection.h59
-rw-r--r--server-tools/instance-manager/priv.cc38
-rw-r--r--server-tools/instance-manager/priv.h4
-rw-r--r--server-tools/instance-manager/thread_registry.cc96
-rw-r--r--server-tools/instance-manager/thread_registry.h31
13 files changed, 336 insertions, 420 deletions
diff --git a/server-tools/instance-manager/commands.cc b/server-tools/instance-manager/commands.cc
index 50ab15f1ed3..1ecd4b38fcd 100644
--- a/server-tools/instance-manager/commands.cc
+++ b/server-tools/instance-manager/commands.cc
@@ -29,7 +29,6 @@
#include "guardian.h"
#include "instance_map.h"
#include "log.h"
-#include "manager.h"
#include "messages.h"
#include "mysqld_error.h"
#include "mysql_manager_error.h"
diff --git a/server-tools/instance-manager/guardian.cc b/server-tools/instance-manager/guardian.cc
index 71ad0760b06..1cb9aed559b 100644
--- a/server-tools/instance-manager/guardian.cc
+++ b/server-tools/instance-manager/guardian.cc
@@ -20,7 +20,6 @@
#endif
#include "guardian.h"
-
#include <string.h>
#include <sys/types.h>
#include <signal.h>
@@ -30,15 +29,6 @@
#include "log.h"
#include "mysql_manager_error.h"
-
-pthread_handler_t guardian_thread_func(void *arg)
-{
- Guardian *guardian= (Guardian *) arg;
- guardian->run();
- return 0;
-}
-
-
const char *
Guardian::get_instance_state_name(enum_instance_state state)
{
@@ -68,18 +58,19 @@ Guardian::get_instance_state_name(enum_instance_state state)
return NULL; /* just to ignore compiler warning. */
}
+/* {{{ Constructor & destructor. */
-Guardian::Guardian(Thread_registry &thread_registry_arg,
- Instance_map *instance_map_arg,
- uint monitoring_interval_arg) :
- Guardian_args(thread_registry_arg, instance_map_arg,
- monitoring_interval_arg),
- thread_info(pthread_self(), TRUE), guarded_instances(0)
+Guardian::Guardian(Thread_registry *thread_registry_arg,
+ Instance_map *instance_map_arg,
+ uint monitoring_interval_arg)
+ :monitoring_interval(monitoring_interval_arg),
+ shutdown_requested(FALSE),
+ stopped(FALSE),
+ thread_registry(thread_registry_arg),
+ instance_map(instance_map_arg)
{
pthread_mutex_init(&LOCK_guardian, 0);
pthread_cond_init(&COND_guardian, 0);
- shutdown_requested= FALSE;
- stopped= FALSE;
init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0);
}
@@ -94,6 +85,8 @@ Guardian::~Guardian()
pthread_cond_destroy(&COND_guardian);
}
+/* }}} */
+
void Guardian::request_shutdown()
{
@@ -106,9 +99,9 @@ void Guardian::request_shutdown()
void Guardian::process_instance(Instance *instance,
- GUARD_NODE *current_node,
- LIST **guarded_instances,
- LIST *node)
+ GUARD_NODE *current_node,
+ LIST **guarded_instances,
+ LIST *node)
{
uint waitchild= (uint) Instance::DEFAULT_SHUTDOWN_DELAY;
/* The amount of times, Guardian attempts to restart an instance */
@@ -117,7 +110,7 @@ void Guardian::process_instance(Instance *instance,
if (current_node->state == STOPPING)
{
- /* this brach is executed during shutdown */
+ /* this branch is executed during shutdown */
if (instance->options.shutdown_delay)
{
/*
@@ -235,7 +228,7 @@ void Guardian::process_instance(Instance *instance,
/*
Run guardian thread
- SYNOPSYS
+ SYNOPSIS
run()
DESCRIPTION
@@ -252,9 +245,8 @@ void Guardian::run()
log_info("Guardian: started.");
- thread_registry.register_thread(&thread_info);
+ thread_registry->register_thread(&thread_info);
- my_thread_init();
pthread_mutex_lock(&LOCK_guardian);
/* loop, until all instances were shut down at the end */
@@ -275,8 +267,8 @@ void Guardian::run()
/* check the loop predicate before sleeping */
if (!(shutdown_requested && (!(guarded_instances))))
- thread_registry.cond_timedwait(&thread_info, &COND_guardian,
- &LOCK_guardian, &timeout);
+ thread_registry->cond_timedwait(&thread_info, &COND_guardian,
+ &LOCK_guardian, &timeout);
}
log_info("Guardian: stopped.");
@@ -284,9 +276,8 @@ void Guardian::run()
stopped= TRUE;
pthread_mutex_unlock(&LOCK_guardian);
/* now, when the Guardian is stopped we can stop the IM */
- thread_registry.unregister_thread(&thread_info);
- thread_registry.request_shutdown();
- my_thread_end();
+ thread_registry->unregister_thread(&thread_info);
+ thread_registry->request_shutdown();
log_info("Guardian: finished.");
}
@@ -306,7 +297,7 @@ int Guardian::is_stopped()
Initialize the list of guarded instances: loop through the Instance_map and
add all of the instances, which don't have 'nonguarded' option specified.
- SYNOPSYS
+ SYNOPSIS
Guardian::init()
NOTE: The operation should be invoked with the following locks acquired:
@@ -315,7 +306,7 @@ int Guardian::is_stopped()
RETURN
0 - ok
- 1 - error occured
+ 1 - error occurred
*/
int Guardian::init()
@@ -344,7 +335,7 @@ int Guardian::init()
/*
Add instance to the Guardian list
- SYNOPSYS
+ SYNOPSIS
guard()
instance the instance to be guarded
nolock whether we prefer do not lock Guardian here,
@@ -357,7 +348,7 @@ int Guardian::init()
RETURN
0 - ok
- 1 - error occured
+ 1 - error occurred
*/
int Guardian::guard(Instance *instance, bool nolock)
@@ -418,7 +409,7 @@ int Guardian::stop_guard(Instance *instance)
An internal method which is called at shutdown to unregister instances and
attempt to stop them if requested.
- SYNOPSYS
+ SYNOPSIS
stop_instances()
DESCRIPTION
@@ -431,7 +422,7 @@ int Guardian::stop_guard(Instance *instance)
RETURN
0 - ok
- 1 - error occured
+ 1 - error occurred
*/
int Guardian::stop_instances()
diff --git a/server-tools/instance-manager/guardian.h b/server-tools/instance-manager/guardian.h
index 4c518dddf23..0eee1dc631d 100644
--- a/server-tools/instance-manager/guardian.h
+++ b/server-tools/instance-manager/guardian.h
@@ -16,11 +16,10 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include <my_global.h>
-#include <my_sys.h>
-#include <my_list.h>
#include "thread_registry.h"
+#include <my_sys.h>
+#include <my_list.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
@@ -31,30 +30,12 @@ class Instance_map;
class Thread_registry;
struct GUARD_NODE;
-pthread_handler_t guardian_thread_func(void *arg);
-
-struct Guardian_args
-{
- Thread_registry &thread_registry;
- Instance_map *instance_map;
- int monitoring_interval;
-
- Guardian_args(Thread_registry &thread_registry_arg,
- Instance_map *instance_map_arg,
- uint monitoring_interval_arg) :
- thread_registry(thread_registry_arg),
- instance_map(instance_map_arg),
- monitoring_interval(monitoring_interval_arg)
- {}
-};
-
-
-/*
+/**
The guardian thread is responsible for monitoring and restarting of guarded
instances.
*/
-class Guardian: public Guardian_args
+class Guardian: public Thread
{
public:
/* states of an instance */
@@ -82,12 +63,10 @@ public:
/* Return client state name. */
static const char *get_instance_state_name(enum_instance_state state);
- Guardian(Thread_registry &thread_registry_arg,
- Instance_map *instance_map_arg,
- uint monitoring_interval_arg);
- ~Guardian();
- /* Main funtion of the thread */
- void run();
+ Guardian(Thread_registry *thread_registry_arg,
+ Instance_map *instance_map_arg,
+ uint monitoring_interval_arg);
+ virtual ~Guardian();
/* Initialize or refresh the list of guarded instances */
int init();
/* Request guardian shutdown. Stop instances if needed */
@@ -117,6 +96,9 @@ public:
a valid list node.
*/
inline enum_instance_state get_instance_state(LIST *instance_node);
+protected:
+ /* Main funtion of the thread */
+ virtual void run();
public:
pthread_cond_t COND_guardian;
@@ -133,6 +115,9 @@ private:
private:
pthread_mutex_t LOCK_guardian;
Thread_info thread_info;
+ int monitoring_interval;
+ Thread_registry *thread_registry;
+ Instance_map *instance_map;
LIST *guarded_instances;
MEM_ROOT alloc;
/* this variable is set to TRUE when we want to stop Guardian thread */
diff --git a/server-tools/instance-manager/instance.cc b/server-tools/instance-manager/instance.cc
index 3927363a3e5..f170332e132 100644
--- a/server-tools/instance-manager/instance.cc
+++ b/server-tools/instance-manager/instance.cc
@@ -44,9 +44,6 @@ static const char * const INSTANCE_NAME_PREFIX= Instance::DFLT_INSTANCE_NAME.str
static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
-static void start_and_monitor_instance(Instance_options *old_instance_options,
- Instance_map *instance_map,
- Thread_registry *thread_registry);
#ifndef __WIN__
typedef pid_t My_process_info;
@@ -61,13 +58,24 @@ typedef PROCESS_INFORMATION My_process_info;
to do it in a portable way.
*/
-pthread_handler_t proxy(void *arg)
+class Instance_monitor: public Thread
{
- Instance *instance= (Instance *) arg;
- start_and_monitor_instance(&instance->options,
- instance->get_map(),
+public:
+ Instance_monitor(Instance *instance_arg) :instance(instance_arg) {}
+protected:
+ virtual void run();
+ void start_and_monitor_instance(Instance_options *old_instance_options,
+ Instance_map *instance_map,
+ Thread_registry *thread_registry);
+private:
+ Instance *instance;
+};
+
+void Instance_monitor::run()
+{
+ start_and_monitor_instance(&instance->options, instance->get_map(),
&instance->thread_registry);
- return 0;
+ delete this;
}
/*
@@ -242,14 +250,16 @@ static int start_process(Instance_options *instance_options,
Function returns no value
*/
-static void start_and_monitor_instance(Instance_options *old_instance_options,
- Instance_map *instance_map,
- Thread_registry *thread_registry)
+void
+Instance_monitor::
+start_and_monitor_instance(Instance_options *old_instance_options,
+ Instance_map *instance_map,
+ Thread_registry *thread_registry)
{
Instance_name instance_name(&old_instance_options->instance_name);
Instance *current_instance;
My_process_info process_info;
- Thread_info thread_info(pthread_self(), FALSE);
+ Thread_info thread_info;
log_info("Monitoring thread (instance: '%s'): started.",
(const char *) instance_name.get_c_str());
@@ -258,12 +268,10 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
{
/*
Register thread in Thread_registry to wait for it to stop on shutdown
- only if instance is nuarded. If instance is guarded, the thread will not
+ only if instance is guarded. If instance is guarded, the thread will not
finish, because nonguarded instances are not stopped on shutdown.
*/
-
- thread_registry->register_thread(&thread_info);
- my_thread_init();
+ thread_registry->register_thread(&thread_info, FALSE);
}
/*
@@ -302,10 +310,7 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
instance_map->unlock();
if (!old_instance_options->nonguarded)
- {
thread_registry->unregister_thread(&thread_info);
- my_thread_end();
- }
log_info("Monitoring thread (instance: '%s'): finished.",
(const char *) instance_name.get_c_str());
@@ -369,22 +374,19 @@ int Instance::start()
if (configured && !is_running())
{
+ Instance_monitor *instance_monitor;
remove_pid();
- pthread_t proxy_thd_id;
- pthread_attr_t proxy_thd_attr;
- int rc;
+ instance_monitor= new Instance_monitor(this);
- pthread_attr_init(&proxy_thd_attr);
- pthread_attr_setdetachstate(&proxy_thd_attr, PTHREAD_CREATE_DETACHED);
- rc= pthread_create(&proxy_thd_id, &proxy_thd_attr, proxy,
- this);
- pthread_attr_destroy(&proxy_thd_attr);
- if (rc)
+ if (instance_monitor == NULL || instance_monitor->start_detached())
{
- log_error("Instance::start(): pthread_create(proxy) failed");
+ delete instance_monitor;
+ log_error("Instance::start(): failed to create the monitoring thread"
+ " to start an instance");
return ER_CANNOT_START_INSTANCE;
}
+ /* The monitoring thread will delete itself when it's finished. */
return 0;
}
diff --git a/server-tools/instance-manager/listener.cc b/server-tools/instance-manager/listener.cc
index 62962c00957..b749f234560 100644
--- a/server-tools/instance-manager/listener.cc
+++ b/server-tools/instance-manager/listener.cc
@@ -29,7 +29,6 @@
#include <sys/un.h>
#endif
-#include "instance_map.h"
#include "log.h"
#include "mysql_connection.h"
#include "options.h"
@@ -59,47 +58,18 @@ static void set_no_inherit(int socket)
}
-/*
- Listener_thread - incapsulates listening functionality
-*/
-
-class Listener_thread: public Listener_thread_args
-{
-public:
- Listener_thread(const Listener_thread_args &args);
- ~Listener_thread();
- void run();
-private:
- static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
- ulong total_connection_count;
- Thread_info thread_info;
-
- int sockets[2];
- int num_sockets;
- fd_set read_fds;
-private:
- void handle_new_mysql_connection(Vio *vio);
- int create_tcp_socket();
- int create_unix_socket(struct sockaddr_un &unix_socket_address);
-};
-
-
-Listener_thread::Listener_thread(const Listener_thread_args &args) :
- Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
- ,total_connection_count(0)
- ,thread_info(pthread_self(), TRUE)
- ,num_sockets(0)
-{
-}
-
-
-Listener_thread::~Listener_thread()
+Listener::Listener(Thread_registry *thread_registry_arg,
+ User_map *user_map_arg)
+ :thread_registry(thread_registry_arg),
+ user_map(user_map_arg),
+ total_connection_count(0),
+ num_sockets(0)
{
}
/*
- Listener_thread::run() - listen all supported sockets and spawn a thread
+ Listener::run() - listen all supported sockets and spawn a thread
to handle incoming connection.
Using 'die' in case of syscall failure is OK now - we don't hold any
resources and 'die' kills the signal thread automatically. To be rewritten
@@ -108,11 +78,11 @@ Listener_thread::~Listener_thread()
architecture.
*/
-void Listener_thread::run()
+void Listener::run()
{
int i, n= 0;
- log_info("Listener_thread: started.");
+ log_info("Listener: started.");
#ifndef __WIN__
/* we use this var to check whether we are running on LinuxThreads */
@@ -125,9 +95,7 @@ void Listener_thread::run()
linuxthreads= (thread_pid != manager_pid);
#endif
- thread_registry.register_thread(&thread_info);
-
- my_thread_init();
+ thread_registry->register_thread(&thread_info);
FD_ZERO(&read_fds);
@@ -146,7 +114,7 @@ void Listener_thread::run()
n++;
timeval tv;
- while (!thread_registry.is_shutdown())
+ while (!thread_registry->is_shutdown())
{
fd_set read_fds_arg= read_fds;
/*
@@ -166,7 +134,7 @@ void Listener_thread::run()
if (rc == 0 || rc == -1)
{
if (rc == -1 && errno != EINTR)
- log_error("Listener_thread: select() failed, %s",
+ log_error("Listener: select() failed, %s",
strerror(errno));
continue;
}
@@ -200,7 +168,7 @@ void Listener_thread::run()
/* III. Release all resources and exit */
- log_info("Listener_thread: shutdown requested, exiting...");
+ log_info("Listener: shutdown requested, exiting...");
for (i= 0; i < num_sockets; i++)
close(sockets[i]);
@@ -209,10 +177,9 @@ void Listener_thread::run()
unlink(unix_socket_address.sun_path);
#endif
- thread_registry.unregister_thread(&thread_info);
- my_thread_end();
+ thread_registry->unregister_thread(&thread_info);
- log_info("Listener_thread: finished.");
+ log_info("Listener: finished.");
return;
err:
@@ -220,13 +187,12 @@ err:
for (i= 0; i < num_sockets; i++)
close(sockets[i]);
- thread_registry.unregister_thread(&thread_info);
- thread_registry.request_shutdown();
- my_thread_end();
+ thread_registry->unregister_thread(&thread_info);
+ thread_registry->request_shutdown();
return;
}
-int Listener_thread::create_tcp_socket()
+int Listener::create_tcp_socket()
{
/* value to be set by setsockopt */
int arg= 1;
@@ -265,7 +231,7 @@ int Listener_thread::create_tcp_socket()
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
sizeof(ip_socket_address)))
{
- log_error("Listener_thread: bind(ip socket) failed, '%s'",
+ log_error("Listener: bind(ip socket) failed, '%s'",
strerror(errno));
close(ip_socket);
return -1;
@@ -273,7 +239,7 @@ int Listener_thread::create_tcp_socket()
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
{
- log_error("Listener_thread: listen(ip socket) failed, %s",
+ log_error("Listener: listen(ip socket) failed, %s",
strerror(errno));
close(ip_socket);
return -1;
@@ -292,7 +258,7 @@ int Listener_thread::create_tcp_socket()
}
#ifndef __WIN__
-int Listener_thread::
+int Listener::
create_unix_socket(struct sockaddr_un &unix_socket_address)
{
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
@@ -318,7 +284,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
sizeof(unix_socket_address)))
{
- log_error("Listener_thread: bind(unix socket) failed, "
+ log_error("Listener: bind(unix socket) failed, "
"socket file name is '%s', error '%s'",
unix_socket_address.sun_path, strerror(errno));
close(unix_socket);
@@ -329,7 +295,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
{
- log_error("Listener_thread: listen(unix socket) failed, %s",
+ log_error("Listener: listen(unix socket) failed, %s",
strerror(errno));
close(unix_socket);
return -1;
@@ -357,46 +323,16 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
handle_new_mysql_connection()
*/
-void Listener_thread::handle_new_mysql_connection(Vio *vio)
+void Listener::handle_new_mysql_connection(Vio *vio)
{
- if (Mysql_connection_thread_args *mysql_thread_args=
- new Mysql_connection_thread_args(vio, thread_registry, user_map,
- ++total_connection_count,
- instance_map)
- )
+ Mysql_connection *mysql_connection=
+ new Mysql_connection(thread_registry, user_map,
+ vio, ++total_connection_count);
+ if (mysql_connection == NULL || mysql_connection->start_detached())
{
- /*
- Initialize thread attributes to create detached thread; it seems
- easier to do it ad-hoc than have a global variable for attributes.
- */
- pthread_t mysql_thd_id;
- pthread_attr_t mysql_thd_attr;
- pthread_attr_init(&mysql_thd_attr);
- pthread_attr_setdetachstate(&mysql_thd_attr, PTHREAD_CREATE_DETACHED);
- if (set_stacksize_n_create_thread(&mysql_thd_id, &mysql_thd_attr,
- mysql_connection, mysql_thread_args))
- {
- delete mysql_thread_args;
- vio_delete(vio);
- log_error("handle_one_mysql_connection():"
- "set_stacksize_n_create_thread(mysql) failed");
- }
- pthread_attr_destroy(&mysql_thd_attr);
- }
- else
+ log_error("handle_one_mysql_connection() failed");
+ delete mysql_connection;
vio_delete(vio);
+ }
+ /* The connection will delete itself when the thread is finished */
}
-
-
-pthread_handler_t listener(void *arg)
-{
- Listener_thread_args *args= (Listener_thread_args *) arg;
- Listener_thread listener(*args);
- listener.run();
- /*
- args is a stack variable because listener thread lives as long as the
- manager process itself
- */
- return 0;
-}
-
diff --git a/server-tools/instance-manager/listener.h b/server-tools/instance-manager/listener.h
index c28ab0649d7..7758c2dc13d 100644
--- a/server-tools/instance-manager/listener.h
+++ b/server-tools/instance-manager/listener.h
@@ -16,33 +16,39 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include <my_global.h>
-#include <my_pthread.h>
+#include "thread_registry.h"
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
#endif
-
-pthread_handler_t listener(void *arg);
-
class Thread_registry;
class User_map;
-class Instance_map;
-struct Listener_thread_args
+/**
+ Listener - a thread listening on sockets and spawning
+ connection threads.
+*/
+
+class Listener: public Thread
{
- Thread_registry &thread_registry;
- const User_map &user_map;
- Instance_map &instance_map;
-
- Listener_thread_args(Thread_registry &thread_registry_arg,
- const User_map &user_map_arg,
- Instance_map &instance_map_arg) :
- thread_registry(thread_registry_arg)
- ,user_map(user_map_arg)
- ,instance_map(instance_map_arg)
- {}
+public:
+ Listener(Thread_registry *thread_registry_arg, User_map *user_map_arg);
+protected:
+ virtual void run();
+private:
+ Thread_info thread_info;
+ Thread_registry *thread_registry;
+ User_map *user_map;
+ static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
+ ulong total_connection_count;
+
+ int sockets[2];
+ int num_sockets;
+ fd_set read_fds;
+ void handle_new_mysql_connection(struct st_vio *vio);
+ int create_tcp_socket();
+ int create_unix_socket(struct sockaddr_un &unix_socket_address);
};
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H
diff --git a/server-tools/instance-manager/manager.cc b/server-tools/instance-manager/manager.cc
index 55d7137c97a..8c6bffa8d97 100644
--- a/server-tools/instance-manager/manager.cc
+++ b/server-tools/instance-manager/manager.cc
@@ -139,10 +139,10 @@ int Manager::main()
User_map user_map;
Instance_map instance_map(Options::Main::default_mysqld_path,
thread_registry);
- Guardian guardian(thread_registry, &instance_map,
+ Guardian guardian(&thread_registry, &instance_map,
Options::Main::monitoring_interval);
- Listener_thread_args listener_args(thread_registry, user_map, instance_map);
+ Listener listener(&thread_registry, &user_map);
manager_pid= getpid();
p_instance_map= &instance_map;
@@ -212,40 +212,29 @@ int Manager::main()
sigset_t mask;
set_signals(&mask);
- /* create guardian thread */
+ /*
+ Create the guardian thread. The newly started thread will block until
+ we actually load instances.
+
+ NOTE: Guardian should be shutdown first. Only then all other threads
+ can be stopped. This should be done in this order because the guardian
+ is responsible for shutting down all the guarded instances, and this
+ is a long operation.
+
+ NOTE: Guardian uses thr_alarm() when detects the current state of an
+ instance (is_running()), but this does not interfere with
+ flush_instances() call later in the code, because until
+ flush_instances() completes in the main thread, Guardian thread is not
+ permitted to process instances. And before flush_instances() has
+ completed, there are no instances to guard.
+ */
+ if (guardian.start_detached())
{
- pthread_t guardian_thd_id;
- pthread_attr_t guardian_thd_attr;
- int rc;
-
- /*
- NOTE: Guardian should be shutdown first. Only then all other threads
- need to be stopped. This should be done, as guardian is responsible
- for shutting down the instances, and this is a long operation.
-
- NOTE: Guardian uses thr_alarm() when detects current state of
- instances (is_running()), but it is not interfere with
- flush_instances() later in the code, because until flush_instances()
- complete in the main thread, Guardian thread is not permitted to
- process instances. And before flush_instances() there is no instances
- to proceed.
- */
-
- pthread_attr_init(&guardian_thd_attr);
- pthread_attr_setdetachstate(&guardian_thd_attr, PTHREAD_CREATE_DETACHED);
- rc= set_stacksize_n_create_thread(&guardian_thd_id, &guardian_thd_attr,
- guardian_thread_func, &guardian);
- pthread_attr_destroy(&guardian_thd_attr);
- if (rc)
- {
- log_error("manager(): set_stacksize_n_create_thread(guardian) failed");
- goto err;
- }
-
+ log_error("manager(): Failed to create the guardian thread");
+ goto err;
}
/* Load instances. */
-
{
instance_map.guardian->lock();
instance_map.lock();
@@ -265,23 +254,12 @@ int Manager::main()
}
}
- /* create the listener */
+ /* start the listener */
+ if (listener.start_detached())
{
- pthread_t listener_thd_id;
- pthread_attr_t listener_thd_attr;
- int rc;
-
- pthread_attr_init(&listener_thd_attr);
- pthread_attr_setdetachstate(&listener_thd_attr, PTHREAD_CREATE_DETACHED);
- rc= set_stacksize_n_create_thread(&listener_thd_id, &listener_thd_attr,
- listener, &listener_args);
- pthread_attr_destroy(&listener_thd_attr);
- if (rc)
- {
- log_error("manager(): set_stacksize_n_create_thread(listener) failed");
- stop_all(&guardian, &thread_registry);
- goto err;
- }
+ log_error("manager(): set_stacksize_n_create_thread(listener) failed");
+ stop_all(&guardian, &thread_registry);
+ goto err;
}
/*
diff --git a/server-tools/instance-manager/mysql_connection.cc b/server-tools/instance-manager/mysql_connection.cc
index 72081234c94..b6f62d0f6eb 100644
--- a/server-tools/instance-manager/mysql_connection.cc
+++ b/server-tools/instance-manager/mysql_connection.cc
@@ -23,7 +23,6 @@
#include <m_string.h>
#include <m_string.h>
#include <my_global.h>
-#include <mysql_com.h>
#include <mysql.h>
#include <my_sys.h>
#include <violite.h>
@@ -40,66 +39,15 @@
#include "user_map.h"
-Mysql_connection_thread_args::Mysql_connection_thread_args(
- struct st_vio *vio_arg,
- Thread_registry &thread_registry_arg,
- const User_map &user_map_arg,
- ulong connection_id_arg,
- Instance_map &instance_map_arg) :
- vio(vio_arg)
- ,thread_registry(thread_registry_arg)
- ,user_map(user_map_arg)
- ,connection_id(connection_id_arg)
- ,instance_map(instance_map_arg)
- {}
-
-/*
- MySQL connection - handle one connection with mysql command line client
- See also comments in mysqlmanager.cc to picture general Instance Manager
- architecture.
- We use conventional technique to work with classes without exceptions:
- class acquires all vital resource in init(); Thus if init() succeed,
- a user must call cleanup(). All other methods are valid only between
- init() and cleanup().
-*/
-
-class Mysql_connection_thread: public Mysql_connection_thread_args
-{
-public:
- Mysql_connection_thread(const Mysql_connection_thread_args &args);
-
- int init();
- void cleanup();
-
- void run();
-
- ~Mysql_connection_thread();
-private:
- Thread_info thread_info;
- NET net;
- struct rand_struct rand_st;
- char scramble[SCRAMBLE_LENGTH + 1];
- uint status;
- ulong client_capabilities;
-private:
- /* Names are conventionally the same as in mysqld */
- int check_connection();
- int do_command();
- int dispatch_command(enum enum_server_command command,
- const char *text, uint len);
-};
-
-
-Mysql_connection_thread::Mysql_connection_thread(
- const Mysql_connection_thread_args &args) :
- Mysql_connection_thread_args(args.vio,
- args.thread_registry,
- args.user_map,
- args.connection_id,
- args.instance_map)
- ,thread_info(pthread_self(), TRUE)
+Mysql_connection::Mysql_connection(Thread_registry *thread_registry_arg,
+ User_map *user_map_arg,
+ struct st_vio *vio_arg, ulong
+ connection_id_arg)
+ :vio(vio_arg),
+ connection_id(connection_id_arg),
+ thread_registry(thread_registry_arg),
+ user_map(user_map_arg)
{
- thread_registry.register_thread(&thread_info);
}
@@ -129,7 +77,7 @@ C_MODE_END
This function is complementary to cleanup().
*/
-int Mysql_connection_thread::init()
+int Mysql_connection::init()
{
/* Allocate buffers for network I/O */
if (my_net_init(&net, vio))
@@ -145,52 +93,46 @@ int Mysql_connection_thread::init()
create_random_string(scramble, SCRAMBLE_LENGTH, &rand_st);
/* We don't support transactions, every query is atomic */
status= SERVER_STATUS_AUTOCOMMIT;
+ thread_registry->register_thread(&thread_info);
return 0;
}
-void Mysql_connection_thread::cleanup()
+void Mysql_connection::cleanup()
{
net_end(&net);
+ thread_registry->unregister_thread(&thread_info);
}
-Mysql_connection_thread::~Mysql_connection_thread()
+Mysql_connection::~Mysql_connection()
{
/* vio_delete closes the socket if necessary */
vio_delete(vio);
- thread_registry.unregister_thread(&thread_info);
}
-void Mysql_connection_thread::run()
+void Mysql_connection::main()
{
log_info("accepted mysql connection %lu", (unsigned long) connection_id);
- my_thread_init();
-
if (check_connection())
- {
- my_thread_end();
return;
- }
log_info("connection %lu is checked successfully",
(unsigned long) connection_id);
vio_keepalive(vio, TRUE);
- while (!net.error && net.vio && !thread_registry.is_shutdown())
+ while (!net.error && net.vio && !thread_registry->is_shutdown())
{
if (do_command())
break;
}
-
- my_thread_end();
}
-int Mysql_connection_thread::check_connection()
+int Mysql_connection::check_connection()
{
ulong pkt_len=0; // to hold client reply length
@@ -279,7 +221,7 @@ int Mysql_connection_thread::check_connection()
net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1;
}
- if (user_map.authenticate(&user_name, password, scramble))
+ if (user_map->authenticate(&user_name, password, scramble))
{
net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1;
@@ -289,7 +231,7 @@ int Mysql_connection_thread::check_connection()
}
-int Mysql_connection_thread::do_command()
+int Mysql_connection::do_command()
{
char *packet;
ulong packet_length;
@@ -302,7 +244,7 @@ int Mysql_connection_thread::do_command()
/* Check if we can continue without closing the connection */
if (net.error != 3) // what is 3 - find out
return 1;
- if (thread_registry.is_shutdown())
+ if (thread_registry->is_shutdown())
return 1;
net_send_error(&net, net.last_errno);
net.error= 0;
@@ -310,7 +252,7 @@ int Mysql_connection_thread::do_command()
}
else
{
- if (thread_registry.is_shutdown())
+ if (thread_registry->is_shutdown())
return 1;
packet= (char*) net.read_pos;
enum enum_server_command command= (enum enum_server_command)
@@ -321,8 +263,8 @@ int Mysql_connection_thread::do_command()
}
}
-int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
- const char *packet, uint len)
+int Mysql_connection::dispatch_command(enum enum_server_command command,
+ const char *packet, uint len)
{
switch (command) {
case COM_QUIT: // client exit
@@ -374,19 +316,16 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
}
-pthread_handler_t mysql_connection(void *arg)
+void Mysql_connection::run()
{
- Mysql_connection_thread_args *args= (Mysql_connection_thread_args *) arg;
- Mysql_connection_thread mysql_connection_thread(*args);
- delete args;
- if (mysql_connection_thread.init())
- log_info("mysql_connection(): error initializing thread");
+ if (init())
+ log_info("Mysql_connection::run(): error initializing thread");
else
{
- mysql_connection_thread.run();
- mysql_connection_thread.cleanup();
+ main();
+ cleanup();
}
- return 0;
+ delete this;
}
/*
diff --git a/server-tools/instance-manager/mysql_connection.h b/server-tools/instance-manager/mysql_connection.h
index 3496cc05815..aceb2d62736 100644
--- a/server-tools/instance-manager/mysql_connection.h
+++ b/server-tools/instance-manager/mysql_connection.h
@@ -16,33 +16,60 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include <my_global.h>
-#include <my_pthread.h>
+#include "thread_registry.h"
+#include <mysql_com.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
#endif
-pthread_handler_t mysql_connection(void *arg);
-
-class Thread_registry;
-class User_map;
-class Instance_map;
struct st_vio;
+class User_map;
-struct Mysql_connection_thread_args
+/*
+ MySQL connection - handle one connection with mysql command line client
+ See also comments in mysqlmanager.cc to picture general Instance Manager
+ architecture.
+ We use conventional technique to work with classes without exceptions:
+ class acquires all vital resource in init(); Thus if init() succeed,
+ a user must call cleanup(). All other methods are valid only between
+ init() and cleanup().
+*/
+
+class Mysql_connection: public Thread
{
+public:
+ Mysql_connection(Thread_registry *thread_registry_arg,
+ User_map *user_map_arg,
+ struct st_vio *vio_arg,
+ ulong connection_id_arg);
+ virtual ~Mysql_connection();
+
+protected:
+ virtual void run();
+
+private:
struct st_vio *vio;
- Thread_registry &thread_registry;
- const User_map &user_map;
ulong connection_id;
- Instance_map &instance_map;
+ Thread_info thread_info;
+ Thread_registry *thread_registry;
+ User_map *user_map;
+ NET net;
+ struct rand_struct rand_st;
+ char scramble[SCRAMBLE_LENGTH + 1];
+ uint status;
+ ulong client_capabilities;
+private:
+ /* The main loop implementation triad */
+ int init();
+ void main();
+ void cleanup();
- Mysql_connection_thread_args(struct st_vio *vio_arg,
- Thread_registry &thread_registry_arg,
- const User_map &user_map_arg,
- ulong connection_id_arg,
- Instance_map &instance_map_arg);
+ /* Names are conventionally the same as in mysqld */
+ int check_connection();
+ int do_command();
+ int dispatch_command(enum enum_server_command command,
+ const char *text, uint len);
};
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MYSQL_CONNECTION_H
diff --git a/server-tools/instance-manager/priv.cc b/server-tools/instance-manager/priv.cc
index 7e3553444ac..934684a1a06 100644
--- a/server-tools/instance-manager/priv.cc
+++ b/server-tools/instance-manager/priv.cc
@@ -22,17 +22,6 @@
#include "log.h"
-#if defined(__ia64__) || defined(__ia64)
-/*
- We can live with 32K, but reserve 64K. Just to be safe.
- On ia64 we need to reserve double of the size.
-*/
-#define IM_THREAD_STACK_SIZE (128*1024L)
-#else
-#define IM_THREAD_STACK_SIZE (64*1024)
-#endif
-
-
/* the pid of the manager process (of the signal thread on the LinuxThreads) */
pid_t manager_pid;
@@ -66,33 +55,6 @@ unsigned long bytes_sent = 0L, bytes_received = 0L;
unsigned long mysqld_net_retry_count = 10L;
unsigned long open_files_limit;
-/*
- Change the stack size and start a thread. Return an error if either
- pthread_attr_setstacksize or pthread_create fails.
- Arguments are the same as for pthread_create().
-*/
-
-int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void *), void *arg)
-{
- int rc= 0;
-
-#ifndef __WIN__
-#ifndef PTHREAD_STACK_MIN
-#define PTHREAD_STACK_MIN 32768
-#endif
- /*
- Set stack size to be safe on the platforms with too small
- default thread stack.
- */
- rc= pthread_attr_setstacksize(attr,
- (size_t) (PTHREAD_STACK_MIN +
- IM_THREAD_STACK_SIZE));
-#endif
- if (!rc)
- rc= pthread_create(thread, attr, start_routine, arg);
- return rc;
-}
int create_pid_file(const char *pid_file_name, int pid)
diff --git a/server-tools/instance-manager/priv.h b/server-tools/instance-manager/priv.h
index 33be81bc3eb..7ac8ac73c6f 100644
--- a/server-tools/instance-manager/priv.h
+++ b/server-tools/instance-manager/priv.h
@@ -105,10 +105,6 @@ extern unsigned long bytes_sent, bytes_received;
extern unsigned long mysqld_net_retry_count;
extern unsigned long open_files_limit;
-
-int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void *), void *arg);
-
int create_pid_file(const char *pid_file_name, int pid);
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_PRIV_H
diff --git a/server-tools/instance-manager/thread_registry.cc b/server-tools/instance-manager/thread_registry.cc
index 10370e0981e..037f104f7bd 100644
--- a/server-tools/instance-manager/thread_registry.cc
+++ b/server-tools/instance-manager/thread_registry.cc
@@ -38,15 +38,13 @@ static void handle_signal(int __attribute__((unused)) sig_no)
}
#endif
-/*
- Thread_info initializer methods
-*/
+/* Thread_info initializer methods */
-Thread_info::Thread_info() {}
-Thread_info::Thread_info(pthread_t thread_id_arg,
- bool send_signal_on_shutdown_arg) :
- thread_id(thread_id_arg),
- send_signal_on_shutdown(send_signal_on_shutdown_arg) {}
+void Thread_info::init(bool send_signal_on_shutdown_arg)
+{
+ thread_id= pthread_self();
+ send_signal_on_shutdown= send_signal_on_shutdown_arg;
+}
/*
TODO: think about moving signal information (now it's shutdown_in_progress)
@@ -86,11 +84,14 @@ Thread_registry::~Thread_registry()
points to the last node.
*/
-void Thread_registry::register_thread(Thread_info *info)
+void Thread_registry::register_thread(Thread_info *info,
+ bool send_signal_on_shutdown)
{
log_info("Thread_registry: registering thread %d...",
(int) info->thread_id);
+ info->init(send_signal_on_shutdown);
+
#ifndef __WIN__
struct sigaction sa;
sa.sa_handler= handle_signal;
@@ -298,3 +299,80 @@ void Thread_registry::wait_for_threads_to_unregister()
}
}
}
+
+
+/*********************************************************************
+ class Thread
+*********************************************************************/
+
+#if defined(__ia64__) || defined(__ia64)
+/*
+ We can live with 32K, but reserve 64K. Just to be safe.
+ On ia64 we need to reserve double of the size.
+*/
+#define IM_THREAD_STACK_SIZE (128*1024L)
+#else
+#define IM_THREAD_STACK_SIZE (64*1024)
+#endif
+
+/*
+ Change the stack size and start a thread. Return an error if either
+ pthread_attr_setstacksize or pthread_create fails.
+ Arguments are the same as for pthread_create().
+*/
+
+static
+int set_stacksize_and_create_thread(pthread_t *thread, pthread_attr_t *attr,
+ void *(*start_routine)(void *), void *arg)
+{
+ int rc= 0;
+
+#ifndef __WIN__
+#ifndef PTHREAD_STACK_MIN
+#define PTHREAD_STACK_MIN 32768
+#endif
+ /*
+ Set stack size to be safe on the platforms with too small
+ default thread stack.
+ */
+ rc= pthread_attr_setstacksize(attr,
+ (size_t) (PTHREAD_STACK_MIN +
+ IM_THREAD_STACK_SIZE));
+#endif
+ if (!rc)
+ rc= pthread_create(thread, attr, start_routine, arg);
+ return rc;
+}
+
+
+Thread::~Thread()
+{
+}
+
+
+void *Thread::thread_func(void *arg)
+{
+ Thread *thread= (Thread *) arg;
+ my_thread_init();
+
+ thread->run();
+
+ my_thread_end();
+ return NULL;
+}
+
+
+bool Thread::start_detached()
+{
+ pthread_t thd_id;
+ pthread_attr_t attr;
+ int rc;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ rc= set_stacksize_and_create_thread(&thd_id, &attr,
+ Thread::thread_func, this);
+ pthread_attr_destroy(&attr);
+
+ return rc != 0;
+}
diff --git a/server-tools/instance-manager/thread_registry.h b/server-tools/instance-manager/thread_registry.h
index 3af7c8e0240..034ac1b0ca8 100644
--- a/server-tools/instance-manager/thread_registry.h
+++ b/server-tools/instance-manager/thread_registry.h
@@ -57,7 +57,7 @@
#pragma interface
#endif
-/*
+/**
Thread_info - repository entry for each worker thread
All entries comprise double-linked list like:
0 -- entry -- entry -- entry - 0
@@ -67,12 +67,10 @@
class Thread_info
{
public:
- Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg);
+ Thread_info() {}
friend class Thread_registry;
-
private:
- Thread_info();
-
+ void init(bool send_signal_on_shutdown);
private:
pthread_cond_t *current_cond;
Thread_info *prev, *next;
@@ -81,7 +79,26 @@ private:
};
-/*
+/**
+ A base class for a detached thread.
+*/
+
+class Thread
+{
+public:
+ Thread() {}
+ bool start_detached();
+protected:
+ virtual void run()= 0;
+ virtual ~Thread();
+private:
+ static void *thread_func(void *arg);
+ Thread(const Thread & /* rhs */); /* not implemented */
+ Thread &operator=(const Thread & /* rhs */); /* not implemented */
+};
+
+
+/**
Thread_registry - contains handles for each worker thread to deliver
signal information to workers.
*/
@@ -92,7 +109,7 @@ public:
Thread_registry();
~Thread_registry();
- void register_thread(Thread_info *info);
+ void register_thread(Thread_info *info, bool send_signal_on_shutdown= TRUE);
void unregister_thread(Thread_info *info);
void deliver_shutdown();
void request_shutdown();