summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2016-03-08 10:28:26 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2016-03-08 10:28:26 +0100
commit1a3db0e24f8e5be37a9aff2cce4681670f9de044 (patch)
tree3cdddc9d9857b1a97b200845b416613700855e65
parenta8d97fb8186968e193ce0f3c8d1ee416b115e9e0 (diff)
downloadmariadb-git-1a3db0e24f8e5be37a9aff2cce4681670f9de044.tar.gz
Fix threadpool after it was broken by MDEV-6150
-rw-r--r--mysql-test/r/connect2.result10
-rw-r--r--mysql-test/t/connect2.test11
-rw-r--r--sql/threadpool.h2
-rw-r--r--sql/threadpool_common.cc34
-rw-r--r--sql/threadpool_unix.cc31
-rw-r--r--sql/threadpool_win.cc27
6 files changed, 63 insertions, 52 deletions
diff --git a/mysql-test/r/connect2.result b/mysql-test/r/connect2.result
index 5e529db9376..6e9493d4d3e 100644
--- a/mysql-test/r/connect2.result
+++ b/mysql-test/r/connect2.result
@@ -5,11 +5,11 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_1';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
-ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
+Got one of the listed errors
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
-ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
+Got one of the listed errors
set global debug_dbug=@old_debug;
select 1;
1
@@ -19,11 +19,11 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_1';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
-ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
+Got one of the listed errors
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
-ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
+Got one of the listed errors
set global debug_dbug=@old_debug;
select 1;
1
@@ -37,7 +37,7 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
-ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
+Got one of the listed errors
show status like "Threads_connected";
Variable_name Value
Threads_connected 1
diff --git a/mysql-test/t/connect2.test b/mysql-test/t/connect2.test
index b0d5c21b1cc..64cd15155da 100644
--- a/mysql-test/t/connect2.test
+++ b/mysql-test/t/connect2.test
@@ -16,14 +16,15 @@ disconnect con1;
connection default;
set global debug_dbug='+d,simulate_failed_connection_1';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT
---error 2013
+--error 1041,2013
connect(con1,localhost,root,,test,,);
connection default;
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT
---error 2013
+--error 1041,2013
connect(con1,localhost,root,,test,,);
+--enable_result_log
connection default;
set global debug_dbug=@old_debug;
connect(con1,localhost,root,,test,,);
@@ -38,13 +39,13 @@ disconnect con1;
connection default;
set global debug_dbug='+d,simulate_failed_connection_1';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
---error 2013
+--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
---error 2013
+--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
set global debug_dbug=@old_debug;
@@ -66,7 +67,7 @@ disconnect con2;
connection default;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
---error 2013
+--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 93f349091f8..9cf6606ef60 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -30,7 +30,7 @@ extern uint threadpool_oversubscribe; /* Maximum active threads in group */
extern void threadpool_cleanup_connection(THD *thd);
extern void threadpool_remove_connection(THD *thd);
extern int threadpool_process_request(THD *thd);
-extern int threadpool_add_connection(THD *thd);
+extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data);
/*
Functions used by scheduler.
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 63622d4a5f0..7cba3f014f4 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -94,7 +94,7 @@ struct Worker_thread_context
/*
Attach/associate the connection with the OS thread,
*/
-static bool thread_attach(THD* thd)
+static void thread_attach(THD* thd)
{
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd;
@@ -103,13 +103,14 @@ static bool thread_attach(THD* thd)
if (PSI_server)
PSI_server->set_thread(thd->event_scheduler.m_psi);
#endif
- return 0;
}
-int threadpool_add_connection(THD *thd)
+THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
- int retval=1;
+ THD *thd= NULL;
+ int error=1;
+
Worker_thread_context worker_context;
worker_context.save();
@@ -120,13 +121,23 @@ int threadpool_add_connection(THD *thd)
pthread_setspecific(THR_KEY_mysys, 0);
my_thread_init();
- thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
- if (!thd->mysys_var)
+ st_my_thread_var* mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
+ DBUG_EXECUTE_IF("simulate_failed_connection_1", mysys_var= NULL; my_thread_end(););
+ if (!mysys_var ||!(thd= connect->create_thd()))
{
/* Out of memory? */
+ connect->close_and_delete();
+ if (mysys_var)
+ {
+ my_thread_end();
+ }
worker_context.restore();
- return 1;
+ return NULL;
}
+ delete connect;
+ add_to_active_threads(thd);
+ thd->mysys_var= mysys_var;
+ thd->event_scheduler.data= scheduler_data;
/* Create new PSI thread for use with the THD. */
#ifdef HAVE_PSI_INTERFACE
@@ -157,14 +168,19 @@ int threadpool_add_connection(THD *thd)
*/
if (thd_is_connection_alive(thd))
{
- retval= 0;
+ error= 0;
thd->net.reading_or_writing= 1;
thd->skip_wait_timeout= true;
}
}
}
+ if (error)
+ {
+ threadpool_cleanup_connection(thd);
+ thd= NULL;
+ }
worker_context.restore();
- return retval;
+ return thd;
}
/*
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index a43ae2162ae..9f319e5c89a 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -116,6 +116,7 @@ struct connection_t
connection_t *next_in_queue;
connection_t **prev_in_queue;
ulonglong abs_wait_timeout;
+ CONNECT* connect;
bool logged_in;
bool bound_to_poll_descriptor;
bool waiting;
@@ -1215,6 +1216,7 @@ connection_t *alloc_connection()
connection->logged_in= false;
connection->bound_to_poll_descriptor= false;
connection->abs_wait_timeout= ULONGLONG_MAX;
+ connection->thd= 0;
}
DBUG_RETURN(connection);
}
@@ -1228,26 +1230,19 @@ connection_t *alloc_connection()
void tp_add_connection(CONNECT *connect)
{
connection_t *connection;
- THD *thd;
DBUG_ENTER("tp_add_connection");
- if (!(connection= alloc_connection()) || !(thd= connect->create_thd()))
+ connection= alloc_connection();
+ if (!connection)
{
- my_free(connection);
- connect->close_and_delete();
DBUG_VOID_RETURN;
}
- connection->thd= thd;
- delete connect;
-
- add_to_active_threads(thd);
+ connection->connect= connect;
- thd->event_scheduler.data= connection;
-
/* Assign connection to a group. */
thread_group_t *group=
- &all_groups[thd->thread_id%group_count];
-
+ &all_groups[connect->thread_id%group_count];
+
connection->thread_group=group;
mysql_mutex_lock(&group->mutex);
@@ -1271,9 +1266,12 @@ static void connection_abort(connection_t *connection)
{
DBUG_ENTER("connection_abort");
thread_group_t *group= connection->thread_group;
-
- threadpool_remove_connection(connection->thd);
-
+
+ if (connection->thd)
+ {
+ threadpool_remove_connection(connection->thd);
+ }
+
mysql_mutex_lock(&group->mutex);
group->connection_count--;
mysql_mutex_unlock(&group->mutex);
@@ -1442,7 +1440,8 @@ static void handle_event(connection_t *connection)
if (!connection->logged_in)
{
- err= threadpool_add_connection(connection->thd);
+ connection->thd = threadpool_add_connection(connection->connect, connection);
+ err= (connection->thd == NULL);
connection->logged_in= true;
}
else
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index cd7cf5df7be..c7cde5f6981 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -226,11 +226,12 @@ struct connection_t
PTP_WAIT shm_read;
/* Callback instance, used to inform treadpool about long callbacks */
PTP_CALLBACK_INSTANCE callback_instance;
+ CONNECT* connect;
bool logged_in;
};
-void init_connection(connection_t *connection, THD *thd)
+void init_connection(connection_t *connection, CONNECT *connect)
{
connection->logged_in = false;
connection->handle= 0;
@@ -240,11 +241,11 @@ void init_connection(connection_t *connection, THD *thd)
connection->logged_in = false;
connection->timeout= ULONGLONG_MAX;
connection->callback_instance= 0;
+ connection->thd= 0;
memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
InitializeThreadpoolEnvironment(&connection->callback_environ);
SetThreadpoolCallbackPool(&connection->callback_environ, pool);
- connection->thd = thd;
- thd->event_scheduler.data= connection;
+ connection->connect= connect;
}
@@ -397,8 +398,8 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
{
- if (threadpool_add_connection(connection->thd) == 0
- && init_io(connection, connection->thd) == 0
+ if ((connection->thd= threadpool_add_connection(connection->connect, connection))
+ && init_io(connection, connection->thd) == 0
&& start_io(connection, instance) == 0)
{
return 0;
@@ -660,22 +661,16 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
void tp_add_connection(CONNECT *connect)
{
- THD *thd;
- connection_t *con;
-
- if (!(con = (connection_t *) malloc(sizeof(connection_t))) ||
- !(thd= connect->create_thd()))
+ connection_t *con;
+ con= (connection_t *)malloc(sizeof(connection_t));
+ if (!con)
{
tp_log_warning("Allocation failed", "tp_add_connection");
- free(con);
connect->close_and_delete();
return;
}
- delete connect;
- add_to_active_threads(thd);
-
- init_connection(con, thd);
+ init_connection(con, connect);
/* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
@@ -687,7 +682,7 @@ void tp_add_connection(CONNECT *connect)
else
{
/* Likely memory pressure */
- threadpool_cleanup_connection(thd);
+ connect->close_and_delete();
}
}