diff options
author | unknown <knielsen@knielsen-hq.org> | 2011-09-20 12:49:25 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2011-09-20 12:49:25 +0200 |
commit | a5b881594da4258257b18cc42f5ce7be3524e02c (patch) | |
tree | 6ddddaef439cce2f930abfab3929f9ad80d8c818 /vio/viosocket.c | |
parent | 1a344b87e4d153d52468307cc886b5f424cb2dbf (diff) | |
download | mariadb-git-a5b881594da4258257b18cc42f5ce7be3524e02c.tar.gz |
MWL#192: Non-blocking client API for libmysqlclient.
All client functions that can block on I/O have alternate _start() and
_cont() versions that do not block but return control back to the
application, which can then issue I/O wait in its own fashion and later
call back into the library to continue the operation.
Works behind the scenes by spawning a co-routine/fiber to run the
blocking operation and suspend it while waiting for I/O. This
co-routine/fiber use is invisible to applications.
For i368/x86_64 on GCC, uses very fast assembler co-routine support. On
Windows uses native Win32 Fibers. Falls back to POSIX ucontext on other
platforms. Assembler routines for more platforms are relatively easy to
add by extending mysys/my_context.c, eg. similar to the Lua lcoco
library.
For testing, mysqltest and mysql_client_test are extended with the
option --non-blocking-api. This causes the programs to use the
non-blocking API for database access. mysql-test-run.pl has a similar
option --non-blocking-api that uses this, as well as additional
testcases.
An example program tests/async_queries.c is included that uses the new
non-blocking API with libevent to show how, in a single-threaded
program, to issue many queries in parallel against a database.
client/async_example.c:
Fix const warning
******
Fix bug with wrong timeout value for poll().
include/Makefile.am:
Fix missing include for `make dist`
include/mysql.h:
Add prototypes for all non-blocking API calls.
include/mysql.h.pp:
Add prototypes for all non-blocking API calls.
mysys/my_context.c:
Fix type warning for makecontext() function pointer argument.
sql-common/mysql_async.c:
Fix crashes in the non-blocking API for functions that can take MYSQL argument
that is NULL.
tests/Makefile.am:
Add header file to `make dist`
tests/mysql_client_test.c:
Replace blocking calls with wrappers around the non-blocking calls, used in
mysql_client_test to test the new non-blocking API.
tests/nonblock-wrappers.h:
Replace blocking calls with wrappers around the non-blocking calls, used in
mysql_client_test to test the new non-blocking API.
Diffstat (limited to 'vio/viosocket.c')
-rw-r--r-- | vio/viosocket.c | 59 |
1 files changed, 54 insertions, 5 deletions
diff --git a/vio/viosocket.c b/vio/viosocket.c index f780764cbe4..878378bb837 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -21,6 +21,7 @@ */ #include "vio_priv.h" +#include "my_context.h" int vio_errno(Vio *vio __attribute__((unused))) { @@ -31,18 +32,34 @@ int vio_errno(Vio *vio __attribute__((unused))) size_t vio_read(Vio * vio, uchar* buf, size_t size) { size_t r; + extern ssize_t my_recv_async(struct mysql_async_context *b, int fd, + unsigned char *buf, size_t size, uint timeout); DBUG_ENTER("vio_read"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); /* Ensure nobody uses vio_read_buff and vio_read simultaneously */ DBUG_ASSERT(vio->read_end == vio->read_pos); + if (vio->async_context && vio->async_context->active) + r= my_recv_async(vio->async_context, vio->sd, buf, size, vio->read_timeout); + else + { + if (vio->async_context) + { + /* + If switching from non-blocking to blocking API usage, set the socket + back to blocking mode. + */ + my_bool old_mode; + vio_blocking(vio, TRUE, &old_mode); + } #ifdef __WIN__ - r = recv(vio->sd, buf, size,0); + r = recv(vio->sd, buf, size,0); #else - errno=0; /* For linux */ - r = read(vio->sd, buf, size); + errno=0; /* For linux */ + r = read(vio->sd, buf, size); #endif /* __WIN__ */ + } #ifndef DBUG_OFF if (r == (size_t) -1) { @@ -102,14 +119,32 @@ size_t vio_read_buff(Vio *vio, uchar* buf, size_t size) size_t vio_write(Vio * vio, const uchar* buf, size_t size) { size_t r; + extern ssize_t my_send_async(struct mysql_async_context *b, int fd, + const unsigned char *buf, size_t size, + uint timeout); DBUG_ENTER("vio_write"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); + if (vio->async_context && vio->async_context->active) + r= my_send_async(vio->async_context, vio->sd, buf, size, + vio->write_timeout); + else + { + if (vio->async_context) + { + /* + If switching from non-blocking to blocking API usage, set the socket + back to blocking mode. + */ + my_bool old_mode; + vio_blocking(vio, TRUE, &old_mode); + } #ifdef __WIN__ - r = send(vio->sd, buf, size,0); + r = send(vio->sd, buf, size,0); #else - r = write(vio->sd, buf, size); + r = write(vio->sd, buf, size); #endif /* __WIN__ */ + } #ifndef DBUG_OFF if (r == (size_t) -1) { @@ -359,12 +394,17 @@ void vio_in_addr(Vio *vio, struct in_addr *in) my_bool vio_poll_read(Vio *vio,uint timeout) { + extern my_bool my_poll_read_async(struct mysql_async_context *b, + uint timeout); #ifndef HAVE_POLL #if __WIN__ int res; struct fd_set fds; struct timeval tv; DBUG_ENTER("vio_poll"); + + if (vio->async_context && vio->async_context->active) + DBUG_RETURN(my_poll_read_async(vio->async_context, timeout)); fds.fd_count= 1; fds.fd_array[0]= vio->sd; tv.tv_sec= timeout; @@ -372,12 +412,16 @@ my_bool vio_poll_read(Vio *vio,uint timeout) res= select(1, &fds, NULL, NULL, &tv) ? 0 : 1; DBUG_RETURN(res); #else + if (vio->async_context && vio->async_context->active) + return my_poll_read_async(vio->async_context, timeout); return 0; #endif #else struct pollfd fds; int res; DBUG_ENTER("vio_poll"); + if (vio->async_context && vio->async_context->active) + DBUG_RETURN(my_poll_read_async(vio->async_context, timeout)); fds.fd=vio->sd; fds.events=POLLIN; fds.revents=0; @@ -425,6 +469,11 @@ void vio_timeout(Vio *vio, uint which, uint timeout) thr_alarm or just run without read/write timeout(s) */ #endif + /* Make timeout values available for async operations. */ + if (which) + vio->write_timeout= timeout; + else + vio->read_timeout= timeout; } |