diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
commit | e91bbca5fb080a8d988c156d78c7dc1b1daaad82 (patch) | |
tree | edeb15da451e956ae0d6874657c910ab0df111f8 /vio | |
parent | 5e7b949e61f4330e27013c8ec81fa3d450e5dce6 (diff) | |
download | mariadb-git-e91bbca5fb080a8d988c156d78c7dc1b1daaad82.tar.gz |
Initial threadpool implementation for MariaDB 5.5
Diffstat (limited to 'vio')
-rw-r--r-- | vio/vio.c | 25 | ||||
-rw-r--r-- | vio/vio_priv.h | 4 | ||||
-rw-r--r-- | vio/viosocket.c | 83 |
3 files changed, 106 insertions, 6 deletions
diff --git a/vio/vio.c b/vio/vio.c index b8bc7bdae08..aa0d2012afa 100644 --- a/vio/vio.c +++ b/vio/vio.c @@ -49,6 +49,25 @@ static my_bool has_no_data(Vio *vio __attribute__((unused))) return FALSE; } +#ifdef _WIN32 +my_bool vio_shared_memory_has_data(Vio *vio) +{ + return (vio->shared_memory_remain > 0); +} + +int vio_shared_memory_shutdown(Vio *vio, int how) +{ + SetEvent(vio->event_conn_closed); + SetEvent(vio->event_server_wrote); + return 0; +} + +int vio_pipe_shutdown(Vio *vio, int how) +{ + return vio_socket_shutdown(vio, how); /* cancels io */ +} +#endif + /* * Helper to fill most of the Vio* with defaults. */ @@ -89,6 +108,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =no_poll_read; vio->is_connected =vio_is_connected_pipe; vio->has_data =has_no_data; + vio->shutdown =vio_pipe_shutdown; vio->timeout=vio_win32_timeout; /* Set default timeout */ @@ -116,7 +136,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =no_poll_read; vio->is_connected =vio_is_connected_shared_memory; - vio->has_data =has_no_data; + vio->has_data =vio_shared_memory_has_data; + vio->shutdown =vio_shared_memory_shutdown; /* Currently, shared memory is on Windows only, hence the below is ok*/ vio->timeout= vio_win32_timeout; @@ -145,6 +166,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =vio_poll_read; vio->is_connected =vio_is_connected; vio->has_data =vio_ssl_has_data; + vio->shutdown =vio_socket_shutdown; DBUG_VOID_RETURN; } #endif /* HAVE_OPENSSL */ @@ -163,6 +185,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->timeout =vio_timeout; vio->poll_read =vio_poll_read; vio->is_connected =vio_is_connected; + vio->shutdown =vio_socket_shutdown; vio->has_data= (flags & VIO_BUFFERED_READ) ? vio_buff_has_data : has_no_data; DBUG_VOID_RETURN; diff --git a/vio/vio_priv.h b/vio/vio_priv.h index 702ba4de38a..3f62c508375 100644 --- a/vio/vio_priv.h +++ b/vio/vio_priv.h @@ -39,6 +39,7 @@ size_t vio_read_pipe(Vio *vio, uchar * buf, size_t size); size_t vio_write_pipe(Vio *vio, const uchar * buf, size_t size); my_bool vio_is_connected_pipe(Vio *vio); int vio_close_pipe(Vio * vio); +int vio_shutdown_pipe(Vio *vio,int how); #endif #ifdef HAVE_SMEM @@ -46,8 +47,11 @@ size_t vio_read_shared_memory(Vio *vio, uchar * buf, size_t size); size_t vio_write_shared_memory(Vio *vio, const uchar * buf, size_t size); my_bool vio_is_connected_shared_memory(Vio *vio); int vio_close_shared_memory(Vio * vio); +my_bool vio_shared_memory_has_data(Vio *vio); +int vio_shutdown_shared_memory(Vio *vio, int how); #endif +int vio_socket_shutdown(Vio *vio, int how); void vio_timeout(Vio *vio,uint which, uint timeout); my_bool vio_buff_has_data(Vio *vio); diff --git a/vio/viosocket.c b/vio/viosocket.c index 4772847abd8..1f129cb8e55 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -131,6 +131,60 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size) DBUG_RETURN(r); } +#ifdef _WIN32 +static void CALLBACK cancel_io_apc(ULONG_PTR data) +{ + CancelIo((HANDLE)data); +} + +/* + Cancel IO on Windows. + + On XP, issue CancelIo as asynchronous procedure call to the thread that started + IO. On Vista+, simpler cancelation is done with CancelIoEx. +*/ + +static int cancel_io(HANDLE handle, DWORD thread_id) +{ + static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *); + static volatile int first_time= 1; + int rc; + HANDLE thread_handle; + + if (first_time) + { + /* Try to load CancelIoEx using GetProcAddress */ + InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx, + GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL); + first_time =0; + } + + if (fp_CancelIoEx) + { + return fp_CancelIoEx(handle, NULL)? 0 :-1; + } + + thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id); + if (thread_handle) + { + rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle); + CloseHandle(thread_handle); + } + return rc; + +} +#endif + +int vio_socket_shutdown(Vio *vio, int how) +{ +#ifdef _WIN32 + return cancel_io((HANDLE)vio->sd, vio->thread_id); +#else + return shutdown(vio->sd, how); +#endif +} + + int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode, my_bool *old_mode) { @@ -726,6 +780,22 @@ void vio_timeout(Vio *vio, uint which, uint timeout) #ifdef __WIN__ +/* + Disable posting IO completion event to the port. + In some cases (synchronous timed IO) we want to skip IOCP notifications. +*/ +static void disable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = ((HANDLE)((ULONG_PTR) *handle|1)); +} + +/* Enable posting IO completion event to the port */ +static void enable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = (HANDLE)((ULONG_PTR) *handle & ~1); +} /* Finish pending IO on pipe. Honor wait timeout @@ -737,7 +807,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m DBUG_ENTER("pipe_complete_io"); - ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms); + ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE); /* WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed) or WAIT_TIMEOUT. @@ -767,7 +837,8 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) DBUG_ENTER("vio_read_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + + disable_iocp_notification(&vio->pipe_overlapped); if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read, &(vio->pipe_overlapped))) { @@ -777,13 +848,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) { if (GetLastError() != ERROR_IO_PENDING) { + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("error",("ReadFile() returned last error %d", GetLastError())); DBUG_RETURN((size_t)-1); } retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } @@ -796,7 +868,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) DBUG_ENTER("vio_write_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + disable_iocp_notification(&vio->pipe_overlapped); if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written, &(vio->pipe_overlapped))) { @@ -804,6 +876,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } else { + enable_iocp_notification(&vio->pipe_overlapped); if (GetLastError() != ERROR_IO_PENDING) { DBUG_PRINT("vio_error",("WriteFile() returned last error %d", @@ -812,7 +885,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } |