From e91bbca5fb080a8d988c156d78c7dc1b1daaad82 Mon Sep 17 00:00:00 2001 From: Vladislav Vaintroub Date: Thu, 8 Dec 2011 19:17:49 +0100 Subject: Initial threadpool implementation for MariaDB 5.5 --- vio/viosocket.c | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 5 deletions(-) (limited to 'vio/viosocket.c') diff --git a/vio/viosocket.c b/vio/viosocket.c index 4772847abd8..1f129cb8e55 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -131,6 +131,60 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size) DBUG_RETURN(r); } +#ifdef _WIN32 +static void CALLBACK cancel_io_apc(ULONG_PTR data) +{ + CancelIo((HANDLE)data); +} + +/* + Cancel IO on Windows. + + On XP, issue CancelIo as asynchronous procedure call to the thread that started + IO. On Vista+, simpler cancelation is done with CancelIoEx. +*/ + +static int cancel_io(HANDLE handle, DWORD thread_id) +{ + static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *); + static volatile int first_time= 1; + int rc; + HANDLE thread_handle; + + if (first_time) + { + /* Try to load CancelIoEx using GetProcAddress */ + InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx, + GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL); + first_time =0; + } + + if (fp_CancelIoEx) + { + return fp_CancelIoEx(handle, NULL)? 0 :-1; + } + + thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id); + if (thread_handle) + { + rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle); + CloseHandle(thread_handle); + } + return rc; + +} +#endif + +int vio_socket_shutdown(Vio *vio, int how) +{ +#ifdef _WIN32 + return cancel_io((HANDLE)vio->sd, vio->thread_id); +#else + return shutdown(vio->sd, how); +#endif +} + + int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode, my_bool *old_mode) { @@ -726,6 +780,22 @@ void vio_timeout(Vio *vio, uint which, uint timeout) #ifdef __WIN__ +/* + Disable posting IO completion event to the port. + In some cases (synchronous timed IO) we want to skip IOCP notifications. +*/ +static void disable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = ((HANDLE)((ULONG_PTR) *handle|1)); +} + +/* Enable posting IO completion event to the port */ +static void enable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = (HANDLE)((ULONG_PTR) *handle & ~1); +} /* Finish pending IO on pipe. Honor wait timeout @@ -737,7 +807,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m DBUG_ENTER("pipe_complete_io"); - ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms); + ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE); /* WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed) or WAIT_TIMEOUT. @@ -767,7 +837,8 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) DBUG_ENTER("vio_read_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + + disable_iocp_notification(&vio->pipe_overlapped); if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read, &(vio->pipe_overlapped))) { @@ -777,13 +848,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) { if (GetLastError() != ERROR_IO_PENDING) { + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("error",("ReadFile() returned last error %d", GetLastError())); DBUG_RETURN((size_t)-1); } retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } @@ -796,7 +868,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) DBUG_ENTER("vio_write_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + disable_iocp_notification(&vio->pipe_overlapped); if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written, &(vio->pipe_overlapped))) { @@ -804,6 +876,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } else { + enable_iocp_notification(&vio->pipe_overlapped); if (GetLastError() != ERROR_IO_PENDING) { DBUG_PRINT("vio_error",("WriteFile() returned last error %d", @@ -812,7 +885,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } -- cgit v1.2.1