diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2016-03-08 10:28:26 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2016-03-08 10:28:26 +0100 |
commit | 1a3db0e24f8e5be37a9aff2cce4681670f9de044 (patch) | |
tree | 3cdddc9d9857b1a97b200845b416613700855e65 | |
parent | a8d97fb8186968e193ce0f3c8d1ee416b115e9e0 (diff) | |
download | mariadb-git-1a3db0e24f8e5be37a9aff2cce4681670f9de044.tar.gz |
Fix threadpool after it was broken by MDEV-6150
-rw-r--r-- | mysql-test/r/connect2.result | 10 | ||||
-rw-r--r-- | mysql-test/t/connect2.test | 11 | ||||
-rw-r--r-- | sql/threadpool.h | 2 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 34 | ||||
-rw-r--r-- | sql/threadpool_unix.cc | 31 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 27 |
6 files changed, 63 insertions, 52 deletions
diff --git a/mysql-test/r/connect2.result b/mysql-test/r/connect2.result index 5e529db9376..6e9493d4d3e 100644 --- a/mysql-test/r/connect2.result +++ b/mysql-test/r/connect2.result @@ -5,11 +5,11 @@ select 1; 1 set global debug_dbug='+d,simulate_failed_connection_1'; connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET); -ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported" +Got one of the listed errors set global debug_dbug=@old_debug; set global debug_dbug='+d,simulate_failed_connection_2'; connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET); -ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported" +Got one of the listed errors set global debug_dbug=@old_debug; select 1; 1 @@ -19,11 +19,11 @@ select 1; 1 set global debug_dbug='+d,simulate_failed_connection_1'; connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET); -ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported" +Got one of the listed errors set global debug_dbug=@old_debug; set global debug_dbug='+d,simulate_failed_connection_2'; connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET); -ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported" +Got one of the listed errors set global debug_dbug=@old_debug; select 1; 1 @@ -37,7 +37,7 @@ select 1; 1 set global debug_dbug='+d,simulate_failed_connection_2'; connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET); -ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported" +Got one of the listed errors show status like "Threads_connected"; Variable_name Value Threads_connected 1 diff --git a/mysql-test/t/connect2.test b/mysql-test/t/connect2.test index b0d5c21b1cc..64cd15155da 100644 --- a/mysql-test/t/connect2.test +++ b/mysql-test/t/connect2.test @@ -16,14 +16,15 @@ disconnect con1; connection default; set global debug_dbug='+d,simulate_failed_connection_1'; --replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT ---error 2013 +--error 1041,2013 connect(con1,localhost,root,,test,,); connection default; set global debug_dbug=@old_debug; set global debug_dbug='+d,simulate_failed_connection_2'; --replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT ---error 2013 +--error 1041,2013 connect(con1,localhost,root,,test,,); +--enable_result_log connection default; set global debug_dbug=@old_debug; connect(con1,localhost,root,,test,,); @@ -38,13 +39,13 @@ disconnect con1; connection default; set global debug_dbug='+d,simulate_failed_connection_1'; --replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT ---error 2013 +--error 1041,2013 connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,); connection default; set global debug_dbug=@old_debug; set global debug_dbug='+d,simulate_failed_connection_2'; --replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT ---error 2013 +--error 1041,2013 connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,); connection default; set global debug_dbug=@old_debug; @@ -66,7 +67,7 @@ disconnect con2; connection default; set global debug_dbug='+d,simulate_failed_connection_2'; --replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT ---error 2013 +--error 1041,2013 connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,); connection default; diff --git a/sql/threadpool.h b/sql/threadpool.h index 93f349091f8..9cf6606ef60 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -30,7 +30,7 @@ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ extern void threadpool_cleanup_connection(THD *thd); extern void threadpool_remove_connection(THD *thd); extern int threadpool_process_request(THD *thd); -extern int threadpool_add_connection(THD *thd); +extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data); /* Functions used by scheduler. diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 63622d4a5f0..7cba3f014f4 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -94,7 +94,7 @@ struct Worker_thread_context /* Attach/associate the connection with the OS thread, */ -static bool thread_attach(THD* thd) +static void thread_attach(THD* thd) { pthread_setspecific(THR_KEY_mysys,thd->mysys_var); thd->thread_stack=(char*)&thd; @@ -103,13 +103,14 @@ static bool thread_attach(THD* thd) if (PSI_server) PSI_server->set_thread(thd->event_scheduler.m_psi); #endif - return 0; } -int threadpool_add_connection(THD *thd) +THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) { - int retval=1; + THD *thd= NULL; + int error=1; + Worker_thread_context worker_context; worker_context.save(); @@ -120,13 +121,23 @@ int threadpool_add_connection(THD *thd) pthread_setspecific(THR_KEY_mysys, 0); my_thread_init(); - thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); - if (!thd->mysys_var) + st_my_thread_var* mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); + DBUG_EXECUTE_IF("simulate_failed_connection_1", mysys_var= NULL; my_thread_end();); + if (!mysys_var ||!(thd= connect->create_thd())) { /* Out of memory? */ + connect->close_and_delete(); + if (mysys_var) + { + my_thread_end(); + } worker_context.restore(); - return 1; + return NULL; } + delete connect; + add_to_active_threads(thd); + thd->mysys_var= mysys_var; + thd->event_scheduler.data= scheduler_data; /* Create new PSI thread for use with the THD. */ #ifdef HAVE_PSI_INTERFACE @@ -157,14 +168,19 @@ int threadpool_add_connection(THD *thd) */ if (thd_is_connection_alive(thd)) { - retval= 0; + error= 0; thd->net.reading_or_writing= 1; thd->skip_wait_timeout= true; } } } + if (error) + { + threadpool_cleanup_connection(thd); + thd= NULL; + } worker_context.restore(); - return retval; + return thd; } /* diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index a43ae2162ae..9f319e5c89a 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -116,6 +116,7 @@ struct connection_t connection_t *next_in_queue; connection_t **prev_in_queue; ulonglong abs_wait_timeout; + CONNECT* connect; bool logged_in; bool bound_to_poll_descriptor; bool waiting; @@ -1215,6 +1216,7 @@ connection_t *alloc_connection() connection->logged_in= false; connection->bound_to_poll_descriptor= false; connection->abs_wait_timeout= ULONGLONG_MAX; + connection->thd= 0; } DBUG_RETURN(connection); } @@ -1228,26 +1230,19 @@ connection_t *alloc_connection() void tp_add_connection(CONNECT *connect) { connection_t *connection; - THD *thd; DBUG_ENTER("tp_add_connection"); - if (!(connection= alloc_connection()) || !(thd= connect->create_thd())) + connection= alloc_connection(); + if (!connection) { - my_free(connection); - connect->close_and_delete(); DBUG_VOID_RETURN; } - connection->thd= thd; - delete connect; - - add_to_active_threads(thd); + connection->connect= connect; - thd->event_scheduler.data= connection; - /* Assign connection to a group. */ thread_group_t *group= - &all_groups[thd->thread_id%group_count]; - + &all_groups[connect->thread_id%group_count]; + connection->thread_group=group; mysql_mutex_lock(&group->mutex); @@ -1271,9 +1266,12 @@ static void connection_abort(connection_t *connection) { DBUG_ENTER("connection_abort"); thread_group_t *group= connection->thread_group; - - threadpool_remove_connection(connection->thd); - + + if (connection->thd) + { + threadpool_remove_connection(connection->thd); + } + mysql_mutex_lock(&group->mutex); group->connection_count--; mysql_mutex_unlock(&group->mutex); @@ -1442,7 +1440,8 @@ static void handle_event(connection_t *connection) if (!connection->logged_in) { - err= threadpool_add_connection(connection->thd); + connection->thd = threadpool_add_connection(connection->connect, connection); + err= (connection->thd == NULL); connection->logged_in= true; } else diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index cd7cf5df7be..c7cde5f6981 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -226,11 +226,12 @@ struct connection_t PTP_WAIT shm_read; /* Callback instance, used to inform treadpool about long callbacks */ PTP_CALLBACK_INSTANCE callback_instance; + CONNECT* connect; bool logged_in; }; -void init_connection(connection_t *connection, THD *thd) +void init_connection(connection_t *connection, CONNECT *connect) { connection->logged_in = false; connection->handle= 0; @@ -240,11 +241,11 @@ void init_connection(connection_t *connection, THD *thd) connection->logged_in = false; connection->timeout= ULONGLONG_MAX; connection->callback_instance= 0; + connection->thd= 0; memset(&connection->overlapped, 0, sizeof(OVERLAPPED)); InitializeThreadpoolEnvironment(&connection->callback_environ); SetThreadpoolCallbackPool(&connection->callback_environ, pool); - connection->thd = thd; - thd->event_scheduler.data= connection; + connection->connect= connect; } @@ -397,8 +398,8 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) { - if (threadpool_add_connection(connection->thd) == 0 - && init_io(connection, connection->thd) == 0 + if ((connection->thd= threadpool_add_connection(connection->connect, connection)) + && init_io(connection, connection->thd) == 0 && start_io(connection, instance) == 0) { return 0; @@ -660,22 +661,16 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, void tp_add_connection(CONNECT *connect) { - THD *thd; - connection_t *con; - - if (!(con = (connection_t *) malloc(sizeof(connection_t))) || - !(thd= connect->create_thd())) + connection_t *con; + con= (connection_t *)malloc(sizeof(connection_t)); + if (!con) { tp_log_warning("Allocation failed", "tp_add_connection"); - free(con); connect->close_and_delete(); return; } - delete connect; - add_to_active_threads(thd); - - init_connection(con, thd); + init_connection(con, connect); /* Try to login asynchronously, using threads in the pool */ PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); @@ -687,7 +682,7 @@ void tp_add_connection(CONNECT *connect) else { /* Likely memory pressure */ - threadpool_cleanup_connection(thd); + connect->close_and_delete(); } } |