summaryrefslogtreecommitdiff
path: root/vio/viosocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'vio/viosocket.c')
-rw-r--r--vio/viosocket.c142
1 files changed, 130 insertions, 12 deletions
diff --git a/vio/viosocket.c b/vio/viosocket.c
index 56aa84df5fc..baefa1c6d06 100644
--- a/vio/viosocket.c
+++ b/vio/viosocket.c
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2001, 2012, Oracle and/or its affiliates
+ Copyright (c) 2012, Monty Program Ab
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@@ -29,6 +30,8 @@
#pragma comment(lib, "ws2_32.lib")
#endif
#include "vio_priv.h"
+#include "my_context.h"
+#include <mysql_async.h>
#ifdef FIONREAD_IN_SYS_FILIO
# include <sys/filio.h>
@@ -49,12 +52,26 @@ size_t vio_read(Vio * vio, uchar* buf, size_t size)
/* Ensure nobody uses vio_read_buff and vio_read simultaneously */
DBUG_ASSERT(vio->read_end == vio->read_pos);
+ if (vio->async_context && vio->async_context->active)
+ r= my_recv_async(vio->async_context, vio->sd, buf, size, vio->read_timeout);
+ else
+ {
+ if (vio->async_context)
+ {
+ /*
+ If switching from non-blocking to blocking API usage, set the socket
+ back to blocking mode.
+ */
+ my_bool old_mode;
+ vio_blocking(vio, TRUE, &old_mode);
+ }
#ifdef __WIN__
- r = recv(vio->sd, buf, size,0);
+ r = recv(vio->sd, buf, size,0);
#else
- errno=0; /* For linux */
- r = read(vio->sd, buf, size);
+ errno=0; /* For linux */
+ r = read(vio->sd, buf, size);
#endif /* __WIN__ */
+ }
#ifndef DBUG_OFF
if (r == (size_t) -1)
{
@@ -121,11 +138,26 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size)
DBUG_ENTER("vio_write");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
+ if (vio->async_context && vio->async_context->active)
+ r= my_send_async(vio->async_context, vio->sd, buf, size,
+ vio->write_timeout);
+ else
+ {
+ if (vio->async_context)
+ {
+ /*
+ If switching from non-blocking to blocking API usage, set the socket
+ back to blocking mode.
+ */
+ my_bool old_mode;
+ vio_blocking(vio, TRUE, &old_mode);
+ }
#ifdef __WIN__
- r = send(vio->sd, buf, size,0);
+ r = send(vio->sd, buf, size,0);
#else
- r = write(vio->sd, buf, size);
+ r = write(vio->sd, buf, size);
#endif /* __WIN__ */
+ }
#ifndef DBUG_OFF
if (r == (size_t) -1)
{
@@ -136,6 +168,61 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size)
DBUG_RETURN(r);
}
+#ifdef _WIN32
+static void CALLBACK cancel_io_apc(ULONG_PTR data)
+{
+ CancelIo((HANDLE)data);
+}
+
+/*
+ Cancel IO on Windows.
+
+ On XP, issue CancelIo as asynchronous procedure call to the thread that started
+ IO. On Vista+, simpler cancelation is done with CancelIoEx.
+*/
+
+int cancel_io(HANDLE handle, DWORD thread_id)
+{
+ static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *);
+ static volatile int first_time= 1;
+ int rc;
+ HANDLE thread_handle;
+
+ if (first_time)
+ {
+ /* Try to load CancelIoEx using GetProcAddress */
+ InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx,
+ GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL);
+ first_time =0;
+ }
+
+ if (fp_CancelIoEx)
+ {
+ return fp_CancelIoEx(handle, NULL)? 0 :-1;
+ }
+
+ thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id);
+ if (thread_handle)
+ {
+ rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle);
+ CloseHandle(thread_handle);
+ }
+ return rc;
+
+}
+#endif
+
+int vio_socket_shutdown(Vio *vio, int how)
+{
+ int ret= shutdown(vio->sd, how);
+#ifdef _WIN32
+ /* Cancel possible IO in progress (shutdown does not do that on Windows). */
+ (void) cancel_io((HANDLE)vio->sd, vio->thread_id);
+#endif
+ return ret;
+}
+
+
int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode,
my_bool *old_mode)
{
@@ -209,6 +296,11 @@ int vio_fastsend(Vio * vio __attribute__((unused)))
int r=0;
DBUG_ENTER("vio_fastsend");
+ if (vio->type == VIO_TYPE_NAMEDPIPE ||vio->type == VIO_TYPE_SHARED_MEMORY)
+ {
+ DBUG_RETURN(0);
+ }
+
#if defined(IPTOS_THROUGHPUT)
{
int tos = IPTOS_THROUGHPUT;
@@ -244,7 +336,7 @@ int vio_keepalive(Vio* vio, my_bool set_keep_alive)
DBUG_ENTER("vio_keepalive");
DBUG_PRINT("enter", ("sd: %d set_keep_alive: %d", vio->sd, (int)
set_keep_alive));
- if (vio->type != VIO_TYPE_NAMEDPIPE)
+ if (vio->type != VIO_TYPE_NAMEDPIPE && vio->type != VIO_TYPE_SHARED_MEMORY)
{
if (set_keep_alive)
opt = 1;
@@ -665,6 +757,8 @@ my_bool vio_poll_read(Vio *vio, uint timeout)
{
my_socket sd= vio->sd;
DBUG_ENTER("vio_poll_read");
+ if (vio->async_context && vio->async_context->active)
+ DBUG_RETURN(my_poll_read_async(vio->async_context, timeout));
#ifdef HAVE_OPENSSL
if (vio->type == VIO_TYPE_SSL)
sd= SSL_get_fd((SSL*) vio->ssl_arg);
@@ -753,10 +847,31 @@ void vio_timeout(Vio *vio, uint which, uint timeout)
thr_alarm or just run without read/write timeout(s)
*/
#endif
+ /* Make timeout values available for async operations. */
+ if (which)
+ vio->write_timeout= timeout;
+ else
+ vio->read_timeout= timeout;
}
#ifdef __WIN__
+/*
+ Disable posting IO completion event to the port.
+ In some cases (synchronous timed IO) we want to skip IOCP notifications.
+*/
+static void disable_iocp_notification(OVERLAPPED *overlapped)
+{
+ HANDLE *handle = &(overlapped->hEvent);
+ *handle = ((HANDLE)((ULONG_PTR) *handle|1));
+}
+
+/* Enable posting IO completion event to the port */
+static void enable_iocp_notification(OVERLAPPED *overlapped)
+{
+ HANDLE *handle = &(overlapped->hEvent);
+ *handle = (HANDLE)((ULONG_PTR) *handle & ~1);
+}
/*
Finish pending IO on pipe. Honor wait timeout
@@ -768,7 +883,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m
DBUG_ENTER("pipe_complete_io");
- ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms);
+ ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE);
/*
WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed)
or WAIT_TIMEOUT.
@@ -798,7 +913,8 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
DBUG_ENTER("vio_read_pipe");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
-
+
+ disable_iocp_notification(&vio->pipe_overlapped);
if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read,
&(vio->pipe_overlapped)))
{
@@ -808,13 +924,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
{
if (GetLastError() != ERROR_IO_PENDING)
{
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("error",("ReadFile() returned last error %d",
GetLastError()));
DBUG_RETURN((size_t)-1);
}
retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms);
}
-
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}
@@ -827,7 +944,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
DBUG_ENTER("vio_write_pipe");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
-
+ disable_iocp_notification(&vio->pipe_overlapped);
if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written,
&(vio->pipe_overlapped)))
{
@@ -835,6 +952,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
else
{
+ enable_iocp_notification(&vio->pipe_overlapped);
if (GetLastError() != ERROR_IO_PENDING)
{
DBUG_PRINT("vio_error",("WriteFile() returned last error %d",
@@ -843,7 +961,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms);
}
-
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}