diff options
author | unknown <knielsen@knielsen-hq.org> | 2011-09-20 12:49:25 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2011-09-20 12:49:25 +0200 |
commit | a5b881594da4258257b18cc42f5ce7be3524e02c (patch) | |
tree | 6ddddaef439cce2f930abfab3929f9ad80d8c818 | |
parent | 1a344b87e4d153d52468307cc886b5f424cb2dbf (diff) | |
download | mariadb-git-a5b881594da4258257b18cc42f5ce7be3524e02c.tar.gz |
MWL#192: Non-blocking client API for libmysqlclient.
All client functions that can block on I/O have alternate _start() and
_cont() versions that do not block but return control back to the
application, which can then issue I/O wait in its own fashion and later
call back into the library to continue the operation.
Works behind the scenes by spawning a co-routine/fiber to run the
blocking operation and suspend it while waiting for I/O. This
co-routine/fiber use is invisible to applications.
For i368/x86_64 on GCC, uses very fast assembler co-routine support. On
Windows uses native Win32 Fibers. Falls back to POSIX ucontext on other
platforms. Assembler routines for more platforms are relatively easy to
add by extending mysys/my_context.c, eg. similar to the Lua lcoco
library.
For testing, mysqltest and mysql_client_test are extended with the
option --non-blocking-api. This causes the programs to use the
non-blocking API for database access. mysql-test-run.pl has a similar
option --non-blocking-api that uses this, as well as additional
testcases.
An example program tests/async_queries.c is included that uses the new
non-blocking API with libevent to show how, in a single-threaded
program, to issue many queries in parallel against a database.
client/async_example.c:
Fix const warning
******
Fix bug with wrong timeout value for poll().
include/Makefile.am:
Fix missing include for `make dist`
include/mysql.h:
Add prototypes for all non-blocking API calls.
include/mysql.h.pp:
Add prototypes for all non-blocking API calls.
mysys/my_context.c:
Fix type warning for makecontext() function pointer argument.
sql-common/mysql_async.c:
Fix crashes in the non-blocking API for functions that can take MYSQL argument
that is NULL.
tests/Makefile.am:
Add header file to `make dist`
tests/mysql_client_test.c:
Replace blocking calls with wrappers around the non-blocking calls, used in
mysql_client_test to test the new non-blocking API.
tests/nonblock-wrappers.h:
Replace blocking calls with wrappers around the non-blocking calls, used in
mysql_client_test to test the new non-blocking API.
41 files changed, 4389 insertions, 38 deletions
diff --git a/.bzrignore b/.bzrignore index 6b18d09723b..cd92df693fc 100644 --- a/.bzrignore +++ b/.bzrignore @@ -350,6 +350,7 @@ client/ssl_test client/thimble client/thread_test client/tmp.diff +client/async_example client_debug/* client_release/* client_test @@ -681,6 +682,7 @@ libmysqld/unireg.cc libmysqld/discover_xt.cc libmysqld/ha_pbxt.cc libmysqld/myxt_xt.cc +libmysqld/mysql_async.c libmysqltest/*.ds? libmysqltest/*.vcproj libmysqltest/mytest.c @@ -1867,6 +1869,7 @@ tests/bug25714 tests/client_test tests/connect_test tests/mysql_client_test +tests/async_queries thr_insert_test/* thr_test/* thread_test @@ -1952,6 +1955,7 @@ client/sql_list.cc client/sql_list.h libmysqld/client_plugin.c sql/client_plugin.c +sql/mysql_async.c *.dgcov libmysqld/create_options.cc storage/pbxt/bin/xtstat diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index ecfd3a0f0ce..53c445c21d8 100755 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -68,5 +68,8 @@ MYSQL_ADD_EXECUTABLE(mysqlslap mysqlslap.c DESTINATION bin) SET_SOURCE_FILES_PROPERTIES(mysqlslap.c PROPERTIES COMPILE_FLAGS "-DTHREADS") TARGET_LINK_LIBRARIES(mysqlslap mysqlclient mysys zlib wsock32 dbug) +MYSQL_ADD_EXECUTABLE(async_example async_example.c) +TARGET_LINK_LIBRARIES(async_example mysqlclient mysys zlib wsock32 dbug) + MYSQL_ADD_EXECUTABLE(echo echo.c COMPONENT Test) diff --git a/client/Makefile.am b/client/Makefile.am index f3f964ace7b..f4d9c8faf73 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -51,6 +51,8 @@ bin_PROGRAMS = mysql \ mysqltest \ mysql_upgrade +noinst_PROGRAMS = async_example + mysql_SOURCES = mysql.cc readline.cc sql_string.cc \ completion_hash.cc mysql_LDADD = @readline_link@ @TERMCAP_LIB@ \ @@ -96,6 +98,11 @@ mysqltest_LDADD = $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) \ mysql_upgrade_SOURCES= mysql_upgrade.c \ $(top_srcdir)/mysys/my_getpagesize.c +async_example_SOURCES= async_example.c +async_example_LDADD = $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) \ + @CLIENT_EXTRA_LDFLAGS@ \ + $(LIBMYSQLCLIENT_LA) + # Fix for mit-threads DEFS = -DMYSQL_CLIENT_NO_THREADS \ -DDEFAULT_MYSQL_HOME='"$(prefix)"' \ diff --git a/client/async_example.c b/client/async_example.c new file mode 100644 index 00000000000..de9d455171c --- /dev/null +++ b/client/async_example.c @@ -0,0 +1,207 @@ +/* + Copyright 2011 Kristian Nielsen and Monty Program Ab. + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + + +#ifndef __WIN__ +#include <poll.h> +#else +#include <WinSock2.h> +#endif + +#include <stdlib.h> +#include <stdio.h> +#include <mysql.h> + +#define SL(s) (s), sizeof(s) + +static const char *my_groups[]= { "client", NULL }; + +static int +wait_for_mysql(MYSQL *mysql, int status) +{ +#ifdef __WIN__ + fd_set rs, ws, es; + int res; + struct timeval tv, *timeout; + my_socket s= mysql_get_socket(mysql); + FD_ZERO(&rs); + FD_ZERO(&ws); + FD_ZERO(&es); + if (status & MYSQL_WAIT_READ) + FD_SET(s, &rs); + if (status & MYSQL_WAIT_WRITE) + FD_SET(s, &ws); + if (status & MYSQL_WAIT_EXCEPT) + FD_SET(s, &es); + if (status & MYSQL_WAIT_TIMEOUT) + { + tv.tv_sec= mysql_get_timeout_value(mysql); + tv.tv_usec= 0; + timeout= &tv; + } + else + timeout= NULL; + res= select(1, &rs, &ws, &es, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res == SOCKET_ERROR) + { + /* + In a real event framework, we should handle errors and re-try the select. + */ + return MYSQL_WAIT_TIMEOUT; + } + else + { + int status= 0; + if (FD_ISSET(s, &rs)) + status|= MYSQL_WAIT_READ; + if (FD_ISSET(s, &ws)) + status|= MYSQL_WAIT_WRITE; + if (FD_ISSET(s, &es)) + status|= MYSQL_WAIT_EXCEPT; + return status; + } +#else + struct pollfd pfd; + int timeout; + int res; + + pfd.fd= mysql_get_socket(mysql); + pfd.events= + (status & MYSQL_WAIT_READ ? POLLIN : 0) | + (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | + (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); + if (status & MYSQL_WAIT_TIMEOUT) + timeout= 1000*mysql_get_timeout_value(mysql); + else + timeout= -1; + res= poll(&pfd, 1, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res < 0) + { + /* + In a real event framework, we should handle EINTR and re-try the poll. + */ + return MYSQL_WAIT_TIMEOUT; + } + else + { + int status= 0; + if (pfd.revents & POLLIN) + status|= MYSQL_WAIT_READ; + if (pfd.revents & POLLOUT) + status|= MYSQL_WAIT_WRITE; + if (pfd.revents & POLLPRI) + status|= MYSQL_WAIT_EXCEPT; + return status; + } +#endif +} + +static void +fatal(MYSQL *mysql, const char *msg) +{ + fprintf(stderr, "%s: %s\n", msg, mysql_error(mysql)); + exit(1); +} + +static void +doit(const char *host, const char *user, const char *password) +{ + int err; + MYSQL mysql, *ret; + MYSQL_RES *res; + MYSQL_ROW row; + int status; + + mysql_init(&mysql); + mysql_options(&mysql, MYSQL_READ_DEFAULT_GROUP, "myapp"); + + /* Returns 0 when done, else flag for what to wait for when need to block. */ + status= mysql_real_connect_start(&ret, &mysql, host, user, password, NULL, + 0, NULL, 0); + while (status) + { + status= wait_for_mysql(&mysql, status); + status= mysql_real_connect_cont(&ret, &mysql, status); + } + + if (!ret) + fatal(&mysql, "Failed to mysql_real_connect()"); + + status= mysql_real_query_start(&err, &mysql, SL("SHOW STATUS")); + while (status) + { + status= wait_for_mysql(&mysql, status); + status= mysql_real_query_cont(&err, &mysql, status); + } + if (err) + fatal(&mysql, "mysql_real_query() returns error"); + + /* This method cannot block. */ + res= mysql_use_result(&mysql); + if (!res) + fatal(&mysql, "mysql_use_result() returns error"); + + for (;;) + { + status= mysql_fetch_row_start(&row, res); + while (status) + { + status= wait_for_mysql(&mysql, status); + status= mysql_fetch_row_cont(&row, res, status); + } + if (!row) + break; + printf("%s: %s\n", row[0], row[1]); + } + if (mysql_errno(&mysql)) + fatal(&mysql, "Got error while retrieving rows"); + mysql_free_result(res); + + /* I suppose this must be non-blocking too. */ + mysql_close(&mysql); +} + +int +main(int argc, char *argv[]) +{ + int err; + + if (argc != 4) + { + fprintf(stderr, "Usage: %s <host> <user> <password>\n", argv[0]); + exit(1); + } + + err= mysql_library_init(argc, argv, (char **)my_groups); + if (err) + { + fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n", err); + exit(1); + } + + doit(argv[1], argv[2], argv[3]); + + mysql_library_end(); + + return 0; +} diff --git a/client/mysqltest.cc b/client/mysqltest.cc index 772dd69c089..7aeaa48519d 100644 --- a/client/mysqltest.cc +++ b/client/mysqltest.cc @@ -60,6 +60,12 @@ #define SIGNAL_FMT "signal %d" #endif +static my_bool non_blocking_api_enabled= 0; +#if !defined(EMBEDDED_LIBRARY) +#define WRAP_NONBLOCK_ENABLED non_blocking_api_enabled +#include "../tests/nonblock-wrappers.h" +#endif + /* Use cygwin for --exec and --system before 5.0 */ #if MYSQL_VERSION_ID < 50000 #define USE_CYGWIN @@ -84,7 +90,7 @@ enum { OPT_PS_PROTOCOL, OPT_SP_PROTOCOL, OPT_CURSOR_PROTOCOL, OPT_VIEW_PROTOCOL, OPT_MAX_CONNECT_RETRIES, OPT_MAX_CONNECTIONS, OPT_MARK_PROGRESS, OPT_LOG_DIR, OPT_TAIL_LINES, - OPT_GLOBAL_SUBST, OPT_MY_CONNECT_TIMEOUT + OPT_GLOBAL_SUBST, OPT_MY_CONNECT_TIMEOUT, OPT_NON_BLOCKING_API }; static int record= 0, opt_sleep= -1; @@ -305,6 +311,7 @@ enum enum_commands { Q_LOWERCASE, Q_START_TIMER, Q_END_TIMER, Q_CHARACTER_SET, Q_DISABLE_PS_PROTOCOL, Q_ENABLE_PS_PROTOCOL, + Q_ENABLE_NON_BLOCKING_API, Q_DISABLE_NON_BLOCKING_API, Q_DISABLE_RECONNECT, Q_ENABLE_RECONNECT, Q_IF, Q_DISABLE_PARSING, Q_ENABLE_PARSING, @@ -386,6 +393,8 @@ const char *command_names[]= "character_set", "disable_ps_protocol", "enable_ps_protocol", + "enable_non_blocking_api", + "disable_non_blocking_api", "disable_reconnect", "enable_reconnect", "if", @@ -5235,7 +5244,10 @@ void do_connect(struct st_command *command) int con_port= opt_port; char *con_options; my_bool con_ssl= 0, con_compress= 0; - my_bool con_pipe= 0, con_shm= 0; + my_bool con_pipe= 0; +#ifdef HAVE_SMEM + my_bool con_shm= 0; +#endif struct st_connection* con_slot; static DYNAMIC_STRING ds_connection_name; @@ -5324,7 +5336,11 @@ void do_connect(struct st_command *command) else if (length == 4 && !strncmp(con_options, "PIPE", 4)) con_pipe= 1; else if (length == 3 && !strncmp(con_options, "SHM", 3)) +#ifdef HAVE_SMEM con_shm= 1; +#else + { } +#endif else die("Illegal option to connect: %.*s", (int) (end - con_options), con_options); @@ -6146,6 +6162,10 @@ static struct my_option my_long_options[] = "Use prepared-statement protocol for communication.", &ps_protocol, &ps_protocol, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"non-blocking-api", OPT_NON_BLOCKING_API, + "Use the non-blocking client API for communication.", + &non_blocking_api_enabled, &non_blocking_api_enabled, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"quiet", 's', "Suppress all normal output.", &silent, &silent, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"record", 'r', "Record output of test_file into result file.", @@ -8130,6 +8150,7 @@ int main(int argc, char **argv) next_con= connections + 1; var_set_int("$PS_PROTOCOL", ps_protocol); + var_set_int("$NON_BLOCKING_API", non_blocking_api_enabled); var_set_int("$SP_PROTOCOL", sp_protocol); var_set_int("$VIEW_PROTOCOL", view_protocol); var_set_int("$CURSOR_PROTOCOL", cursor_protocol); @@ -8538,6 +8559,12 @@ int main(int argc, char **argv) case Q_ENABLE_PS_PROTOCOL: ps_protocol_enabled= ps_protocol; break; + case Q_DISABLE_NON_BLOCKING_API: + non_blocking_api_enabled= 0; + break; + case Q_ENABLE_NON_BLOCKING_API: + non_blocking_api_enabled= 1; + break; case Q_DISABLE_RECONNECT: set_reconnect(cur_con->mysql, 0); break; diff --git a/dbug/dbug.c b/dbug/dbug.c index 0ea0b0df6ad..6fa7da37504 100644 --- a/dbug/dbug.c +++ b/dbug/dbug.c @@ -404,6 +404,27 @@ static CODE_STATE *code_state(void) return cs; } +void +dbug_swap_code_state(void **code_state_store) +{ + CODE_STATE *cs, **cs_ptr; + + if (!(cs_ptr= (CODE_STATE**) my_thread_var_dbug())) + return; + cs= *cs_ptr; + *cs_ptr= *code_state_store; + *code_state_store= cs; +} + +void dbug_free_code_state(void **code_state_store) +{ + if (*code_state_store) + { + free(*code_state_store); + *code_state_store= NULL; + } +} + #else /* !THREAD */ static CODE_STATE static_code_state= @@ -424,6 +445,35 @@ static CODE_STATE *code_state(void) return &static_code_state; } +void +dbug_swap_code_state(void **code_state_store) +{ + CODE_STATE temp, *cs; + + if (!(cs= *code_state_store)) + { + cs= (CODE_STATE *)DbugMalloc(sizeof(*cs)); + cs->process= db_process ? db_process : "dbug"; + cs->func="?func"; + cs->file="?file"; + cs->stack=&init_settings; + *code_state_store= cs; + } + memcpy(&temp, cs, sizeof(*cs)); + memcpy(cs, &static_code_state, sizeof(*cs)); + memcpy(&static_code_state, &temp, sizeof(*cs)); +} + +void +dbug_free_code_state(void **code_state_store) +{ + if (*code_state_store) + { + free(*code_state_store); + *code_state_store= NULL; + } +} + #define pthread_mutex_lock(A) {} #define pthread_mutex_unlock(A) {} #endif diff --git a/include/Makefile.am b/include/Makefile.am index e23fd94a7fc..e13e7755670 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -46,7 +46,7 @@ noinst_HEADERS = config-win.h config-netware.h lf.h my_bit.h \ atomic/rwlock.h atomic/x86-gcc.h \ atomic/generic-msvc.h \ atomic/gcc_builtins.h my_libwrap.h my_stacktrace.h \ - wqueue.h waiting_threads.h + wqueue.h waiting_threads.h my_context.h EXTRA_DIST = mysql.h.pp mysql/plugin_auth.h.pp mysql/client_plugin.h.pp CMakeLists.txt diff --git a/include/my_context.h b/include/my_context.h new file mode 100644 index 00000000000..e19ee89a8be --- /dev/null +++ b/include/my_context.h @@ -0,0 +1,222 @@ +/* + Copyright 2011 Kristian Nielsen + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* + Simple API for spawning a co-routine, to be used for async libmysqlclient. + + Idea is that by implementing this interface using whatever facilities are + available for given platform, we can use the same code for the generic + libmysqlclient-async code. + + (This particular implementation uses Posix ucontext swapcontext().) +*/ + +#ifdef __WIN__ +#define MY_CONTEXT_USE_WIN32_FIBERS 1 +#elif defined(__GNUC__) && __GNUC__ >= 3 && defined(__x86_64__) +#define MY_CONTEXT_USE_X86_64_GCC_ASM +#elif defined(__GNUC__) && __GNUC__ >= 3 && defined(__i386__) +#define MY_CONTEXT_USE_I386_GCC_ASM +#else +#define MY_CONTEXT_USE_UCONTEXT +#endif + +#ifdef MY_CONTEXT_USE_WIN32_FIBERS +struct my_context { + void (*user_func)(void *); + void *user_arg; + void *app_fiber; + void *lib_fiber; + int return_value; +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_UCONTEXT +#include <ucontext.h> + +struct my_context { + void (*user_func)(void *); + void *user_data; + void *stack; + size_t stack_size; + ucontext_t base_context; + ucontext_t spawned_context; + int active; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_X86_64_GCC_ASM +#include <stdint.h> + +struct my_context { + uint64_t save[9]; + void *stack_top; + void *stack_bot; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_I386_GCC_ASM +#include <stdint.h> + +struct my_context { + uint64_t save[7]; + void *stack_top; + void *stack_bot; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +/* + Initialize an asynchroneous context object. + Returns 0 on success, non-zero on failure. +*/ +extern int my_context_init(struct my_context *c, size_t stack_size); + +/* Free an asynchroneous context object, deallocating any resources used. */ +extern void my_context_destroy(struct my_context *c); + +/* + Spawn an asynchroneous context. The context will run the supplied user + function, passing the supplied user data pointer. + + The context must have been initialised with my_context_init() prior to + this call. + + The user function may call my_context_yield(), which will cause this + function to return 1. Then later my_context_continue() may be called, which + will resume the asynchroneous context by returning from the previous + my_context_yield() call. + + When the user function returns, this function returns 0. + + In case of error, -1 is returned. +*/ +extern int my_context_spawn(struct my_context *c, void (*f)(void *), void *d); + +/* + Suspend an asynchroneous context started with my_context_spawn. + + When my_context_yield() is called, execution immediately returns from the + last my_context_spawn() or my_context_continue() call. Then when later + my_context_continue() is called, execution resumes by returning from this + my_context_yield() call. + + Returns 0 if ok, -1 in case of error. +*/ +extern int my_context_yield(struct my_context *c); + +/* + Resume an asynchroneous context. The context was spawned by + my_context_spawn(), and later suspended inside my_context_yield(). + + The asynchroneous context may be repeatedly suspended with + my_context_yield() and resumed with my_context_continue(). + + Each time it is suspended, this function returns 1. When the originally + spawned user function returns, this function returns 0. + + In case of error, -1 is returned. +*/ +extern int my_context_continue(struct my_context *c); + + +struct mysql_async_context { + /* + This is set to the value that should be returned from foo_start() or + foo_cont() when a call is suspended. + It is also set to the event(s) that triggered when a suspended call is + resumed, eg. whether we woke up due to connection completed or timeout + in mysql_real_connect_cont(). + */ + unsigned int ret_status; + /* + This is set to the result of the whole asynchronous operation when it + completes. It uses a union, as different calls have different return + types. + */ + union { + void *r_ptr; + const void *r_const_ptr; + int r_int; + my_bool r_my_bool; + } ret_result; + /* + The timeout value, for suspended calls that need to wake up on a timeout + (eg. mysql_real_connect_start(). + */ + unsigned int timeout_value; + /* + This flag is set when we are executing inside some asynchronous call + foo_start() or foo_cont(). It is used to decide whether to use the + synchronous or asynchronous version of calls that may block such as + recv(). + + Note that this flag is not set when a call is suspended, eg. after + returning from foo_start() and before re-entering foo_cont(). + */ + my_bool active; + /* + This flag is set when an asynchronous operation is in progress, but + suspended. Ie. it is set when foo_start() or foo_cont() returns because + the operation needs to block, suspending the operation. + + It is used to give an error (rather than crash) if the application + attempts to call some foo_cont() method when no suspended operation foo is + in progress. + */ + my_bool suspended; + /* + If non-NULL, this is a pointer to a callback hook that will be invoked with + the user data argument just before the context is suspended, and just after + it is resumed. + */ + void (*suspend_resume_hook)(my_bool suspend, void *user_data); + void *suspend_resume_hook_user_data; + /* + This is used to save the execution contexts so that we can suspend an + operation and switch back to the application context, to resume the + suspended context later when the application re-invokes us with + foo_cont(). + */ + struct my_context async_context; +}; diff --git a/include/my_dbug.h b/include/my_dbug.h index ef30c95e32a..a58c6588d61 100644 --- a/include/my_dbug.h +++ b/include/my_dbug.h @@ -83,6 +83,8 @@ extern void _db_lock_file_(void); extern void _db_unlock_file_(void); extern FILE *_db_fp_(void); extern void _db_flush_(); +extern void dbug_swap_code_state(void **code_state_store); +extern void dbug_free_code_state(void **code_state_store); #ifdef __cplusplus diff --git a/include/mysql.h b/include/mysql.h index 19aab89283b..b1ef4720879 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -264,6 +264,8 @@ typedef struct character_set struct st_mysql_methods; struct st_mysql_stmt; +struct st_mysql_extension; + typedef struct st_mysql { NET net; /* Communication parameters */ @@ -318,7 +320,7 @@ typedef struct st_mysql my_bool *unbuffered_fetch_owner; /* needed for embedded server - no net buffer to store the 'info' */ char *info_buffer; - void *extension; + struct st_mysql_extension *extension; } MYSQL; @@ -376,6 +378,19 @@ typedef struct st_mysql_parameters void *extension; } MYSQL_PARAMETERS; +/* + Flag bits, the asynchronous methods return a combination of these ORed + together to let the application know when to resume the suspended operation. +*/ +typedef enum { + MYSQL_WAIT_READ= 1, /* Wait for data to be available on socket to read */ + /* mysql_get_socket_fd() will return socket descriptor*/ + MYSQL_WAIT_WRITE= 2, /* Wait for socket to be ready to write data */ + MYSQL_WAIT_EXCEPT= 4, /* Wait for select() to mark exception on socket */ + MYSQL_WAIT_TIMEOUT= 8 /* Wait until timeout occurs. Value of timeout can be */ + /* obtained from mysql_get_timeout_value() */ +} MYSQL_ASYNC_STATUS; + #if !defined(MYSQL_SERVER) && !defined(EMBEDDED_LIBRARY) #define max_allowed_packet (*mysql_get_parameters()->p_max_allowed_packet) #define net_buffer_length (*mysql_get_parameters()->p_net_buffer_length) @@ -437,6 +452,10 @@ const char * STDCALL mysql_info(MYSQL *mysql); unsigned long STDCALL mysql_thread_id(MYSQL *mysql); const char * STDCALL mysql_character_set_name(MYSQL *mysql); int STDCALL mysql_set_character_set(MYSQL *mysql, const char *csname); +int STDCALL mysql_set_character_set_start(int *ret, MYSQL *mysql, + const char *csname); +int STDCALL mysql_set_character_set_cont(int *ret, MYSQL *mysql, + int status); MYSQL * STDCALL mysql_init(MYSQL *mysql); my_bool STDCALL mysql_ssl_set(MYSQL *mysql, const char *key, @@ -445,6 +464,12 @@ my_bool STDCALL mysql_ssl_set(MYSQL *mysql, const char *key, const char * STDCALL mysql_get_ssl_cipher(MYSQL *mysql); my_bool STDCALL mysql_change_user(MYSQL *mysql, const char *user, const char *passwd, const char *db); +int STDCALL mysql_change_user_start(my_bool *ret, MYSQL *mysql, + const char *user, + const char *passwd, + const char *db); +int STDCALL mysql_change_user_cont(my_bool *ret, MYSQL *mysql, + int status); MYSQL * STDCALL mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, @@ -452,13 +477,44 @@ MYSQL * STDCALL mysql_real_connect(MYSQL *mysql, const char *host, unsigned int port, const char *unix_socket, unsigned long clientflag); +int STDCALL mysql_real_connect_start(MYSQL **ret, MYSQL *mysql, + const char *host, + const char *user, + const char *passwd, + const char *db, + unsigned int port, + const char *unix_socket, + unsigned long clientflag); +int STDCALL mysql_real_connect_cont(MYSQL **ret, MYSQL *mysql, + int status); int STDCALL mysql_select_db(MYSQL *mysql, const char *db); +int STDCALL mysql_select_db_start(int *ret, MYSQL *mysql, + const char *db); +int STDCALL mysql_select_db_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_query(MYSQL *mysql, const char *q); +int STDCALL mysql_query_start(int *ret, MYSQL *mysql, + const char *q); +int STDCALL mysql_query_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_send_query(MYSQL *mysql, const char *q, unsigned long length); +int STDCALL mysql_send_query_start(int *ret, MYSQL *mysql, + const char *q, + unsigned long length); +int STDCALL mysql_send_query_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_real_query(MYSQL *mysql, const char *q, unsigned long length); +int STDCALL mysql_real_query_start(int *ret, MYSQL *mysql, + const char *q, + unsigned long length); +int STDCALL mysql_real_query_cont(int *ret, MYSQL *mysql, + int status); MYSQL_RES * STDCALL mysql_store_result(MYSQL *mysql); +int STDCALL mysql_store_result_start(MYSQL_RES **ret, MYSQL *mysql); +int STDCALL mysql_store_result_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * STDCALL mysql_use_result(MYSQL *mysql); /* perform query on master */ @@ -526,15 +582,39 @@ int STDCALL mysql_add_slave(MYSQL* mysql, const char* host, int STDCALL mysql_shutdown(MYSQL *mysql, enum mysql_enum_shutdown_level shutdown_level); +int STDCALL mysql_shutdown_start(int *ret, MYSQL *mysql, + enum mysql_enum_shutdown_level + shutdown_level); +int STDCALL mysql_shutdown_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_dump_debug_info(MYSQL *mysql); +int STDCALL mysql_dump_debug_info_start(int *ret, MYSQL *mysql); +int STDCALL mysql_dump_debug_info_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_refresh(MYSQL *mysql, unsigned int refresh_options); +int STDCALL mysql_refresh_start(int *ret, MYSQL *mysql, + unsigned int refresh_options); +int STDCALL mysql_refresh_cont(int *ret, MYSQL *mysql, int status); int STDCALL mysql_kill(MYSQL *mysql,unsigned long pid); +int STDCALL mysql_kill_start(int *ret, MYSQL *mysql, + unsigned long pid); +int STDCALL mysql_kill_cont(int *ret, MYSQL *mysql, int status); int STDCALL mysql_set_server_option(MYSQL *mysql, enum enum_mysql_set_option option); +int STDCALL mysql_set_server_option_start(int *ret, MYSQL *mysql, + enum enum_mysql_set_option + option); +int STDCALL mysql_set_server_option_cont(int *ret, MYSQL *mysql, + int status); int STDCALL mysql_ping(MYSQL *mysql); +int STDCALL mysql_ping_start(int *ret, MYSQL *mysql); +int STDCALL mysql_ping_cont(int *ret, MYSQL *mysql, int status); const char * STDCALL mysql_stat(MYSQL *mysql); +int STDCALL mysql_stat_start(const char **ret, MYSQL *mysql); +int STDCALL mysql_stat_cont(const char **ret, MYSQL *mysql, + int status); const char * STDCALL mysql_get_server_info(MYSQL *mysql); const char * STDCALL mysql_get_server_name(MYSQL *mysql); const char * STDCALL mysql_get_client_info(void); @@ -543,11 +623,25 @@ const char * STDCALL mysql_get_host_info(MYSQL *mysql); unsigned long STDCALL mysql_get_server_version(MYSQL *mysql); unsigned int STDCALL mysql_get_proto_info(MYSQL *mysql); MYSQL_RES * STDCALL mysql_list_dbs(MYSQL *mysql,const char *wild); +int STDCALL mysql_list_dbs_start(MYSQL_RES **ret, MYSQL *mysql, + const char *wild); +int STDCALL mysql_list_dbs_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * STDCALL mysql_list_tables(MYSQL *mysql,const char *wild); +int STDCALL mysql_list_tables_start(MYSQL_RES **ret, MYSQL *mysql, + const char *wild); +int STDCALL mysql_list_tables_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * STDCALL mysql_list_processes(MYSQL *mysql); +int STDCALL mysql_list_processes_start(MYSQL_RES **ret, + MYSQL *mysql); +int STDCALL mysql_list_processes_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); int STDCALL mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg); void STDCALL mysql_free_result(MYSQL_RES *result); +int STDCALL mysql_free_result_start(MYSQL_RES *result); +int STDCALL mysql_free_result_cont(MYSQL_RES *result, int status); void STDCALL mysql_data_seek(MYSQL_RES *result, my_ulonglong offset); MYSQL_ROW_OFFSET STDCALL mysql_row_seek(MYSQL_RES *result, @@ -555,10 +649,19 @@ MYSQL_ROW_OFFSET STDCALL mysql_row_seek(MYSQL_RES *result, MYSQL_FIELD_OFFSET STDCALL mysql_field_seek(MYSQL_RES *result, MYSQL_FIELD_OFFSET offset); MYSQL_ROW STDCALL mysql_fetch_row(MYSQL_RES *result); +int STDCALL mysql_fetch_row_start(MYSQL_ROW *ret, + MYSQL_RES *result); +int STDCALL mysql_fetch_row_cont(MYSQL_ROW *ret, MYSQL_RES *result, + int status); unsigned long * STDCALL mysql_fetch_lengths(MYSQL_RES *result); MYSQL_FIELD * STDCALL mysql_fetch_field(MYSQL_RES *result); MYSQL_RES * STDCALL mysql_list_fields(MYSQL *mysql, const char *table, const char *wild); +int STDCALL mysql_list_fields_start(MYSQL_RES **ret, MYSQL *mysql, + const char *table, + const char *wild); +int STDCALL mysql_list_fields_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); unsigned long STDCALL mysql_escape_string(char *to,const char *from, unsigned long from_length); unsigned long STDCALL mysql_hex_string(char *to,const char *from, @@ -584,6 +687,10 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf, int res_buf_size); my_bool STDCALL mysql_read_query_result(MYSQL *mysql); +int STDCALL mysql_read_query_result_start(my_bool *ret, + MYSQL *mysql); +int STDCALL mysql_read_query_result_cont(my_bool *ret, + MYSQL *mysql, int status); /* @@ -760,16 +867,25 @@ enum enum_stmt_attr_type STMT_ATTR_PREFETCH_ROWS }; - MYSQL_STMT * STDCALL mysql_stmt_init(MYSQL *mysql); int STDCALL mysql_stmt_prepare(MYSQL_STMT *stmt, const char *query, unsigned long length); +int STDCALL mysql_stmt_prepare_start(int *ret, MYSQL_STMT *stmt, + const char *query, unsigned long length); +int STDCALL mysql_stmt_prepare_cont(int *ret, MYSQL_STMT *stmt, int status); int STDCALL mysql_stmt_execute(MYSQL_STMT *stmt); +int STDCALL mysql_stmt_execute_start(int *ret, MYSQL_STMT *stmt); +int STDCALL mysql_stmt_execute_cont(int *ret, MYSQL_STMT *stmt, int status); int STDCALL mysql_stmt_fetch(MYSQL_STMT *stmt); +int STDCALL mysql_stmt_fetch_start(int *ret, MYSQL_STMT *stmt); +int STDCALL mysql_stmt_fetch_cont(int *ret, MYSQL_STMT *stmt, int status); int STDCALL mysql_stmt_fetch_column(MYSQL_STMT *stmt, MYSQL_BIND *bind_arg, unsigned int column, unsigned long offset); int STDCALL mysql_stmt_store_result(MYSQL_STMT *stmt); +int STDCALL mysql_stmt_store_result_start(int *ret, MYSQL_STMT *stmt); +int STDCALL mysql_stmt_store_result_cont(int *ret, MYSQL_STMT *stmt, + int status); unsigned long STDCALL mysql_stmt_param_count(MYSQL_STMT * stmt); my_bool STDCALL mysql_stmt_attr_set(MYSQL_STMT *stmt, enum enum_stmt_attr_type attr_type, @@ -780,12 +896,25 @@ my_bool STDCALL mysql_stmt_attr_get(MYSQL_STMT *stmt, my_bool STDCALL mysql_stmt_bind_param(MYSQL_STMT * stmt, MYSQL_BIND * bnd); my_bool STDCALL mysql_stmt_bind_result(MYSQL_STMT * stmt, MYSQL_BIND * bnd); my_bool STDCALL mysql_stmt_close(MYSQL_STMT * stmt); +int STDCALL mysql_stmt_close_start(my_bool *ret, MYSQL_STMT *stmt); +int STDCALL mysql_stmt_close_cont(my_bool *ret, MYSQL_STMT * stmt, int status); my_bool STDCALL mysql_stmt_reset(MYSQL_STMT * stmt); +int STDCALL mysql_stmt_reset_start(my_bool *ret, MYSQL_STMT * stmt); +int STDCALL mysql_stmt_reset_cont(my_bool *ret, MYSQL_STMT *stmt, int status); my_bool STDCALL mysql_stmt_free_result(MYSQL_STMT *stmt); +int STDCALL mysql_stmt_free_result_start(my_bool *ret, MYSQL_STMT *stmt); +int STDCALL mysql_stmt_free_result_cont(my_bool *ret, MYSQL_STMT *stmt, + int status); my_bool STDCALL mysql_stmt_send_long_data(MYSQL_STMT *stmt, unsigned int param_number, const char *data, unsigned long length); +int STDCALL mysql_stmt_send_long_data_start(my_bool *ret, MYSQL_STMT *stmt, + unsigned int param_number, + const char *data, + unsigned long len); +int STDCALL mysql_stmt_send_long_data_cont(my_bool *ret, MYSQL_STMT *stmt, + int status); MYSQL_RES *STDCALL mysql_stmt_result_metadata(MYSQL_STMT *stmt); MYSQL_RES *STDCALL mysql_stmt_param_metadata(MYSQL_STMT *stmt); unsigned int STDCALL mysql_stmt_errno(MYSQL_STMT * stmt); @@ -801,12 +930,24 @@ my_ulonglong STDCALL mysql_stmt_insert_id(MYSQL_STMT *stmt); unsigned int STDCALL mysql_stmt_field_count(MYSQL_STMT *stmt); my_bool STDCALL mysql_commit(MYSQL * mysql); +int STDCALL mysql_commit_start(my_bool *ret, MYSQL * mysql); +int STDCALL mysql_commit_cont(my_bool *ret, MYSQL * mysql, int status); my_bool STDCALL mysql_rollback(MYSQL * mysql); +int STDCALL mysql_rollback_start(my_bool *ret, MYSQL * mysql); +int STDCALL mysql_rollback_cont(my_bool *ret, MYSQL * mysql, int status); my_bool STDCALL mysql_autocommit(MYSQL * mysql, my_bool auto_mode); +int STDCALL mysql_autocommit_start(my_bool *ret, MYSQL * mysql, + my_bool auto_mode); +int STDCALL mysql_autocommit_cont(my_bool *ret, MYSQL * mysql, int status); my_bool STDCALL mysql_more_results(MYSQL *mysql); int STDCALL mysql_next_result(MYSQL *mysql); +int STDCALL mysql_next_result_start(int *ret, MYSQL *mysql); +int STDCALL mysql_next_result_cont(int *ret, MYSQL *mysql, int status); void STDCALL mysql_close(MYSQL *sock); - +int STDCALL mysql_close_start(MYSQL *sock); +int STDCALL mysql_close_cont(MYSQL *sock, int status); +my_socket STDCALL mysql_get_socket(const MYSQL *mysql); +unsigned int STDCALL mysql_get_timeout_value(const MYSQL *mysql); /* status return codes */ #define MYSQL_NO_DATA 100 @@ -817,7 +958,20 @@ void STDCALL mysql_close(MYSQL *sock); #ifdef USE_OLD_FUNCTIONS MYSQL * STDCALL mysql_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd); +int STDCALL mysql_connect_start(MYSQL **ret, MYSQL *mysql, + const char *host, const char *user, + const char *passwd); +int STDCALL mysql_connect_cont(MYSQL **ret, MYSQL *mysql, + int status); int STDCALL mysql_create_db(MYSQL *mysql, const char *DB); +int STDCALL mysql_create_db_start(int *ret, MYSQL *mysql, + const char *DB); +int STDCALL mysql_create_db_cont(int *ret, MYSQL *mysql, + int status); +int STDCALL mysql_drop_db(MYSQL *mysql, const char *DB); +int STDCALL mysql_drop_db_start(int *ret, MYSQL *mysql, + const char *DB); +int STDCALL mysql_drop_db_cont(int *ret, MYSQL *mysql, int status); int STDCALL mysql_drop_db(MYSQL *mysql, const char *DB); #define mysql_reload(mysql) mysql_refresh((mysql),REFRESH_GRANT) #endif diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 6e71f886eba..f66d92c06a1 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -318,6 +318,7 @@ typedef struct character_set } MY_CHARSET_INFO; struct st_mysql_methods; struct st_mysql_stmt; +struct st_mysql_extension; typedef struct st_mysql { NET net; @@ -353,7 +354,7 @@ typedef struct st_mysql void *thd; my_bool *unbuffered_fetch_owner; char *info_buffer; - void *extension; + struct st_mysql_extension *extension; } MYSQL; typedef struct st_mysql_res { my_ulonglong row_count; @@ -391,6 +392,12 @@ typedef struct st_mysql_parameters unsigned long *p_net_buffer_length; void *extension; } MYSQL_PARAMETERS; +typedef enum { + MYSQL_WAIT_READ= 1, + MYSQL_WAIT_WRITE= 2, + MYSQL_WAIT_EXCEPT= 4, + MYSQL_WAIT_TIMEOUT= 8 +} MYSQL_ASYNC_STATUS; int mysql_server_init(int argc, char **argv, char **groups); void mysql_server_end(void); MYSQL_PARAMETERS * mysql_get_parameters(void); @@ -415,6 +422,10 @@ const char * mysql_info(MYSQL *mysql); unsigned long mysql_thread_id(MYSQL *mysql); const char * mysql_character_set_name(MYSQL *mysql); int mysql_set_character_set(MYSQL *mysql, const char *csname); +int mysql_set_character_set_start(int *ret, MYSQL *mysql, + const char *csname); +int mysql_set_character_set_cont(int *ret, MYSQL *mysql, + int status); MYSQL * mysql_init(MYSQL *mysql); my_bool mysql_ssl_set(MYSQL *mysql, const char *key, const char *cert, const char *ca, @@ -422,6 +433,12 @@ my_bool mysql_ssl_set(MYSQL *mysql, const char *key, const char * mysql_get_ssl_cipher(MYSQL *mysql); my_bool mysql_change_user(MYSQL *mysql, const char *user, const char *passwd, const char *db); +int mysql_change_user_start(my_bool *ret, MYSQL *mysql, + const char *user, + const char *passwd, + const char *db); +int mysql_change_user_cont(my_bool *ret, MYSQL *mysql, + int status); MYSQL * mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, @@ -429,13 +446,44 @@ MYSQL * mysql_real_connect(MYSQL *mysql, const char *host, unsigned int port, const char *unix_socket, unsigned long clientflag); +int mysql_real_connect_start(MYSQL **ret, MYSQL *mysql, + const char *host, + const char *user, + const char *passwd, + const char *db, + unsigned int port, + const char *unix_socket, + unsigned long clientflag); +int mysql_real_connect_cont(MYSQL **ret, MYSQL *mysql, + int status); int mysql_select_db(MYSQL *mysql, const char *db); +int mysql_select_db_start(int *ret, MYSQL *mysql, + const char *db); +int mysql_select_db_cont(int *ret, MYSQL *mysql, + int status); int mysql_query(MYSQL *mysql, const char *q); +int mysql_query_start(int *ret, MYSQL *mysql, + const char *q); +int mysql_query_cont(int *ret, MYSQL *mysql, + int status); int mysql_send_query(MYSQL *mysql, const char *q, unsigned long length); +int mysql_send_query_start(int *ret, MYSQL *mysql, + const char *q, + unsigned long length); +int mysql_send_query_cont(int *ret, MYSQL *mysql, + int status); int mysql_real_query(MYSQL *mysql, const char *q, unsigned long length); +int mysql_real_query_start(int *ret, MYSQL *mysql, + const char *q, + unsigned long length); +int mysql_real_query_cont(int *ret, MYSQL *mysql, + int status); MYSQL_RES * mysql_store_result(MYSQL *mysql); +int mysql_store_result_start(MYSQL_RES **ret, MYSQL *mysql); +int mysql_store_result_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * mysql_use_result(MYSQL *mysql); my_bool mysql_master_query(MYSQL *mysql, const char *q, unsigned long length); @@ -478,15 +526,39 @@ int mysql_add_slave(MYSQL* mysql, const char* host, int mysql_shutdown(MYSQL *mysql, enum mysql_enum_shutdown_level shutdown_level); +int mysql_shutdown_start(int *ret, MYSQL *mysql, + enum mysql_enum_shutdown_level + shutdown_level); +int mysql_shutdown_cont(int *ret, MYSQL *mysql, + int status); int mysql_dump_debug_info(MYSQL *mysql); +int mysql_dump_debug_info_start(int *ret, MYSQL *mysql); +int mysql_dump_debug_info_cont(int *ret, MYSQL *mysql, + int status); int mysql_refresh(MYSQL *mysql, unsigned int refresh_options); +int mysql_refresh_start(int *ret, MYSQL *mysql, + unsigned int refresh_options); +int mysql_refresh_cont(int *ret, MYSQL *mysql, int status); int mysql_kill(MYSQL *mysql,unsigned long pid); +int mysql_kill_start(int *ret, MYSQL *mysql, + unsigned long pid); +int mysql_kill_cont(int *ret, MYSQL *mysql, int status); int mysql_set_server_option(MYSQL *mysql, enum enum_mysql_set_option option); +int mysql_set_server_option_start(int *ret, MYSQL *mysql, + enum enum_mysql_set_option + option); +int mysql_set_server_option_cont(int *ret, MYSQL *mysql, + int status); int mysql_ping(MYSQL *mysql); +int mysql_ping_start(int *ret, MYSQL *mysql); +int mysql_ping_cont(int *ret, MYSQL *mysql, int status); const char * mysql_stat(MYSQL *mysql); +int mysql_stat_start(const char **ret, MYSQL *mysql); +int mysql_stat_cont(const char **ret, MYSQL *mysql, + int status); const char * mysql_get_server_info(MYSQL *mysql); const char * mysql_get_server_name(MYSQL *mysql); const char * mysql_get_client_info(void); @@ -495,11 +567,25 @@ const char * mysql_get_host_info(MYSQL *mysql); unsigned long mysql_get_server_version(MYSQL *mysql); unsigned int mysql_get_proto_info(MYSQL *mysql); MYSQL_RES * mysql_list_dbs(MYSQL *mysql,const char *wild); +int mysql_list_dbs_start(MYSQL_RES **ret, MYSQL *mysql, + const char *wild); +int mysql_list_dbs_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * mysql_list_tables(MYSQL *mysql,const char *wild); +int mysql_list_tables_start(MYSQL_RES **ret, MYSQL *mysql, + const char *wild); +int mysql_list_tables_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); MYSQL_RES * mysql_list_processes(MYSQL *mysql); +int mysql_list_processes_start(MYSQL_RES **ret, + MYSQL *mysql); +int mysql_list_processes_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); int mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg); void mysql_free_result(MYSQL_RES *result); +int mysql_free_result_start(MYSQL_RES *result); +int mysql_free_result_cont(MYSQL_RES *result, int status); void mysql_data_seek(MYSQL_RES *result, my_ulonglong offset); MYSQL_ROW_OFFSET mysql_row_seek(MYSQL_RES *result, @@ -507,10 +593,19 @@ MYSQL_ROW_OFFSET mysql_row_seek(MYSQL_RES *result, MYSQL_FIELD_OFFSET mysql_field_seek(MYSQL_RES *result, MYSQL_FIELD_OFFSET offset); MYSQL_ROW mysql_fetch_row(MYSQL_RES *result); +int mysql_fetch_row_start(MYSQL_ROW *ret, + MYSQL_RES *result); +int mysql_fetch_row_cont(MYSQL_ROW *ret, MYSQL_RES *result, + int status); unsigned long * mysql_fetch_lengths(MYSQL_RES *result); MYSQL_FIELD * mysql_fetch_field(MYSQL_RES *result); MYSQL_RES * mysql_list_fields(MYSQL *mysql, const char *table, const char *wild); +int mysql_list_fields_start(MYSQL_RES **ret, MYSQL *mysql, + const char *table, + const char *wild); +int mysql_list_fields_cont(MYSQL_RES **ret, MYSQL *mysql, + int status); unsigned long mysql_escape_string(char *to,const char *from, unsigned long from_length); unsigned long mysql_hex_string(char *to,const char *from, @@ -536,6 +631,10 @@ int mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf, int res_buf_size); my_bool mysql_read_query_result(MYSQL *mysql); +int mysql_read_query_result_start(my_bool *ret, + MYSQL *mysql); +int mysql_read_query_result_cont(my_bool *ret, + MYSQL *mysql, int status); enum enum_mysql_stmt_state { MYSQL_STMT_INIT_DONE= 1, MYSQL_STMT_PREPARE_DONE, MYSQL_STMT_EXECUTE_DONE, @@ -605,12 +704,22 @@ enum enum_stmt_attr_type MYSQL_STMT * mysql_stmt_init(MYSQL *mysql); int mysql_stmt_prepare(MYSQL_STMT *stmt, const char *query, unsigned long length); +int mysql_stmt_prepare_start(int *ret, MYSQL_STMT *stmt, + const char *query, unsigned long length); +int mysql_stmt_prepare_cont(int *ret, MYSQL_STMT *stmt, int status); int mysql_stmt_execute(MYSQL_STMT *stmt); +int mysql_stmt_execute_start(int *ret, MYSQL_STMT *stmt); +int mysql_stmt_execute_cont(int *ret, MYSQL_STMT *stmt, int status); int mysql_stmt_fetch(MYSQL_STMT *stmt); +int mysql_stmt_fetch_start(int *ret, MYSQL_STMT *stmt); +int mysql_stmt_fetch_cont(int *ret, MYSQL_STMT *stmt, int status); int mysql_stmt_fetch_column(MYSQL_STMT *stmt, MYSQL_BIND *bind_arg, unsigned int column, unsigned long offset); int mysql_stmt_store_result(MYSQL_STMT *stmt); +int mysql_stmt_store_result_start(int *ret, MYSQL_STMT *stmt); +int mysql_stmt_store_result_cont(int *ret, MYSQL_STMT *stmt, + int status); unsigned long mysql_stmt_param_count(MYSQL_STMT * stmt); my_bool mysql_stmt_attr_set(MYSQL_STMT *stmt, enum enum_stmt_attr_type attr_type, @@ -621,12 +730,25 @@ my_bool mysql_stmt_attr_get(MYSQL_STMT *stmt, my_bool mysql_stmt_bind_param(MYSQL_STMT * stmt, MYSQL_BIND * bnd); my_bool mysql_stmt_bind_result(MYSQL_STMT * stmt, MYSQL_BIND * bnd); my_bool mysql_stmt_close(MYSQL_STMT * stmt); +int mysql_stmt_close_start(my_bool *ret, MYSQL_STMT *stmt); +int mysql_stmt_close_cont(my_bool *ret, MYSQL_STMT * stmt, int status); my_bool mysql_stmt_reset(MYSQL_STMT * stmt); +int mysql_stmt_reset_start(my_bool *ret, MYSQL_STMT * stmt); +int mysql_stmt_reset_cont(my_bool *ret, MYSQL_STMT *stmt, int status); my_bool mysql_stmt_free_result(MYSQL_STMT *stmt); +int mysql_stmt_free_result_start(my_bool *ret, MYSQL_STMT *stmt); +int mysql_stmt_free_result_cont(my_bool *ret, MYSQL_STMT *stmt, + int status); my_bool mysql_stmt_send_long_data(MYSQL_STMT *stmt, unsigned int param_number, const char *data, unsigned long length); +int mysql_stmt_send_long_data_start(my_bool *ret, MYSQL_STMT *stmt, + unsigned int param_number, + const char *data, + unsigned long len); +int mysql_stmt_send_long_data_cont(my_bool *ret, MYSQL_STMT *stmt, + int status); MYSQL_RES * mysql_stmt_result_metadata(MYSQL_STMT *stmt); MYSQL_RES * mysql_stmt_param_metadata(MYSQL_STMT *stmt); unsigned int mysql_stmt_errno(MYSQL_STMT * stmt); @@ -641,8 +763,21 @@ my_ulonglong mysql_stmt_affected_rows(MYSQL_STMT *stmt); my_ulonglong mysql_stmt_insert_id(MYSQL_STMT *stmt); unsigned int mysql_stmt_field_count(MYSQL_STMT *stmt); my_bool mysql_commit(MYSQL * mysql); +int mysql_commit_start(my_bool *ret, MYSQL * mysql); +int mysql_commit_cont(my_bool *ret, MYSQL * mysql, int status); my_bool mysql_rollback(MYSQL * mysql); +int mysql_rollback_start(my_bool *ret, MYSQL * mysql); +int mysql_rollback_cont(my_bool *ret, MYSQL * mysql, int status); my_bool mysql_autocommit(MYSQL * mysql, my_bool auto_mode); +int mysql_autocommit_start(my_bool *ret, MYSQL * mysql, + my_bool auto_mode); +int mysql_autocommit_cont(my_bool *ret, MYSQL * mysql, int status); my_bool mysql_more_results(MYSQL *mysql); int mysql_next_result(MYSQL *mysql); +int mysql_next_result_start(int *ret, MYSQL *mysql); +int mysql_next_result_cont(int *ret, MYSQL *mysql, int status); void mysql_close(MYSQL *sock); +int mysql_close_start(MYSQL *sock); +int mysql_close_cont(MYSQL *sock, int status); +my_socket mysql_get_socket(const MYSQL *mysql); +unsigned int mysql_get_timeout_value(const MYSQL *mysql); diff --git a/include/sql_common.h b/include/sql_common.h index 6b66ae2fd81..8bb33e3779c 100644 --- a/include/sql_common.h +++ b/include/sql_common.h @@ -26,11 +26,18 @@ extern const char *unknown_sqlstate; extern const char *cant_connect_sqlstate; extern const char *not_error_sqlstate; + +struct mysql_async_context; + struct st_mysql_options_extention { char *plugin_dir; char *default_auth; }; +struct st_mysql_extension { + struct mysql_async_context *async_context; +}; + typedef struct st_mysql_methods { my_bool (*read_query_result)(MYSQL *mysql); @@ -102,6 +109,10 @@ void mysql_client_plugin_deinit(); struct st_mysql_client_plugin; extern struct st_mysql_client_plugin *mysql_client_builtins[]; +/* Non-blocking client API. */ +void my_context_install_suspend_resume_hook(struct mysql_async_context *b, + void (*)(my_bool, void *), void *); + #ifdef __cplusplus } #endif diff --git a/include/violite.h b/include/violite.h index c61ce4855c6..d5c9c083bdd 100644 --- a/include/violite.h +++ b/include/violite.h @@ -194,6 +194,8 @@ struct st_vio char *read_pos; /* start of unfetched data in the read buffer */ char *read_end; /* end of unfetched data */ + struct mysql_async_context *async_context; /* For non-blocking API */ + uint read_timeout, write_timeout; /* function pointers. They are similar for socket/SSL/whatever */ void (*viodelete)(Vio*); int (*vioerrno)(Vio*); diff --git a/libmysql/CMakeLists.txt b/libmysql/CMakeLists.txt index 4ac0b9a01ee..fc781a6dffc 100755 --- a/libmysql/CMakeLists.txt +++ b/libmysql/CMakeLists.txt @@ -95,7 +95,8 @@ SET(CLIENT_SOURCES ../mysys/array.c ../strings/bchange.c ../strings/bmove.c ../strings/strtoll.c ../strings/strtoull.c ../strings/strxmov.c ../strings/strxnmov.c ../mysys/thr_mutex.c ../mysys/typelib.c ../vio/vio.c ../vio/viosocket.c ../vio/viossl.c ../vio/viosslfactories.c ../strings/xml.c ../mysys/mf_qsort.c - ../mysys/my_getsystime.c ../mysys/my_sync.c ../sql-common/client_plugin.c ${LIB_SOURCES}) + ../mysys/my_getsystime.c ../mysys/my_sync.c ../sql-common/client_plugin.c + ../sql-common/mysql_async.c ../mysys/my_context.c ${LIB_SOURCES}) # Need to set USE_TLS for building the DLL, since __declspec(thread) # approach to thread local storage does not work properly in DLLs. diff --git a/libmysql/Makefile.shared b/libmysql/Makefile.shared index f9698f91a21..93bc527f915 100644 --- a/libmysql/Makefile.shared +++ b/libmysql/Makefile.shared @@ -71,9 +71,10 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \ mf_iocache2.lo my_seek.lo my_sleep.lo \ my_pread.lo mf_cache.lo md5.lo sha1.lo my_rnd.lo \ my_getopt.lo my_port.lo \ - my_rename.lo my_chsize.lo my_sync.lo my_getsystime.lo + my_rename.lo my_chsize.lo my_sync.lo my_getsystime.lo \ + my_context.lo sqlobjects = net.lo -sql_cmn_objects = pack.lo client.lo my_time.lo client_plugin.lo +sql_cmn_objects = pack.lo client.lo my_time.lo client_plugin.lo mysql_async.lo # Not needed in the minimum library mysysobjects2 = my_lib.lo mf_qsort.lo diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index 412902f5746..c751c5fa8c0 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -104,6 +104,7 @@ SET(LIBMYSQLD_SOURCES libmysqld.c emb_qcache.cc lib_sql.cc ../sql-common/my_user.c ../sql-common/pack.c ../sql/password.c ../sql/discover.cc ../sql/derror.cc ../sql/field.cc ../sql/field_conv.cc ../sql-common/client_plugin.c + ../sql-common/mysql_async.c ../sql/filesort.cc ../sql/gstream.cc ../sql/ha_partition.cc ../sql/handler.cc ../sql/hash_filo.cc ../sql/hostname.cc ../sql/init.cc ../sql/item_buff.cc ../sql/item_cmpfunc.cc diff --git a/libmysqld/Makefile.am b/libmysqld/Makefile.am index ef17a0a6923..926b339003d 100644 --- a/libmysqld/Makefile.am +++ b/libmysqld/Makefile.am @@ -41,7 +41,7 @@ pkglib_LTLIBRARIES = libmysqld.la SUBDIRS = . examples libmysqld_sources= libmysqld.c lib_sql.cc emb_qcache.cc libmysqlsources = errmsg.c get_password.c libmysql.c client.c pack.c \ - my_time.c client_plugin.c + my_time.c client_plugin.c mysql_async.c noinst_HEADERS = embedded_priv.h emb_qcache.h diff --git a/mysql-test/mysql-test-run.pl b/mysql-test/mysql-test-run.pl index 88f175b636f..19b9b97e22d 100755 --- a/mysql-test/mysql-test-run.pl +++ b/mysql-test/mysql-test-run.pl @@ -185,6 +185,7 @@ my $opt_ps_protocol; my $opt_sp_protocol; my $opt_cursor_protocol; my $opt_view_protocol; +my $opt_non_blocking_api; our $opt_debug; our $opt_debug_server; @@ -964,6 +965,7 @@ sub command_line_setup { 'sp-protocol' => \$opt_sp_protocol, 'view-protocol' => \$opt_view_protocol, 'cursor-protocol' => \$opt_cursor_protocol, + 'non-blocking-api' => \$opt_non_blocking_api, 'ssl|with-openssl' => \$opt_ssl, 'skip-ssl' => \$opt_skip_ssl, 'compress' => \$opt_compress, @@ -5468,6 +5470,11 @@ sub start_mysqltest ($) { mtr_add_arg($args, "--cursor-protocol"); } + if ( $opt_non_blocking_api ) + { + mtr_add_arg($args, "--non-blocking-api"); + } + if ( $opt_strace_client ) { $exe= $opt_strace_client || "strace"; @@ -5915,6 +5922,7 @@ Options to control what engine/variation to run (implies --ps-protocol) view-protocol Create a view to execute all non updating queries sp-protocol Create a stored procedure to execute all queries + non-blocking-api Use the non-blocking client API compress Use the compressed protocol between client and server ssl Use ssl protocol between client and server skip-ssl Dont start server with support for ssl connections diff --git a/mysql-test/r/mysql_client_test_nonblock.result b/mysql-test/r/mysql_client_test_nonblock.result new file mode 100644 index 00000000000..edda7980e97 --- /dev/null +++ b/mysql-test/r/mysql_client_test_nonblock.result @@ -0,0 +1,5 @@ +SET @old_general_log= @@global.general_log; +SET @old_slow_query_log= @@global.slow_query_log; +ok +SET @@global.general_log= @old_general_log; +SET @@global.slow_query_log= @old_slow_query_log; diff --git a/mysql-test/r/non_blocking_api.result b/mysql-test/r/non_blocking_api.result new file mode 100644 index 00000000000..470e3e067f6 --- /dev/null +++ b/mysql-test/r/non_blocking_api.result @@ -0,0 +1,9 @@ +CREATE TABLE t1 (a INT PRIMARY KEY); +INSERT INTO t1 VALUES (1); +SELECT * FROM t1; +a +1 +SELECT * FROM t1; +a +1 +DROP TABLE t1; diff --git a/mysql-test/t/mysql_client_test_nonblock-master.opt b/mysql-test/t/mysql_client_test_nonblock-master.opt new file mode 100644 index 00000000000..4c683f7f0a2 --- /dev/null +++ b/mysql-test/t/mysql_client_test_nonblock-master.opt @@ -0,0 +1 @@ +--log=$MYSQLTEST_VARDIR/log/master.log --log-output=FILE,TABLE diff --git a/mysql-test/t/mysql_client_test_nonblock.test b/mysql-test/t/mysql_client_test_nonblock.test new file mode 100644 index 00000000000..f212fe6abda --- /dev/null +++ b/mysql-test/t/mysql_client_test_nonblock.test @@ -0,0 +1,23 @@ +# This runs the mysql_client_test using the non-blocking API. + +# This test should work in embedded server after we fix mysqltest +-- source include/not_embedded.inc + +SET @old_general_log= @@global.general_log; +SET @old_slow_query_log= @@global.slow_query_log; + +# We run with different binaries for normal and --embedded-server +# +# If this test fails with "command "$MYSQL_CLIENT_TEST" failed", +# you should either run mysql_client_test separartely against a running +# server or run mysql-test-run --debug mysql_client_test and check +# var/log/mysql_client_test.trace + +--exec echo "$MYSQL_CLIENT_TEST --non-blocking-api" > $MYSQLTEST_VARDIR/log/mysql_client_test.out.log 2>&1 +--exec $MYSQL_CLIENT_TEST --non-blocking-api --getopt-ll-test=25600M >> $MYSQLTEST_VARDIR/log/mysql_client_test.out.log 2>&1 + +# End of 4.1 tests +echo ok; + +SET @@global.general_log= @old_general_log; +SET @@global.slow_query_log= @old_slow_query_log; diff --git a/mysql-test/t/named_pipe.test b/mysql-test/t/named_pipe.test index e88fd8e1ef8..7c623abddb0 100644 --- a/mysql-test/t/named_pipe.test +++ b/mysql-test/t/named_pipe.test @@ -2,6 +2,8 @@ # in order to optimize things we skip this test on all # other platforms --source include/windows.inc +# Named pipe does not support the non-blocking API. +--disable_non_blocking_api # Only run this test if named pipe is avaliable let $nmp= query_get_value("SHOW VARIABLES LIKE 'named_pipe'", Value, 1); diff --git a/mysql-test/t/non_blocking_api.test b/mysql-test/t/non_blocking_api.test new file mode 100644 index 00000000000..b9909a1c8a4 --- /dev/null +++ b/mysql-test/t/non_blocking_api.test @@ -0,0 +1,18 @@ +# Test mixing the use of blocking and non-blocking API in a single connection. + +--enable_non_blocking_api +connect (con_nonblock,localhost,root,,test); +--disable_non_blocking_api +connect (con_normal,localhost,root,,test); + +connection con_nonblock; +CREATE TABLE t1 (a INT PRIMARY KEY); +--enable_non_blocking_api +INSERT INTO t1 VALUES (1); +--disable_non_blocking_api +SELECT * FROM t1; +--enable_non_blocking_api +SELECT * FROM t1; + +connection con_normal; +DROP TABLE t1; diff --git a/mysys/CMakeLists.txt b/mysys/CMakeLists.txt index 9ab19222caf..cf222cb9819 100644 --- a/mysys/CMakeLists.txt +++ b/mysys/CMakeLists.txt @@ -44,6 +44,7 @@ SET(MYSYS_SOURCES array.c charset-def.c charset.c checksum.c default.c default_ lf_alloc-pin.c lf_dynarray.c lf_hash.c my_atomic.c my_getncpus.c my_rnd.c my_uuid.c wqueue.c waiting_threads.c + my_context.c ) IF(NOT SOURCE_SUBLIBS) diff --git a/mysys/Makefile.am b/mysys/Makefile.am index 1dd4cc0f780..050788531fb 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -58,7 +58,7 @@ libmysys_la_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \ my_compare.c my_netware.c my_largepage.c \ my_memmem.c stacktrace.c \ my_windac.c my_access.c base64.c my_libwrap.c \ - wqueue.c + wqueue.c my_context.c libmysys_la_LDFLAGS = $(AM_LDFLAGS) @WRAPLIBS@ libmysys_la_LIBADD = $(ZLIB_LIBS) diff --git a/mysys/my_context.c b/mysys/my_context.c new file mode 100644 index 00000000000..cc49c0cdbb9 --- /dev/null +++ b/mysys/my_context.c @@ -0,0 +1,749 @@ +/* + Copyright 2011 Kristian Nielsen + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* + Implementation of async context spawning using Posix ucontext and + swapcontext(). +*/ + +#include <stdio.h> +#include <errno.h> + +#include "mysys_priv.h" +#include "my_context.h" + +#ifdef HAVE_VALGRIND_VALGRIND_H +#include <valgrind/valgrind.h> +#endif + +#ifdef MY_CONTEXT_USE_UCONTEXT +/* + The makecontext() only allows to pass integers into the created context :-( + We want to pass pointers, so we do it this kinda hackish way. + Anyway, it should work everywhere, and at least it does not break strict + aliasing. +*/ +union pass_void_ptr_as_2_int { + int a[2]; + void *p; +}; + + +/* + We use old-style function definition here, as this is passed to + makecontext(). And the type of the makecontext() argument does not match + the actual type (as the actual type can differ from call to call). +*/ +static void +my_context_spawn_internal(i0, i1) +int i0, i1; +{ + int err; + struct my_context *c; + union pass_void_ptr_as_2_int u; + + u.a[0]= i0; + u.a[1]= i1; + c= (struct my_context *)u.p; + + (*c->user_func)(c->user_data); + c->active= 0; + err= setcontext(&c->base_context); + fprintf(stderr, "Aieie, setcontext() failed: %d (errno=%d)\n", err, errno); +} + + +int +my_context_continue(struct my_context *c) +{ + int err; + + if (!c->active) + return 0; + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + err= swapcontext(&c->base_context, &c->spawned_context); +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + if (err) + { + fprintf(stderr, "Aieie, swapcontext() failed: %d (errno=%d)\n", + err, errno); + return -1; + } + + return c->active; +} + + +int +my_context_spawn(struct my_context *c, void (*f)(void *), void *d) +{ + int err; + union pass_void_ptr_as_2_int u; + + err= getcontext(&c->spawned_context); + if (err) + return -1; + c->spawned_context.uc_stack.ss_sp= c->stack; + c->spawned_context.uc_stack.ss_size= c->stack_size; + c->spawned_context.uc_link= NULL; + c->user_func= f; + c->user_data= d; + c->active= 1; + u.p= c; + makecontext(&c->spawned_context, my_context_spawn_internal, 2, + u.a[0], u.a[1]); + + return my_context_continue(c); +} + + +int +my_context_yield(struct my_context *c) +{ + int err; + + if (!c->active) + return -1; + + err= swapcontext(&c->spawned_context, &c->base_context); + if (err) + return -1; + return 0; +} + +int +my_context_init(struct my_context *c, size_t stack_size) +{ + if (2*sizeof(int) < sizeof(void *)) + { + fprintf(stderr, + "Error: Unable to store pointer in 2 ints on this architecture\n"); + return -1; + } + if (!(c->stack= malloc(stack_size))) + return -1; /* Out of memory */ + c->stack_size= stack_size; +#ifdef HAVE_VALGRIND_VALGRIND_H + c->valgrind_stack_id= + VALGRIND_STACK_REGISTER(c->stack, ((unsigned char *)(c->stack))+stack_size); +#endif +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif + return 0; +} + +void +my_context_destroy(struct my_context *c) +{ + if (c->stack) + { +#ifdef HAVE_VALGRIND_VALGRIND_H + VALGRIND_STACK_DEREGISTER(c->valgrind_stack_id); +#endif + free(c->stack); + } +#ifndef DBUG_OFF + dbug_free_code_state(&c->dbug_state); +#endif +} + +#endif /* MY_CONTEXT_USE_UCONTEXT */ + + +#ifdef MY_CONTEXT_USE_X86_64_GCC_ASM +/* + GCC-amd64 implementation of my_context. + + This is slightly optimized in the common case where we never yield + (eg. fetch next row and it is already fully received in buffer). In this + case we do not need to restore registers at return (though we still need to + save them as we cannot know if we will yield or not in advance). +*/ + +#include <stdint.h> +#include <stdlib.h> + +/* + Layout of saved registers etc. + Since this is accessed through gcc inline assembler, it is simpler to just + use numbers than to try to define nice constants or structs. + + 0 0 %rsp + 1 8 %rbp + 2 16 %rbx + 3 24 %r12 + 4 32 %r13 + 5 40 %r14 + 6 48 %r15 + 7 56 %rip for done + 8 64 %rip for yield/continue +*/ + +int +my_context_spawn(struct my_context *c, void (*f)(void *), void *d) +{ + int ret; + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + /* + There are 6 callee-save registers we need to save and restore when + suspending and continuing, plus stack pointer %rsp and instruction pointer + %rip. + + However, if we never suspend, the user-supplied function will in any case + restore the 6 callee-save registers, so we can avoid restoring them in + this case. + */ + __asm__ __volatile__ + ( + "movq %%rsp, (%[save])\n\t" + "movq %[stack], %%rsp\n\t" + "movq %%rbp, 8(%[save])\n\t" + "movq %%rbx, 16(%[save])\n\t" + "movq %%r12, 24(%[save])\n\t" + "movq %%r13, 32(%[save])\n\t" + "movq %%r14, 40(%[save])\n\t" + "movq %%r15, 48(%[save])\n\t" + "leaq 1f(%%rip), %%rax\n\t" + "leaq 2f(%%rip), %%rcx\n\t" + "movq %%rax, 56(%[save])\n\t" + "movq %%rcx, 64(%[save])\n\t" + /* + Constraint below puts the argument to the user function into %rdi, as + needed for the calling convention. + */ + "callq *%[f]\n\t" + "jmpq *56(%[save])\n" + /* + Come here when operation is done. + We do not need to restore callee-save registers, as the called function + will do this for us if needed. + */ + "1:\n\t" + "movq (%[save]), %%rsp\n\t" + "xorl %[ret], %[ret]\n\t" + "jmp 3f\n" + /* Come here when operation was suspended. */ + "2:\n\t" + "movl $1, %[ret]\n" + "3:\n" + : [ret] "=a" (ret), + [f] "+S" (f), + /* Need this in %rdi to follow calling convention. */ + [d] "+D" (d) + : [stack] "a" (c->stack_top), + /* Need this in callee-save register to preserve in function call. */ + [save] "b" (&c->save[0]) + : "rcx", "rdx", "r8", "r9", "r10", "r11", "memory", "cc" + ); + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + return ret; +} + +int +my_context_continue(struct my_context *c) +{ + int ret; + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + __asm__ __volatile__ + ( + "movq (%[save]), %%rax\n\t" + "movq %%rsp, (%[save])\n\t" + "movq %%rax, %%rsp\n\t" + "movq 8(%[save]), %%rax\n\t" + "movq %%rbp, 8(%[save])\n\t" + "movq %%rax, %%rbp\n\t" + "movq 24(%[save]), %%rax\n\t" + "movq %%r12, 24(%[save])\n\t" + "movq %%rax, %%r12\n\t" + "movq 32(%[save]), %%rax\n\t" + "movq %%r13, 32(%[save])\n\t" + "movq %%rax, %%r13\n\t" + "movq 40(%[save]), %%rax\n\t" + "movq %%r14, 40(%[save])\n\t" + "movq %%rax, %%r14\n\t" + "movq 48(%[save]), %%rax\n\t" + "movq %%r15, 48(%[save])\n\t" + "movq %%rax, %%r15\n\t" + + "leaq 1f(%%rip), %%rax\n\t" + "leaq 2f(%%rip), %%rcx\n\t" + "movq %%rax, 56(%[save])\n\t" + "movq 64(%[save]), %%rax\n\t" + "movq %%rcx, 64(%[save])\n\t" + + "movq 16(%[save]), %%rcx\n\t" + "movq %%rbx, 16(%[save])\n\t" + "movq %%rcx, %%rbx\n\t" + + "jmpq *%%rax\n" + /* + Come here when operation is done. + Be sure to use the same callee-save register for %[save] here and in + my_context_spawn(), so we preserve the value correctly at this point. + */ + "1:\n\t" + "movq (%[save]), %%rsp\n\t" + "movq 8(%[save]), %%rbp\n\t" + /* %rbx is preserved from my_context_spawn() in this case. */ + "movq 24(%[save]), %%r12\n\t" + "movq 32(%[save]), %%r13\n\t" + "movq 40(%[save]), %%r14\n\t" + "movq 48(%[save]), %%r15\n\t" + "xorl %[ret], %[ret]\n\t" + "jmp 3f\n" + /* Come here when operation is suspended. */ + "2:\n\t" + "movl $1, %[ret]\n" + "3:\n" + : [ret] "=a" (ret) + : /* Need this in callee-save register to preserve in function call. */ + [save] "b" (&c->save[0]) + : "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "memory", "cc" + ); + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + return ret; +} + +int +my_context_yield(struct my_context *c) +{ + uint64_t *save= &c->save[0]; + __asm__ __volatile__ + ( + "movq (%[save]), %%rax\n\t" + "movq %%rsp, (%[save])\n\t" + "movq %%rax, %%rsp\n\t" + "movq 8(%[save]), %%rax\n\t" + "movq %%rbp, 8(%[save])\n\t" + "movq %%rax, %%rbp\n\t" + "movq 16(%[save]), %%rax\n\t" + "movq %%rbx, 16(%[save])\n\t" + "movq %%rax, %%rbx\n\t" + "movq 24(%[save]), %%rax\n\t" + "movq %%r12, 24(%[save])\n\t" + "movq %%rax, %%r12\n\t" + "movq 32(%[save]), %%rax\n\t" + "movq %%r13, 32(%[save])\n\t" + "movq %%rax, %%r13\n\t" + "movq 40(%[save]), %%rax\n\t" + "movq %%r14, 40(%[save])\n\t" + "movq %%rax, %%r14\n\t" + "movq 48(%[save]), %%rax\n\t" + "movq %%r15, 48(%[save])\n\t" + "movq %%rax, %%r15\n\t" + "movq 64(%[save]), %%rax\n\t" + "leaq 1f(%%rip), %%rcx\n\t" + "movq %%rcx, 64(%[save])\n\t" + + "jmpq *%%rax\n" + + "1:\n" + : [save] "+D" (save) + : + : "rax", "rcx", "rdx", "rsi", "r8", "r9", "r10", "r11", "memory", "cc" + ); + return 0; +} + +int +my_context_init(struct my_context *c, size_t stack_size) +{ + if (!(c->stack_bot= malloc(stack_size))) + return -1; /* Out of memory */ + c->stack_top= ((unsigned char *)(c->stack_bot)) + stack_size; +#ifdef HAVE_VALGRIND_VALGRIND_H + c->valgrind_stack_id= + VALGRIND_STACK_REGISTER(c->stack_bot, c->stack_top); +#endif +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif + return 0; +} + +void +my_context_destroy(struct my_context *c) +{ + if (c->stack_bot) + { + free(c->stack_bot); +#ifdef HAVE_VALGRIND_VALGRIND_H + VALGRIND_STACK_DEREGISTER(c->valgrind_stack_id); +#endif + } +#ifndef DBUG_OFF + dbug_free_code_state(&c->dbug_state); +#endif +} + +#endif /* MY_CONTEXT_USE_X86_64_GCC_ASM */ + + +#ifdef MY_CONTEXT_USE_I386_GCC_ASM +/* + GCC-i386 implementation of my_context. + + This is slightly optimized in the common case where we never yield + (eg. fetch next row and it is already fully received in buffer). In this + case we do not need to restore registers at return (though we still need to + save them as we cannot know if we will yield or not in advance). +*/ + +#include <stdint.h> +#include <stdlib.h> + +/* + Layout of saved registers etc. + Since this is accessed through gcc inline assembler, it is simpler to just + use numbers than to try to define nice constants or structs. + + 0 0 %esp + 1 4 %ebp + 2 8 %ebx + 3 12 %esi + 4 16 %edi + 5 20 %eip for done + 6 24 %eip for yield/continue +*/ + +int +my_context_spawn(struct my_context *c, void (*f)(void *), void *d) +{ + int ret; + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + /* + There are 4 callee-save registers we need to save and restore when + suspending and continuing, plus stack pointer %esp and instruction pointer + %eip. + + However, if we never suspend, the user-supplied function will in any case + restore the 4 callee-save registers, so we can avoid restoring them in + this case. + */ + __asm__ __volatile__ + ( + "movl %%esp, (%[save])\n\t" + "movl %[stack], %%esp\n\t" + /* Push the parameter on the stack. */ + "pushl %[d]\n\t" + "movl %%ebp, 4(%[save])\n\t" + "movl %%ebx, 8(%[save])\n\t" + "movl %%esi, 12(%[save])\n\t" + "movl %%edi, 16(%[save])\n\t" + /* Get label addresses in -fPIC-compatible way (no pc-relative on 32bit) */ + "call 1f\n" + "1:\n\t" + "popl %%eax\n\t" + "addl $(2f-1b), %%eax\n\t" + "movl %%eax, 20(%[save])\n\t" + "addl $(3f-2f), %%eax\n\t" + "movl %%eax, 24(%[save])\n\t" + "call *%[f]\n\t" + "jmp *20(%[save])\n" + /* + Come here when operation is done. + We do not need to restore callee-save registers, as the called function + will do this for us if needed. + */ + "2:\n\t" + "movl (%[save]), %%esp\n\t" + "xorl %[ret], %[ret]\n\t" + "jmp 4f\n" + /* Come here when operation was suspended. */ + "3:\n\t" + "movl $1, %[ret]\n" + "4:\n" + : [ret] "=a" (ret) + : [stack] "a" (c->stack_top), + /* Need this in callee-save register to preserve across function call. */ + [save] "D" (&c->save[0]), + [f] "m" (f), + [d] "m" (d) + : "ecx", "edx", "memory", "cc" + ); + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + return ret; +} + +int +my_context_continue(struct my_context *c) +{ + int ret; + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + __asm__ __volatile__ + ( + "movl (%[save]), %%eax\n\t" + "movl %%esp, (%[save])\n\t" + "movl %%eax, %%esp\n\t" + "movl 4(%[save]), %%eax\n\t" + "movl %%ebp, 4(%[save])\n\t" + "movl %%eax, %%ebp\n\t" + "movl 8(%[save]), %%eax\n\t" + "movl %%ebx, 8(%[save])\n\t" + "movl %%eax, %%ebx\n\t" + "movl 12(%[save]), %%eax\n\t" + "movl %%esi, 12(%[save])\n\t" + "movl %%eax, %%esi\n\t" + + "movl 24(%[save]), %%eax\n\t" + "call 1f\n" + "1:\n\t" + "popl %%ecx\n\t" + "addl $(2f-1b), %%ecx\n\t" + "movl %%ecx, 20(%[save])\n\t" + "addl $(3f-2f), %%ecx\n\t" + "movl %%ecx, 24(%[save])\n\t" + + /* Must restore %edi last as it is also our %[save] register. */ + "movl 16(%[save]), %%ecx\n\t" + "movl %%edi, 16(%[save])\n\t" + "movl %%ecx, %%edi\n\t" + + "jmp *%%eax\n" + /* + Come here when operation is done. + Be sure to use the same callee-save register for %[save] here and in + my_context_spawn(), so we preserve the value correctly at this point. + */ + "2:\n\t" + "movl (%[save]), %%esp\n\t" + "movl 4(%[save]), %%ebp\n\t" + "movl 8(%[save]), %%ebx\n\t" + "movl 12(%[save]), %%esi\n\t" + "movl 16(%[save]), %%edi\n\t" + "xorl %[ret], %[ret]\n\t" + "jmp 4f\n" + /* Come here when operation is suspended. */ + "3:\n\t" + "movl $1, %[ret]\n" + "4:\n" + : [ret] "=a" (ret) + : /* Need this in callee-save register to preserve in function call. */ + [save] "D" (&c->save[0]) + : "ecx", "edx", "memory", "cc" + ); + +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + + return ret; +} + +int +my_context_yield(struct my_context *c) +{ + uint64_t *save= &c->save[0]; + __asm__ __volatile__ + ( + "movl (%[save]), %%eax\n\t" + "movl %%esp, (%[save])\n\t" + "movl %%eax, %%esp\n\t" + "movl 4(%[save]), %%eax\n\t" + "movl %%ebp, 4(%[save])\n\t" + "movl %%eax, %%ebp\n\t" + "movl 8(%[save]), %%eax\n\t" + "movl %%ebx, 8(%[save])\n\t" + "movl %%eax, %%ebx\n\t" + "movl 12(%[save]), %%eax\n\t" + "movl %%esi, 12(%[save])\n\t" + "movl %%eax, %%esi\n\t" + "movl 16(%[save]), %%eax\n\t" + "movl %%edi, 16(%[save])\n\t" + "movl %%eax, %%edi\n\t" + + "movl 24(%[save]), %%eax\n\t" + "call 1f\n" + "1:\n\t" + "popl %%ecx\n\t" + "addl $(2f-1b), %%ecx\n\t" + "movl %%ecx, 24(%[save])\n\t" + + "jmp *%%eax\n" + + "2:\n" + : [save] "+d" (save) + : + : "eax", "ecx", "memory", "cc" + ); + return 0; +} + +int +my_context_init(struct my_context *c, size_t stack_size) +{ + if (!(c->stack_bot= malloc(stack_size))) + return -1; /* Out of memory */ + c->stack_top= ((unsigned char *)(c->stack_bot)) + stack_size; +#ifdef HAVE_VALGRIND_VALGRIND_H + c->valgrind_stack_id= + VALGRIND_STACK_REGISTER(c->stack_bot, c->stack_top); +#endif +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif + return 0; +} + +void +my_context_destroy(struct my_context *c) +{ + if (c->stack_bot) + { + free(c->stack_bot); +#ifdef HAVE_VALGRIND_VALGRIND_H + VALGRIND_STACK_DEREGISTER(c->valgrind_stack_id); +#endif + } +#ifndef DBUG_OFF + dbug_free_code_state(&c->dbug_state); +#endif +} + +#endif /* MY_CONTEXT_USE_I386_GCC_ASM */ + + +#ifdef MY_CONTEXT_USE_WIN32_FIBERS +int +my_context_yield(struct my_context *c) +{ + c->return_value= 1; + SwitchToFiber(c->app_fiber); + return 0; +} + + +static void WINAPI +my_context_trampoline(void *p) +{ + struct my_context *c= (struct my_context *)p; + /* + Reuse the Fiber by looping infinitely, each time we are scheduled we + spawn the appropriate function and switch back when it is done. + + This way we avoid the overhead of CreateFiber() for every asynchroneous + operation. + */ + for(;;) + { + (*(c->user_func))(c->user_arg); + c->return_value= 0; + SwitchToFiber(c->app_fiber); + } +} + +int +my_context_init(struct my_context *c, size_t stack_size) +{ +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif + c->lib_fiber= CreateFiber(stack_size, my_context_trampoline, c); + if (c->lib_fiber) + return 0; + else + return -1; +} + +void +my_context_destroy(struct my_context *c) +{ +#ifndef DBUG_OFF + dbug_free_code_state(&c->dbug_state); +#endif + if (c->lib_fiber) + { + DeleteFiber(c->lib_fiber); + c->lib_fiber= NULL; + } +} + +int +my_context_spawn(struct my_context *c, void (*f)(void *), void *d) +{ + void *current_fiber; + c->user_func= f; + c->user_arg= d; + /* + This seems to be a common trick to run ConvertThreadToFiber() only on the + first occurence in a thread, in a way that works on multiple Windows + versions. + */ + current_fiber= GetCurrentFiber(); + if (current_fiber == NULL || current_fiber == (void *)0x1e00) + current_fiber= ConvertThreadToFiber(c); + c->app_fiber= current_fiber; +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + SwitchToFiber(c->lib_fiber); +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + return c->return_value; +} + +int +my_context_continue(struct my_context *c) +{ +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + SwitchToFiber(c->lib_fiber); +#ifndef DBUG_OFF + dbug_swap_code_state(&c->dbug_state); +#endif + return c->return_value; +} + +#endif /* MY_CONTEXT_USE_WIN32_FIBERS */ diff --git a/server-tools/instance-manager/CMakeLists.txt b/server-tools/instance-manager/CMakeLists.txt index 8e96a3f6425..97627d08597 100755 --- a/server-tools/instance-manager/CMakeLists.txt +++ b/server-tools/instance-manager/CMakeLists.txt @@ -26,7 +26,8 @@ ADD_EXECUTABLE(mysqlmanager buffer.cc command.cc commands.cc guardian.cc instanc user_management_commands.cc ../../mysys/my_rnd.c ../../sql/net_serv.cc ../../sql-common/pack.c ../../sql/password.c ../../sql/sql_state.c ../../sql-common/client.c ../../libmysql/get_password.c - ../../libmysql/errmsg.c ../../sql-common/client_plugin.c) + ../../libmysql/errmsg.c ../../sql-common/client_plugin.c + ../../sql-common/mysql_async.c) ADD_DEPENDENCIES(mysqlmanager GenError) TARGET_LINK_LIBRARIES(mysqlmanager debug dbug mysys strings taocrypt vio yassl zlib wsock32) diff --git a/server-tools/instance-manager/Makefile.am b/server-tools/instance-manager/Makefile.am index dce7c77232d..42213993597 100644 --- a/server-tools/instance-manager/Makefile.am +++ b/server-tools/instance-manager/Makefile.am @@ -47,6 +47,7 @@ libnet_a_LIBADD= $(top_builddir)/sql/password.$(OBJEXT) \ $(top_builddir)/sql/sql_state.$(OBJEXT) \ $(top_builddir)/sql/mini_client_errors.$(OBJEXT)\ $(top_builddir)/sql/client.$(OBJEXT) \ + $(top_builddir)/sql/mysql_async.$(OBJEXT) \ $(top_builddir)/sql/client_plugin.$(OBJEXT) CLEANFILES= net_serv.cc client_settings.h diff --git a/sql-common/Makefile.am b/sql-common/Makefile.am index 2f5a049085f..379cff832ce 100644 --- a/sql-common/Makefile.am +++ b/sql-common/Makefile.am @@ -14,4 +14,5 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ## Process this file with automake to create Makefile.in -EXTRA_DIST = client.c pack.c my_time.c my_user.c client_plugin.c +EXTRA_DIST = client.c pack.c my_time.c my_user.c client_plugin.c \ + mysql_async.c diff --git a/sql-common/client.c b/sql-common/client.c index 28b3cf274bc..f56a6d5111f 100644 --- a/sql-common/client.c +++ b/sql-common/client.c @@ -108,6 +108,7 @@ my_bool net_flush(NET *net); #include "client_settings.h" #include <sql_common.h> #include <mysql/client_plugin.h> +#include "my_context.h" #define native_password_plugin_name "mysql_native_password" #define old_password_plugin_name "mysql_old_password" @@ -1050,6 +1051,15 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd) return 0; } +#define mysql_extension_get(MYSQL, X) \ + ((MYSQL)->extension ? (MYSQL)->extension->X : NULL) +#define mysql_extension_set(MYSQL, X, VAL) \ + if (!(MYSQL)->extension) \ + (MYSQL)->extension= (struct st_mysql_extension *) \ + my_malloc(sizeof(struct st_mysql_extension), \ + MYF(MY_WME | MY_ZEROFILL)); \ + (MYSQL)->extension->X= VAL; + #define extension_set_string(OPTS, X, STR) \ if ((OPTS)->extension) \ my_free((OPTS)->extension->X, MYF(MY_ALLOW_ZERO_PTR)); \ @@ -1266,6 +1276,36 @@ void mysql_read_default_options(struct st_mysql_options *options, DBUG_VOID_RETURN; } +/* + Fetch the context for asynchronous API calls, allocating a new one if + necessary. +*/ +#define STACK_SIZE (4096*15) + +struct mysql_async_context * +mysql_get_async_context(MYSQL *mysql) +{ + struct mysql_async_context *b; + if ((b= mysql_extension_get(mysql, async_context))) + return b; + + if (!(b= (struct mysql_async_context *) + my_malloc(sizeof(*b), MYF(MY_ZEROFILL)))) + { + set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate); + return NULL; + } + if (my_context_init(&b->async_context, STACK_SIZE)) + { + my_free(b, MYF(0)); + return NULL; + } + mysql_extension_set(mysql, async_context, b) + if (mysql->net.vio) + mysql->net.vio->async_context= b; + return b; +} + /************************************************************************** Get column lengths of the current row @@ -2537,6 +2577,26 @@ int run_plugin_auth(MYSQL *mysql, char *data, uint data_len, } +static int +connect_sync_or_async(MYSQL *mysql, NET *net, my_socket fd, + const struct sockaddr *name, uint namelen) +{ + extern int my_connect_async(struct mysql_async_context *b, my_socket fd, + const struct sockaddr *name, uint namelen, + uint timeout); + struct mysql_async_context *actxt= mysql_extension_get(mysql, async_context); + + if (actxt && actxt->active) + { + my_bool old_mode; + vio_blocking(net->vio, FALSE, &old_mode); + return my_connect_async(actxt, fd, name, namelen, + mysql->options.connect_timeout); + } + else + return my_connect(fd, name, namelen, mysql->options.connect_timeout); +} + MYSQL * STDCALL CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, const char *passwd, const char *db, @@ -2552,6 +2612,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, struct sockaddr_in sock_addr; ulong pkt_length; NET *net= &mysql->net; + struct mysql_async_context *actxt; #ifdef MYSQL_SERVER thr_alarm_t alarmed; ALARM alarm_buff; @@ -2681,8 +2742,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, bzero((char*) &UNIXaddr,sizeof(UNIXaddr)); UNIXaddr.sun_family = AF_UNIX; strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1); - if (my_connect(sock,(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr), - mysql->options.connect_timeout)) + if (connect_sync_or_async(mysql, net, sock, + (struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr))) { DBUG_PRINT("error",("Got error %d on connect to local server", socket_errno)); @@ -2763,8 +2824,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, if ((int) (ip_addr = inet_addr(host)) != (int) INADDR_NONE) { memcpy_fixed(&sock_addr.sin_addr,&ip_addr,sizeof(ip_addr)); - status= my_connect(sock, (struct sockaddr *) &sock_addr, - sizeof(sock_addr), mysql->options.connect_timeout); + status= connect_sync_or_async(mysql, net, sock, + (struct sockaddr *) &sock_addr, + sizeof(sock_addr)); } else { @@ -2795,8 +2857,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, min(sizeof(sock_addr.sin_addr), (size_t) hp->h_length)); DBUG_PRINT("info",("Trying %s...", (my_inet_ntoa(sock_addr.sin_addr, ipaddr), ipaddr))); - status= my_connect(sock, (struct sockaddr *) &sock_addr, - sizeof(sock_addr), mysql->options.connect_timeout); + status= connect_sync_or_async(mysql, net, sock, + (struct sockaddr *) &sock_addr, + sizeof(sock_addr)); } my_gethostbyname_r_free(); @@ -2818,6 +2881,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, goto error; } + if ((actxt= mysql_extension_get(mysql, async_context)) && actxt->active) + net->vio->async_context= actxt; + if (my_net_init(net, net->vio)) { vio_delete(net->vio); @@ -3077,9 +3143,34 @@ static void mysql_fix_pointers(MYSQL* mysql, MYSQL* old_mysql) #endif +struct my_hook_data { + MYSQL *orig_mysql; + MYSQL *new_mysql; + /* This is always NULL currently, but restoring does not hurt just in case. */ + Vio *orig_vio; +}; +/* + Callback hook to make the new VIO accessible via the old MYSQL to calling + application when suspending a non-blocking call during automatic reconnect. +*/ +static void +my_suspend_hook(my_bool suspend, void *data) +{ + struct my_hook_data *hook_data= (struct my_hook_data *)data; + if (suspend) + { + hook_data->orig_vio= hook_data->orig_mysql->net.vio; + hook_data->orig_mysql->net.vio= hook_data->new_mysql->net.vio; + } + else + hook_data->orig_mysql->net.vio= hook_data->orig_vio; +} + my_bool mysql_reconnect(MYSQL *mysql) { MYSQL tmp_mysql; + struct my_hook_data hook_data; + struct mysql_async_context *ctxt= NULL; DBUG_ENTER("mysql_reconnect"); DBUG_ASSERT(mysql); DBUG_PRINT("enter", ("mysql->reconnect: %d", mysql->reconnect)); @@ -3093,14 +3184,34 @@ my_bool mysql_reconnect(MYSQL *mysql) DBUG_RETURN(1); } mysql_init(&tmp_mysql); + tmp_mysql.extension= mysql->extension; tmp_mysql.options= mysql->options; tmp_mysql.options.my_cnf_file= tmp_mysql.options.my_cnf_group= 0; tmp_mysql.rpl_pivot= mysql->rpl_pivot; + /* + If we are automatically re-connecting inside a non-blocking API call, we + may need to suspend and yield to the user application during the reconnect. + If so, the user application will need access to the new VIO already then + so that it can correctly wait for I/O to become ready. + To achieve this, we temporarily install a hook that will temporarily put in + the VIO while we are suspended. + (The vio will be put in the original MYSQL permanently once we successfully + reconnect, or be discarded if we fail to reconnect.) + */ + if ((ctxt= mysql_extension_get(mysql, async_context)) && ctxt->active) + { + hook_data.orig_mysql= mysql; + hook_data.new_mysql= &tmp_mysql; + hook_data.orig_vio= mysql->net.vio; + my_context_install_suspend_resume_hook(ctxt, my_suspend_hook, &hook_data); + } if (!mysql_real_connect(&tmp_mysql,mysql->host,mysql->user,mysql->passwd, mysql->db, mysql->port, mysql->unix_socket, mysql->client_flag | CLIENT_REMEMBER_OPTIONS)) { + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); mysql->net.last_errno= tmp_mysql.net.last_errno; strmov(mysql->net.last_error, tmp_mysql.net.last_error); strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate); @@ -3109,13 +3220,18 @@ my_bool mysql_reconnect(MYSQL *mysql) if (mysql_set_character_set(&tmp_mysql, mysql->charset->csname)) { DBUG_PRINT("error", ("mysql_set_character_set() failed")); + tmp_mysql.extension= NULL; bzero((char*) &tmp_mysql.options,sizeof(tmp_mysql.options)); mysql_close(&tmp_mysql); + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); mysql->net.last_errno= tmp_mysql.net.last_errno; strmov(mysql->net.last_error, tmp_mysql.net.last_error); strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate); DBUG_RETURN(1); } + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); DBUG_PRINT("info", ("reconnect succeded")); tmp_mysql.reconnect= 1; @@ -3125,7 +3241,11 @@ my_bool mysql_reconnect(MYSQL *mysql) tmp_mysql.stmts= mysql->stmts; mysql->stmts= 0; - /* Don't free options as these are now used in tmp_mysql */ + /* + Don't free options as these are now used in tmp_mysql. + Same with extension. + */ + mysql->extension= NULL; bzero((char*) &mysql->options,sizeof(mysql->options)); mysql->free_me=0; mysql_close(mysql); @@ -3204,6 +3324,21 @@ static void mysql_close_free_options(MYSQL *mysql) } +static void +mysql_close_free_extension(MYSQL *mysql) +{ + if (mysql->extension) + { + if (mysql->extension->async_context) + { + my_context_destroy(&mysql->extension->async_context->async_context); + my_free(mysql->extension->async_context, MYF(0)); + } + my_free(mysql->extension, MYF(0)); + mysql->extension= NULL; + } +} + static void mysql_close_free(MYSQL *mysql) { my_free((uchar*) mysql->host_info,MYF(MY_ALLOW_ZERO_PTR)); @@ -3304,6 +3439,33 @@ void mysql_detach_stmt_list(LIST **stmt_list __attribute__((unused)), (As some clients call this after mysql_real_connect() fails) */ +/* + mysql_close() can actually block, at least in theory, if the socket buffer + is full when sending the COM_QUIT command. + + On the other hand, the latter part of mysql_close() needs to free the stack + used for non-blocking operation of blocking stuff, so that later part can + _not_ be done non-blocking. + + Therefore, mysql_pre_close() is used to run the parts of mysql_close() that + may block. It can be called before mysql_close(), and in that case + mysql_close() is guaranteed not to need to block. +*/ +void mysql_pre_close(MYSQL *mysql) +{ + if (!mysql) + return; + /* If connection is still up, send a QUIT message */ + if (mysql->net.vio != 0) + { + free_old_query(mysql); + mysql->status=MYSQL_STATUS_READY; /* Force command */ + mysql->reconnect=0; + simple_command(mysql,COM_QUIT,(uchar*) 0,0,1); + end_server(mysql); /* Sets mysql->net.vio= 0 */ + } +} + void STDCALL mysql_close(MYSQL *mysql) { DBUG_ENTER("mysql_close"); @@ -3311,16 +3473,9 @@ void STDCALL mysql_close(MYSQL *mysql) if (mysql) /* Some simple safety */ { - /* If connection is still up, send a QUIT message */ - if (mysql->net.vio != 0) - { - free_old_query(mysql); - mysql->status=MYSQL_STATUS_READY; /* Force command */ - mysql->reconnect=0; - simple_command(mysql,COM_QUIT,(uchar*) 0,0,1); - end_server(mysql); /* Sets mysql->net.vio= 0 */ - } + mysql_pre_close(mysql); mysql_close_free_options(mysql); + mysql_close_free_extension(mysql); mysql_close_free(mysql); mysql_detach_stmt_list(&mysql->stmts, "mysql_close"); #ifndef TO_BE_DELETED @@ -3941,3 +4096,12 @@ static int old_password_auth_client(MYSQL_PLUGIN_VIO *vio, MYSQL *mysql) return CR_OK; } + +my_socket STDCALL +mysql_get_socket(const MYSQL *mysql) +{ + if (mysql->net.vio) + return mysql->net.vio->sd; + else + return INVALID_SOCKET; +} diff --git a/sql-common/mysql_async.c b/sql-common/mysql_async.c new file mode 100644 index 00000000000..a8e699e5012 --- /dev/null +++ b/sql-common/mysql_async.c @@ -0,0 +1,1431 @@ +/* + Copyright 2011 Kristian Nielsen + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* + MySQL non-blocking client library functions. +*/ + +#include "my_global.h" +#include "my_sys.h" +#include "mysql.h" +#include "errmsg.h" +#include "sql_common.h" +#include "my_context.h" +#include "violite.h" + + +#ifdef __WIN__ +/* + Windows does not support MSG_DONTWAIT for send()/recv(). So we need to ensure + that the socket is non-blocking at the start of every operation. +*/ +#define WIN_SET_NONBLOCKING(mysql) { \ + my_bool old_mode__; \ + if ((mysql)->net.vio) vio_blocking((mysql)->net.vio, FALSE, &old_mode__); \ + } +#else +#define WIN_SET_NONBLOCKING(mysql) +#endif + +extern struct mysql_async_context *mysql_get_async_context(MYSQL *mysql); + + +void +my_context_install_suspend_resume_hook(struct mysql_async_context *b, + void (*hook)(my_bool, void *), + void *user_data) +{ + b->suspend_resume_hook= hook; + b->suspend_resume_hook_user_data= user_data; +} + + +/* Asynchronous connect(); socket must already be set non-blocking. */ +int +my_connect_async(struct mysql_async_context *b, my_socket fd, + const struct sockaddr *name, uint namelen, uint timeout) +{ + int res; +#ifdef __WIN__ + int s_err_size; +#else + socklen_t s_err_size; +#endif + + /* + Start to connect asynchronously. + If this will block, we suspend the call and return control to the + application context. The application will then resume us when the socket + polls ready for write, indicating that the connection attempt completed. + */ + res= connect(fd, name, namelen); +#ifdef __WIN__ + if (res != 0) + { + int wsa_err= WSAGetLastError(); + if (wsa_err != WSAEWOULDBLOCK) + return res; +#else + if (res < 0) + { + if (errno != EINPROGRESS && errno != EALREADY && errno != EAGAIN) + return res; +#endif + b->timeout_value= timeout; + b->ret_status= MYSQL_WAIT_WRITE | + (timeout ? MYSQL_WAIT_TIMEOUT : 0); +#ifdef __WIN__ + b->ret_status|= MYSQL_WAIT_EXCEPT; +#endif + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + if (b->ret_status & MYSQL_WAIT_TIMEOUT) + return -1; + + s_err_size= sizeof(int); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*) &res, &s_err_size) != 0) + return -1; + if (res) + { + errno= res; + return -1; + } + } + return res; +} + +ssize_t +my_recv_async(struct mysql_async_context *b, int fd, + unsigned char *buf, size_t size, uint timeout) +{ + ssize_t res; + + for (;;) + { + res= recv(fd, buf, size, +#ifdef __WIN__ + 0 +#else + MSG_DONTWAIT +#endif + ); + if (res >= 0 || +#ifdef __WIN__ + WSAGetLastError() != WSAEWOULDBLOCK +#else + (errno != EAGAIN && errno != EINTR) +#endif + ) + return res; + b->ret_status= MYSQL_WAIT_READ; + if (timeout) + { + b->ret_status|= MYSQL_WAIT_TIMEOUT; + b->timeout_value= timeout; + } + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + if (b->ret_status & MYSQL_WAIT_TIMEOUT) + return -1; + } +} + +ssize_t +my_send_async(struct mysql_async_context *b, int fd, + const unsigned char *buf, size_t size, uint timeout) +{ + ssize_t res; + + for (;;) + { + res= send(fd, buf, size, +#ifdef __WIN__ + 0 +#else + MSG_DONTWAIT +#endif + ); + if (res >= 0 || +#ifdef __WIN__ + WSAGetLastError() != WSAEWOULDBLOCK +#else + (errno != EAGAIN && errno != EINTR) +#endif + ) + return res; + b->ret_status= MYSQL_WAIT_WRITE; + if (timeout) + { + b->ret_status|= MYSQL_WAIT_TIMEOUT; + b->timeout_value= timeout; + } + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + if (b->ret_status & MYSQL_WAIT_TIMEOUT) + return -1; + } +} + + +my_bool +my_poll_read_async(struct mysql_async_context *b, uint timeout) +{ + b->ret_status= MYSQL_WAIT_READ | MYSQL_WAIT_TIMEOUT; + b->timeout_value= timeout; + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + return (b->ret_status & MYSQL_WAIT_READ) ? 0 : 1; +} + + +#ifdef HAVE_OPENSSL +int +my_ssl_read_async(struct mysql_async_context *b, SSL *ssl, + void *buf, int size) +{ + int res, ssl_err; + + for (;;) + { + res= SSL_read(ssl, buf, size); + if (res >= 0) + return res; + ssl_err= SSL_get_error(ssl, res); + if (ssl_err == SSL_ERROR_WANT_READ) + b->ret_status= MYSQL_WAIT_READ; + else if (ssl_err == SSL_ERROR_WANT_WRITE) + b->ret_status= MYSQL_WAIT_WRITE; + else + return res; + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + } +} + +int +my_ssl_write_async(struct mysql_async_context *b, SSL *ssl, + const void *buf, int size) +{ + int res, ssl_err; + + for (;;) + { + res= SSL_write(ssl, buf, size); + if (res >= 0) + return res; + ssl_err= SSL_get_error(ssl, res); + if (ssl_err == SSL_ERROR_WANT_READ) + b->ret_status= MYSQL_WAIT_READ; + else if (ssl_err == SSL_ERROR_WANT_WRITE) + b->ret_status= MYSQL_WAIT_WRITE; + else + return res; + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + } +} +#endif /* HAVE_OPENSSL */ + +unsigned int STDCALL +mysql_get_timeout_value(const MYSQL *mysql) +{ + if (mysql->extension && mysql->extension->async_context) + return mysql->extension->async_context->timeout_value; + else + return 0; +} + +/* + Now create non-blocking definitions for all the calls that may block. + + Each call FOO gives rise to FOO_start() that prepares the MYSQL object for + doing non-blocking calls that can suspend operation mid-way, and then starts + the call itself. And a FOO_start_internal trampoline to assist with running + the real call in a co-routine that can be suspended. And a FOO_cont() that + can continue a suspended operation. +*/ + +#define MK_ASYNC_CALLS(call__, decl_args__, invoke_args__, cont_arg__, mysql_val__, parms_mysql_val__, parms_assign__, ret_type__, err_val__, ok_val__, extra1__) \ +static void \ +call__ ## _start_internal(void *d) \ +{ \ + struct call__ ## _params *parms; \ + ret_type__ ret; \ + struct mysql_async_context *b; \ + \ + parms= (struct call__ ## _params *)d; \ + b= (parms_mysql_val__)->extension->async_context; \ + \ + ret= call__ invoke_args__; \ + b->ret_result. ok_val__ = ret; \ + b->ret_status= 0; \ +} \ +int STDCALL \ +call__ ## _start decl_args__ \ +{ \ + int res; \ + struct mysql_async_context *b; \ + struct call__ ## _params parms; \ + \ + extra1__ \ + if (!(b= mysql_get_async_context((mysql_val__)))) \ + { \ + *ret= err_val__; \ + return 0; \ + } \ + parms_assign__ \ + \ + b->active= 1; \ + res= my_context_spawn(&b->async_context, call__ ## _start_internal, &parms);\ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + *ret= err_val__; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + b->suspended= 1; \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + *ret= b->ret_result. ok_val__; \ + return 0; \ + } \ +} \ +int STDCALL \ +call__ ## _cont(ret_type__ *ret, cont_arg__, int ready_status) \ +{ \ + int res; \ + struct mysql_async_context *b; \ + \ + b= (mysql_val__)->extension->async_context; \ + if (!b || !b->suspended) \ + { \ + set_mysql_error((mysql_val__), CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);\ + *ret= err_val__; \ + return 0; \ + } \ + \ + b->active= 1; \ + b->ret_status= ready_status; \ + res= my_context_continue(&b->async_context); \ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + *ret= err_val__; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + *ret= b->ret_result. ok_val__; \ + return 0; \ + } \ +} + +#define MK_ASYNC_CALLS_VOID_RETURN(call__, decl_args__, invoke_args__, cont_arg__, mysql_val__, parms_mysql_val__, parms_assign__, extra1__) \ +static void \ +call__ ## _start_internal(void *d) \ +{ \ + struct call__ ## _params *parms; \ + struct mysql_async_context *b; \ + \ + parms= (struct call__ ## _params *)d; \ + b= (parms_mysql_val__)->extension->async_context; \ + \ + call__ invoke_args__; \ + b->ret_status= 0; \ +} \ +int STDCALL \ +call__ ## _start decl_args__ \ +{ \ + int res; \ + struct mysql_async_context *b; \ + struct call__ ## _params parms; \ + \ + extra1__ \ + if (!(b= mysql_get_async_context((mysql_val__)))) \ + { \ + return 0; \ + } \ + parms_assign__ \ + \ + b->active= 1; \ + res= my_context_spawn(&b->async_context, call__ ## _start_internal, &parms);\ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + b->suspended= 1; \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + return 0; \ + } \ +} \ +int STDCALL \ +call__ ## _cont(cont_arg__, int ready_status) \ +{ \ + int res; \ + struct mysql_async_context *b; \ + \ + b= (mysql_val__)->extension->async_context; \ + if (!b || !b->suspended) \ + { \ + set_mysql_error((mysql_val__), CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);\ + return 0; \ + } \ + \ + b->active= 1; \ + b->ret_status= ready_status; \ + res= my_context_continue(&b->async_context); \ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + return 0; \ + } \ +} + +struct mysql_real_connect_params { + MYSQL *mysql; + const char *host; + const char *user; + const char *passwd; + const char *db; + unsigned int port; + const char *unix_socket; + unsigned long client_flags; +}; +MK_ASYNC_CALLS( + mysql_real_connect, + (MYSQL **ret, MYSQL *mysql, const char *host, const char *user, + const char *passwd, const char *db, unsigned int port, + const char *unix_socket, unsigned long client_flags), + (parms->mysql, parms->host, parms->user, parms->passwd, parms->db, + parms->port, parms->unix_socket, parms->client_flags), + MYSQL *mysql, + mysql, + parms->mysql, + { + parms.mysql= mysql; + parms.host= host; + parms.user= user; + parms.passwd= passwd; + parms.db= db; + parms.port= port; + parms.unix_socket= unix_socket; + parms.client_flags= client_flags; + }, + MYSQL *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_real_query_params { + MYSQL *mysql; + const char *stmt_str; + unsigned long length; +}; +MK_ASYNC_CALLS( + mysql_real_query, + (int *ret, MYSQL *mysql, const char *stmt_str, unsigned long length), + (parms->mysql, parms->stmt_str, parms->length), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.stmt_str= stmt_str; + parms.length= length; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_fetch_row_params { + MYSQL_RES *result; +}; +MK_ASYNC_CALLS( + mysql_fetch_row, + (MYSQL_ROW *ret, MYSQL_RES *result), + (parms->result), + MYSQL_RES *result, + result->handle, + parms->result->handle, + { + WIN_SET_NONBLOCKING(result->handle) + parms.result= result; + }, + MYSQL_ROW, + NULL, + r_ptr, + /* + If we already fetched all rows from server (eg. mysql_store_result()), + then result->handle will be NULL and we cannot suspend. But that is fine, + since in this case mysql_fetch_row cannot block anyway. Just return + directly. + */ + if (!result->handle) + { + *ret= mysql_fetch_row(result); + return 0; + } +) + +struct mysql_set_character_set_params { + MYSQL *mysql; + const char *csname; +}; +MK_ASYNC_CALLS( + mysql_set_character_set, + (int *ret, MYSQL *mysql, const char *csname), + (parms->mysql, parms->csname), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.csname= csname; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_select_db_params { + MYSQL *mysql; + const char *db; +}; +MK_ASYNC_CALLS( + mysql_select_db, + (int *ret, MYSQL *mysql, const char *db), + (parms->mysql, parms->db), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.db= db; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_send_query_params { + MYSQL *mysql; + const char *q; + unsigned long length; +}; +MK_ASYNC_CALLS( + mysql_send_query, + (int *ret, MYSQL *mysql, const char *q, unsigned long length), + (parms->mysql, parms->q, parms->length), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.q= q; + parms.length= length; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_store_result_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_store_result, + (MYSQL_RES **ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + MYSQL_RES *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_free_result_params { + MYSQL_RES *result; +}; +MK_ASYNC_CALLS_VOID_RETURN( + mysql_free_result, + (MYSQL_RES *result), + (parms->result), + MYSQL_RES *result, + result->handle, + parms->result->handle, + { + WIN_SET_NONBLOCKING(result->handle) + parms.result= result; + }, + /* + mysql_free_result() can have NULL in result->handle (this happens when all + rows have been fetched and mysql_fetch_row() returned NULL.) + So we cannot suspend, but it does not matter, as in this case + mysql_free_result() cannot block. + It is also legitimate to have NULL result, which will do nothing. + */ + if (!result || !result->handle) + { + mysql_free_result(result); + return 0; + }) + +struct mysql_pre_close_params { + MYSQL *sock; +}; +/* + We need special handling for mysql_close(), as the first part may block, + while the last part needs to free our extra library context stack. + + So we do the first part (mysql_pre_close()) non-blocking, but the last part + blocking. +*/ +extern void mysql_pre_close(MYSQL *mysql); +MK_ASYNC_CALLS_VOID_RETURN( + mysql_pre_close, + (MYSQL *sock), + (parms->sock), + MYSQL *sock, + sock, + parms->sock, + { + WIN_SET_NONBLOCKING(sock) + parms.sock= sock; + }, + /* Nothing */) +int STDCALL +mysql_close_start(MYSQL *sock) +{ + int res; + + /* It is legitimate to have NULL sock argument, which will do nothing. */ + if (sock) + { + res= mysql_pre_close_start(sock); + /* If we need to block, return now and do the rest in mysql_close_cont(). */ + if (res) + return res; + } + mysql_close(sock); + return 0; +} +int STDCALL +mysql_close_cont(MYSQL *sock, int ready_status) +{ + int res; + + res= mysql_pre_close_cont(sock, ready_status); + if (res) + return res; + mysql_close(sock); + return 0; +} + +#ifdef USE_OLD_FUNCTIONS +struct mysql_connect_params { + MYSQL *mysql; + const char *host; + const char *user; + const char *passwd; +}; +MK_ASYNC_CALLS( + mysql_connect, + (MYSQL **ret, MYSQL *mysql, const char *host, const char *user, const char *passwd), + (parms->mysql, parms->host, parms->user, parms->passwd), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.host= host; + parms.user= user; + parms.passwd= passwd; + }, + MYSQL *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_create_db_params { + MYSQL *mysql; + const char *DB; +}; +MK_ASYNC_CALLS( + mysql_create_db, + (int *ret, MYSQL *mysql, const char *DB), + (parms->mysql, parms->DB), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.DB= DB; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_drop_db_params { + MYSQL *mysql; + const char *DB; +}; +MK_ASYNC_CALLS( + mysql_drop_db, + (int *ret, MYSQL *mysql, const char *DB), + (parms->mysql, parms->DB), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.DB= DB; + }, + int, + 1, + r_int, + /* Nothing */) + +#endif + +/* + These following are not available inside the server (neither blocking or + non-blocking). +*/ +#ifndef MYSQL_SERVER +struct mysql_change_user_params { + MYSQL *mysql; + const char *user; + const char *passwd; + const char *db; +}; +MK_ASYNC_CALLS( + mysql_change_user, + (my_bool *ret, MYSQL *mysql, const char *user, const char *passwd, const char *db), + (parms->mysql, parms->user, parms->passwd, parms->db), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.user= user; + parms.passwd= passwd; + parms.db= db; + }, + my_bool, + TRUE, + r_my_bool, + /* Nothing */) + +struct mysql_query_params { + MYSQL *mysql; + const char *q; +}; +MK_ASYNC_CALLS( + mysql_query, + (int *ret, MYSQL *mysql, const char *q), + (parms->mysql, parms->q), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.q= q; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_shutdown_params { + MYSQL *mysql; + enum mysql_enum_shutdown_level shutdown_level; +}; +MK_ASYNC_CALLS( + mysql_shutdown, + (int *ret, MYSQL *mysql, enum mysql_enum_shutdown_level shutdown_level), + (parms->mysql, parms->shutdown_level), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.shutdown_level= shutdown_level; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_dump_debug_info_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_dump_debug_info, + (int *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_refresh_params { + MYSQL *mysql; + unsigned int refresh_options; +}; +MK_ASYNC_CALLS( + mysql_refresh, + (int *ret, MYSQL *mysql, unsigned int refresh_options), + (parms->mysql, parms->refresh_options), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.refresh_options= refresh_options; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_kill_params { + MYSQL *mysql; + unsigned long pid; +}; +MK_ASYNC_CALLS( + mysql_kill, + (int *ret, MYSQL *mysql, unsigned long pid), + (parms->mysql, parms->pid), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.pid= pid; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_set_server_option_params { + MYSQL *mysql; + enum enum_mysql_set_option option; +}; +MK_ASYNC_CALLS( + mysql_set_server_option, + (int *ret, MYSQL *mysql, enum enum_mysql_set_option option), + (parms->mysql, parms->option), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.option= option; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_ping_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_ping, + (int *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + int, + 1, + r_int, + /* Nothing */) + +struct mysql_stat_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_stat, + (const char **ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + const char *, + NULL, + r_const_ptr, + /* Nothing */) + +struct mysql_list_dbs_params { + MYSQL *mysql; + const char *wild; +}; +MK_ASYNC_CALLS( + mysql_list_dbs, + (MYSQL_RES **ret, MYSQL *mysql, const char *wild), + (parms->mysql, parms->wild), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.wild= wild; + }, + MYSQL_RES *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_list_tables_params { + MYSQL *mysql; + const char *wild; +}; +MK_ASYNC_CALLS( + mysql_list_tables, + (MYSQL_RES **ret, MYSQL *mysql, const char *wild), + (parms->mysql, parms->wild), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.wild= wild; + }, + MYSQL_RES *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_list_processes_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_list_processes, + (MYSQL_RES **ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + MYSQL_RES *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_list_fields_params { + MYSQL *mysql; + const char *table; + const char *wild; +}; +MK_ASYNC_CALLS( + mysql_list_fields, + (MYSQL_RES **ret, MYSQL *mysql, const char *table, const char *wild), + (parms->mysql, parms->table, parms->wild), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.table= table; + parms.wild= wild; + }, + MYSQL_RES *, + NULL, + r_ptr, + /* Nothing */) + +struct mysql_read_query_result_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_read_query_result, + (my_bool *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + my_bool, + TRUE, + r_my_bool, + /* Nothing */) + +struct mysql_stmt_prepare_params { + MYSQL_STMT *stmt; + const char *query; + unsigned long length; +}; +MK_ASYNC_CALLS( + mysql_stmt_prepare, + (int *ret, MYSQL_STMT *stmt, const char *query, unsigned long length), + (parms->stmt, parms->query, parms->length), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + parms.query= query; + parms.length= length; + }, + int, + 1, + r_int, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_prepare(stmt, query, length); + return 0; + }) + +struct mysql_stmt_execute_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_execute, + (int *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + int, + 1, + r_int, + /* + If eg. mysql_change_user(), stmt->mysql will be NULL. + In this case, we cannot block. + */ + if (!stmt->mysql) + { + *ret= mysql_stmt_execute(stmt); + return 0; + }) + +struct mysql_stmt_fetch_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_fetch, + (int *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + int, + 1, + r_int, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_fetch(stmt); + return 0; + }) + +struct mysql_stmt_store_result_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_store_result, + (int *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + int, + 1, + r_int, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_store_result(stmt); + return 0; + }) + +struct mysql_stmt_close_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_close, + (my_bool *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + my_bool, + TRUE, + r_my_bool, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_close(stmt); + return 0; + }) + +struct mysql_stmt_reset_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_reset, + (my_bool *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + my_bool, + TRUE, + r_my_bool, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_reset(stmt); + return 0; + }) + +struct mysql_stmt_free_result_params { + MYSQL_STMT *stmt; +}; +MK_ASYNC_CALLS( + mysql_stmt_free_result, + (my_bool *ret, MYSQL_STMT *stmt), + (parms->stmt), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + }, + my_bool, + TRUE, + r_my_bool, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_free_result(stmt); + return 0; + }) + +struct mysql_stmt_send_long_data_params { + MYSQL_STMT *stmt; + unsigned int param_number; + const char *data; + unsigned long length; +}; +MK_ASYNC_CALLS( + mysql_stmt_send_long_data, + (my_bool *ret, MYSQL_STMT *stmt, unsigned int param_number, const char *data, unsigned long length), + (parms->stmt, parms->param_number, parms->data, parms->length), + MYSQL_STMT *stmt, + stmt->mysql, + parms->stmt->mysql, + { + WIN_SET_NONBLOCKING(stmt->mysql) + parms.stmt= stmt; + parms.param_number= param_number; + parms.data= data; + parms.length= length; + }, + my_bool, + TRUE, + r_my_bool, + /* If stmt->mysql==NULL then we will not block so can call directly. */ + if (!stmt->mysql) + { + *ret= mysql_stmt_send_long_data(stmt, param_number, data, length); + return 0; + }) + +struct mysql_commit_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_commit, + (my_bool *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + my_bool, + TRUE, + r_my_bool, + /* Nothing */) + +struct mysql_rollback_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_rollback, + (my_bool *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + my_bool, + TRUE, + r_my_bool, + /* Nothing */) + +struct mysql_autocommit_params { + MYSQL *mysql; + my_bool auto_mode; +}; +MK_ASYNC_CALLS( + mysql_autocommit, + (my_bool *ret, MYSQL *mysql, my_bool auto_mode), + (parms->mysql, parms->auto_mode), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + parms.auto_mode= auto_mode; + }, + my_bool, + TRUE, + r_my_bool, + /* Nothing */) + +struct mysql_next_result_params { + MYSQL *mysql; +}; +MK_ASYNC_CALLS( + mysql_next_result, + (int *ret, MYSQL *mysql), + (parms->mysql), + MYSQL *mysql, + mysql, + parms->mysql, + { + WIN_SET_NONBLOCKING(mysql) + parms.mysql= mysql; + }, + int, + 1, + r_int, + /* Nothing */) +#endif + + +/* + The following functions can newer block, and so do not have special + non-blocking versions: + + mysql_num_rows() + mysql_num_fields() + mysql_eof() + mysql_fetch_field_direct() + mysql_fetch_fields() + mysql_row_tell() + mysql_field_tell() + mysql_field_count() + mysql_affected_rows() + mysql_insert_id() + mysql_errno() + mysql_error() + mysql_sqlstate() + mysql_warning_count() + mysql_info() + mysql_thread_id() + mysql_character_set_name() + mysql_init() + mysql_ssl_set() + mysql_get_ssl_cipher() + mysql_use_result() + mysql_get_character_set_info() + mysql_set_local_infile_handler() + mysql_set_local_infile_default() + mysql_get_server_info() + mysql_get_server_name() + mysql_get_client_info() + mysql_get_client_version() + mysql_get_host_info() + mysql_get_server_version() + mysql_get_proto_info() + mysql_options() + mysql_data_seek() + mysql_row_seek() + mysql_field_seek() + mysql_fetch_lengths() + mysql_fetch_field() + mysql_escape_string() + mysql_hex_string() + mysql_real_escape_string() + mysql_debug() + myodbc_remove_escape() + mysql_thread_safe() + mysql_embedded() + mariadb_connection() + mysql_stmt_init() + mysql_stmt_fetch_column() + mysql_stmt_param_count() + mysql_stmt_attr_set() + mysql_stmt_attr_get() + mysql_stmt_bind_param() + mysql_stmt_bind_result() + mysql_stmt_result_metadata() + mysql_stmt_param_metadata() + mysql_stmt_errno() + mysql_stmt_error() + mysql_stmt_sqlstate() + mysql_stmt_row_seek() + mysql_stmt_row_tell() + mysql_stmt_data_seek() + mysql_stmt_num_rows() + mysql_stmt_affected_rows() + mysql_stmt_insert_id() + mysql_stmt_field_count() + mysql_more_results() + mysql_get_socket() + mysql_get_timeout_value() +*/ diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index cf60b888229..e9c606646a2 100755 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -53,6 +53,7 @@ SET (SQL_SOURCE log_event_old.cc rpl_record_old.cc message.h mf_iocache.cc my_decimal.cc ../sql-common/my_time.c mysqld.cc net_serv.cc ../sql-common/client_plugin.c + ../sql-common/mysql_async.c nt_servc.cc nt_servc.h opt_range.cc opt_range.h opt_sum.cc ../sql-common/pack.c parse_file.cc password.c procedure.cc protocol.cc records.cc repl_failsafe.cc rpl_filter.cc set_var.cc diff --git a/sql/Makefile.am b/sql/Makefile.am index bfd136b978f..59c4fb9a620 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -127,7 +127,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ sql_servers.cc event_parse_data.cc \ opt_table_elimination.cc create_options.cc -nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c client_plugin.c +nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c \ + my_user.c client_plugin.c mysql_async.c libndb_la_CPPFLAGS= @ndbcluster_includes@ libndb_la_SOURCES= ha_ndbcluster.cc \ @@ -169,6 +170,8 @@ link_sources: @LN_CP_F@ $(top_srcdir)/sql-common/pack.c pack.c rm -f client.c @LN_CP_F@ $(top_srcdir)/sql-common/client.c client.c + rm -f mysql_async.c + @LN_CP_F@ $(top_srcdir)/sql-common/mysql_async.c mysql_async.c rm -f client_plugin.c @LN_CP_F@ $(top_srcdir)/sql-common/client_plugin.c client_plugin.c rm -f my_time.c diff --git a/tests/Makefile.am b/tests/Makefile.am index 5283edacf25..3610d97135f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,6 +24,7 @@ else LIBMYSQLCLIENT_LA = $(top_builddir)/libmysql/libmysqlclient.la endif +noinst_HEADERS = nonblock-wrappers.h EXTRA_DIST = auto_increment.res auto_increment.tst \ function.res function.tst lock_test.pl lock_test.res \ export.pl big_record.pl \ @@ -36,6 +37,9 @@ EXTRA_DIST = auto_increment.res auto_increment.tst \ bin_PROGRAMS = mysql_client_test noinst_PROGRAMS = insert_test select_test thread_test bug25714 +if HAVE_LIBEVENT +noinst_PROGRAMS += async_queries +endif INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \ $(openssl_includes) @@ -52,6 +56,10 @@ select_test_SOURCES= select_test.c insert_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) select_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) +async_queries_SOURCES= async_queries.c +async_queries_CFLAGS= $(AM_CFLAGS) @libevent_includes@ +async_queries_LDADD= $(LDADD) @libevent_libs@ + bug25714_SOURCES= bug25714.c bug25714_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) diff --git a/tests/async_queries.c b/tests/async_queries.c new file mode 100644 index 00000000000..677208f11f5 --- /dev/null +++ b/tests/async_queries.c @@ -0,0 +1,435 @@ +/* + Copyright 2011 Kristian Nielsen and Monty Program Ab. + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + + +/* + Run a set of queries in parallel against a server using the non-blocking + API, and compare to running same queries with the normal blocking API. +*/ + +#include <sys/time.h> +#include <event.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <my_global.h> +#include <my_sys.h> +#include <mysql.h> +#include <my_getopt.h> + + +#define SL(s) (s), sizeof(s) +static const char *my_groups[]= { "client", NULL }; + +/* Maintaining a list of queries to run. */ +struct query_entry { + struct query_entry *next; + char *query; + int index; +}; +static struct query_entry *query_list; +static struct query_entry **tail_ptr= &query_list; +static int query_counter= 0; + + +/* State kept for each connection. */ +struct state_data { + int ST; /* State machine current state */ + struct event ev_mysql; + MYSQL mysql; + MYSQL_RES *result; + MYSQL *ret; + int err; + MYSQL_ROW row; + struct query_entry *query_element; + int index; +}; + + +static const char *opt_db= NULL; +static const char *opt_user= NULL; +static const char *opt_password= NULL; +static int tty_password= 0; +static const char *opt_host= NULL; +static const char *opt_socket= NULL; +static unsigned int opt_port= 0; +static unsigned int opt_connections= 5; +static const char *opt_query_file= NULL; + +static struct my_option options[] = +{ + {"database", 'D', "Database to use", &opt_db, &opt_db, + 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"help", '?', "Display this help and exit", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, + 0, 0, 0, 0, 0}, + {"host", 'h', "Connect to host", &opt_host, &opt_host, + 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"password", 'p', + "Password to use when connecting to server. If password is not given it's asked from the tty.", + 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, + {"port", 'P', "Port number to use for connection.", + &opt_port, &opt_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"socket", 'S', "Socket file to use for connection", + &opt_socket, &opt_socket, 0, GET_STR, + REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"user", 'u', "User for login if not current user", &opt_user, + &opt_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"connections", 'n', "Number of simultaneous connections/queries.", + &opt_connections, &opt_connections, 0, GET_UINT, REQUIRED_ARG, + 0, 0, 0, 0, 0, 0}, + {"queryfile", 'q', "Name of file containing extra queries to run", + &opt_query_file, &opt_query_file, 0, GET_STR, REQUIRED_ARG, + 0, 0, 0, 0, 0, 0}, + { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} +}; + +static void +fatal(struct state_data *sd, const char *msg) +{ + fprintf(stderr, "%s: %s\n", msg, (sd ? mysql_error(&sd->mysql) : "")); + exit(1); +} + + +static void state_machine_handler(int fd, short event, void *arg); + +static void +next_event(int new_st, int status, struct state_data *sd) +{ + short wait_event= 0; + struct timeval tv, *ptv; + int fd; + + if (status & MYSQL_WAIT_READ) + wait_event|= EV_READ; + if (status & MYSQL_WAIT_WRITE) + wait_event|= EV_WRITE; + if (wait_event) + fd= mysql_get_socket(&sd->mysql); + else + fd= -1; + if (status & MYSQL_WAIT_TIMEOUT) + { + tv.tv_sec= mysql_get_timeout_value(&sd->mysql); + tv.tv_usec= 0; + ptv= &tv; + } + else + ptv= NULL; + event_set(&sd->ev_mysql, fd, wait_event, state_machine_handler, sd); + event_add(&sd->ev_mysql, ptv); + sd->ST= new_st; +} + +static int +mysql_status(short event) +{ + int status= 0; + if (event & EV_READ) + status|= MYSQL_WAIT_READ; + if (event & EV_WRITE) + status|= MYSQL_WAIT_WRITE; + if (event & EV_TIMEOUT) + status|= MYSQL_WAIT_TIMEOUT; + return status; +} + + +static int num_active_connections; + +/* Shortcut for going to new state immediately without waiting. */ +#define NEXT_IMMEDIATE(sd_, new_st) do { sd_->ST= new_st; goto again; } while (0) + +static void +state_machine_handler(int fd __attribute__((unused)), short event, void *arg) +{ + struct state_data *sd= arg; + int status; + +again: + switch(sd->ST) + { + case 0: + /* Initial state, start making the connection. */ + status= mysql_real_connect_start(&sd->ret, &sd->mysql, opt_host, opt_user, opt_password, opt_db, opt_port, opt_socket, 0); + if (status) + /* Wait for connect to complete. */ + next_event(1, status, sd); + else + NEXT_IMMEDIATE(sd, 9); + break; + + case 1: + status= mysql_real_connect_cont(&sd->ret, &sd->mysql, mysql_status(event)); + if (status) + next_event(1, status, sd); + else + NEXT_IMMEDIATE(sd, 9); + break; + + case 9: + if (!sd->ret) + fatal(sd, "Failed to mysql_real_connect()"); + NEXT_IMMEDIATE(sd, 10); + break; + + case 10: + /* Now run the next query. */ + sd->query_element= query_list; + if (!sd->query_element) + { + /* No more queries, end the connection. */ + NEXT_IMMEDIATE(sd, 40); + } + query_list= query_list->next; + + sd->index= sd->query_element->index; + printf("%d ! %s\n", sd->index, sd->query_element->query); + status= mysql_real_query_start(&sd->err, &sd->mysql, sd->query_element->query, + strlen(sd->query_element->query)); + if (status) + next_event(11, status, sd); + else + NEXT_IMMEDIATE(sd, 20); + break; + + case 11: + status= mysql_real_query_cont(&sd->err, &sd->mysql, mysql_status(event)); + if (status) + next_event(11, status, sd); + else + NEXT_IMMEDIATE(sd, 20); + break; + + case 20: + free(sd->query_element->query); + free(sd->query_element); + if (sd->err) + { + printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql)); + NEXT_IMMEDIATE(sd, 10); + } + else + { + sd->result= mysql_use_result(&sd->mysql); + if (!sd->result) + fatal(sd, "mysql_use_result() returns error"); + NEXT_IMMEDIATE(sd, 30); + } + break; + + case 30: + status= mysql_fetch_row_start(&sd->row, sd->result); + if (status) + next_event(31, status, sd); + else + NEXT_IMMEDIATE(sd, 39); + break; + + case 31: + status= mysql_fetch_row_cont(&sd->row, sd->result, mysql_status(event)); + if (status) + next_event(31, status, sd); + else + NEXT_IMMEDIATE(sd, 39); + break; + + case 39: + if (sd->row) + { + /* Got a row. */ + unsigned int i; + printf("%d - ", sd->index); + for (i= 0; i < mysql_num_fields(sd->result); i++) + printf("%s%s", (i ? "\t" : ""), (sd->row[i] ? sd->row[i] : "(null)")); + printf ("\n"); + NEXT_IMMEDIATE(sd, 30); + } + else + { + if (mysql_errno(&sd->mysql)) + { + /* An error occured. */ + printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql)); + } + else + { + /* EOF. */ + printf("%d | EOF\n", sd->index); + } + mysql_free_result(sd->result); + NEXT_IMMEDIATE(sd, 10); + } + break; + + case 40: + status= mysql_close_start(&sd->mysql); + if (status) + next_event(41, status, sd); + else + NEXT_IMMEDIATE(sd, 50); + break; + + case 41: + status= mysql_close_cont(&sd->mysql, mysql_status(event)); + if (status) + next_event(41, status, sd); + else + NEXT_IMMEDIATE(sd, 50); + break; + + case 50: + /* We are done! */ + num_active_connections--; + if (num_active_connections == 0) + event_loopbreak(); + break; + + default: + abort(); + } +} + + +void +add_query(const char *q) +{ + struct query_entry *e; + char *q2; + size_t len; + + e= malloc(sizeof(*e)); + q2= strdup(q); + if (!e || !q2) + fatal(NULL, "Out of memory"); + + /* Remove any trailing newline. */ + len= strlen(q2); + if (q2[len] == '\n') + q2[len--]= '\0'; + if (q2[len] == '\r') + q2[len--]= '\0'; + + e->next= NULL; + e->query= q2; + e->index= query_counter++; + *tail_ptr= e; + tail_ptr= &e->next; +} + + +static my_bool +handle_option(int optid, const struct my_option *opt __attribute__((unused)), + char *arg) +{ + switch (optid) + { + case '?': + printf("Usage: async_queries [OPTIONS] query ...\n"); + my_print_help(options); + my_print_variables(options); + exit(0); + break; + + case 'p': + if (arg) + opt_password= arg; + else + tty_password= 1; + break; + } + + return 0; +} + + +int +main(int argc, char *argv[]) +{ + struct state_data *sds; + unsigned int i; + int err; + struct event_base *libevent_base; + + err= handle_options(&argc, &argv, options, handle_option); + if (err) + exit(err); + if (tty_password) + opt_password= get_tty_password(NullS); + + if (opt_query_file) + { + FILE *f= fopen(opt_query_file, "r"); + char buf[65536]; + if (!f) + fatal(NULL, "Cannot open query file"); + while (!feof(f)) + { + if (!fgets(buf, sizeof(buf), f)) + break; + add_query(buf); + } + fclose(f); + } + /* Add extra queries directly on command line. */ + while (argc > 0) + { + --argc; + add_query(*argv++); + } + + sds= malloc(opt_connections * sizeof(*sds)); + if (!sds) + fatal(NULL, "Out of memory"); + + libevent_base= event_init(); + + err= mysql_library_init(argc, argv, (char **)my_groups); + if (err) + { + fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n", err); + exit(1); + } + + num_active_connections= 0; + for (i= 0; i < opt_connections; i++) + { + mysql_init(&sds[i].mysql); + mysql_options(&sds[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries"); + + /* + We put the initial connect call in the first state 0 of the state machine + and run that manually, just to have everything in one place. + */ + sds[i].ST= 0; + num_active_connections++; + state_machine_handler(-1, -1, &sds[i]); + } + + event_dispatch(); + + free(sds); + + mysql_library_end(); + + event_base_free(libevent_base); + + return 0; +} diff --git a/tests/check_async_queries.pl b/tests/check_async_queries.pl new file mode 100644 index 00000000000..b599bc334d3 --- /dev/null +++ b/tests/check_async_queries.pl @@ -0,0 +1,73 @@ +#! /usr/bin/perl + +# Read the output of async_queries.c. Run the queries again serially, using +# the normal (not asynchronous) API. Compare the two results for correctness. + +use strict; +use warnings; + +use DBI; + +my $D= []; + +die "Usage: $0 <host> <user> <password> <database>\n" + unless @ARGV == 4; + +my $dbh= DBI->connect("DBI:mysql:database=$ARGV[3];host=$ARGV[0]", + $ARGV[1], $ARGV[2], + { RaiseError => 1, PrintError => 0 }); + +while (<STDIN>) { + chomp; + if (/^([0-9]+) ! (.*);$/) { + my ($index, $query)= ($1, $2); + $D->[$index]= { QUERY => $query, OUTPUT => [] }; + } elsif (/^([0-9]+) - (.*)$/) { + my ($index, $data)= ($1, $2); + push @{$D->[$index]{OUTPUT}}, $data; + } elsif (/^([0-9]+) \| Error: (.*)$/) { + my ($index, $errmsg)= ($1, $2); + my $rows; + my $res= eval { + my $stm= $dbh->prepare($D->[$index]{QUERY}); + $stm->execute(); + $rows= $stm->fetchall_arrayref(); + 1; + }; + if ($res) { + die "Query $index succeeded, but should have failed with error.\nquery=$D->[$index]{QUERY}\nerror=$errmsg\n"; + } + my $errmsg2= $@; + if ($errmsg2 =~ /^DBD::.*failed: (.*) at .*$/s) { + $errmsg2= $1; + } else { + die "Unexpected DBD error message format: '$errmsg2'\n"; + } + if ($errmsg2 ne $errmsg) { + die "Query $index failed with different error message\nquery=$D->[$index]{QUERY}\nerror1=$errmsg\nerror2=$errmsg2\n"; + } + print "OK $index\n"; + delete $D->[$index]; + } elsif (/^([0-9]+) \| EOF$/) { + my $index= $1; + my $rows; + my $res= eval { + my $stm= $dbh->prepare($D->[$index]{QUERY}); + $stm->execute(); + $rows= $stm->fetchall_arrayref(); + 1; + }; + if (!$res) { + die "Query $index failed, but should have succeeded.\nquery=$D->[$index]{QUERY}\nerror=$@\n"; + } + my $result_string= join("\n", sort @{$D->[$index]{OUTPUT}}); + my $result_string2= join("\n", sort(map(join("\t", map((defined($_) ? $_ : "(null)"), @$_)), @$rows))); + if ($result_string ne $result_string2) { + die "Query $index result difference.\nquery=$D->[$index]{QUERY}\noutput1=\n$$result_string\noutput2=\n$result_string2\n"; + } + delete $D->[$index]; + } else { + die "Unexpected line: '$_'\n"; + } +} +$dbh->disconnect(); diff --git a/tests/mysql_client_test.c b/tests/mysql_client_test.c index c6efdca60f6..1ca70fd972a 100644 --- a/tests/mysql_client_test.c +++ b/tests/mysql_client_test.c @@ -36,6 +36,18 @@ #include <my_handler.h> #include <sql_common.h> +/* + If non_blocking_api_enabled is true, we will re-define all the blocking + API functions as wrappers that call the corresponding non-blocking API + and use poll()/select() to wait for them to complete. This way we can get + a good coverage testing of the non-blocking API as well. +*/ +static my_bool non_blocking_api_enabled= 0; +#if !defined(EMBEDDED_LIBRARY) +#define WRAP_NONBLOCK_ENABLED non_blocking_api_enabled +#include "nonblock-wrappers.h" +#endif + #define VER "2.1" #define MAX_TEST_QUERY_LENGTH 300 /* MAX QUERY BUFFER LENGTH */ #define MAX_KEY MAX_INDEXES @@ -18614,6 +18626,10 @@ static struct my_option client_test_long_options[] = #endif {"vardir", 'v', "Data dir for tests.", (char**) &opt_vardir, (char**) &opt_vardir, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"non-blocking-api", 'n', + "Use the non-blocking client API for communication.", + &non_blocking_api_enabled, &non_blocking_api_enabled, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"getopt-ll-test", 'g', "Option for testing bug in getopt library", &opt_getopt_ll_test, &opt_getopt_ll_test, 0, GET_LL, REQUIRED_ARG, 0, 0, LONGLONG_MAX, 0, 0, 0}, diff --git a/tests/nonblock-wrappers.h b/tests/nonblock-wrappers.h new file mode 100644 index 00000000000..a7346ba641c --- /dev/null +++ b/tests/nonblock-wrappers.h @@ -0,0 +1,514 @@ +/* Copyright (c) 2011 Monty Program Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + Wrappers that re-implement the normal blocking libmysql API calls in terms + of the non-blocking API calls and explicit waiting. + + Used to test the non-blocking calls using mysql_client_test. +*/ + +#ifndef __WIN__ +#include <poll.h> +#else +#include <WinSock2.h> +#endif + +/* + Run the appropriate poll() syscall to wait for the event that libmysql + requested. Return which event(s) occured. +*/ +static int +wait_for_mysql(MYSQL *mysql, int status) +{ +#ifdef __WIN__ + fd_set rs, ws, es; + int res; + struct timeval tv, *timeout; + my_socket s= mysql_get_socket(mysql); + FD_ZERO(&rs); + FD_ZERO(&ws); + FD_ZERO(&es); + if (status & MYSQL_WAIT_READ) + FD_SET(s, &rs); + if (status & MYSQL_WAIT_WRITE) + FD_SET(s, &ws); + if (status & MYSQL_WAIT_EXCEPT) + FD_SET(s, &es); + if (status & MYSQL_WAIT_TIMEOUT) + { + tv.tv_sec= mysql_get_timeout_value(mysql); + tv.tv_usec= 0; + timeout= &tv; + } + else + timeout= NULL; + res= select(1, &rs, &ws, &es, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res == SOCKET_ERROR) + return MYSQL_WAIT_TIMEOUT; + else + { + int status= 0; + if (FD_ISSET(s, &rs)) + status|= MYSQL_WAIT_READ; + if (FD_ISSET(s, &ws)) + status|= MYSQL_WAIT_WRITE; + if (FD_ISSET(s, &es)) + status|= MYSQL_WAIT_EXCEPT; + return status; + } +#else + struct pollfd pfd; + int timeout; + int res; + + pfd.fd= mysql_get_socket(mysql); + pfd.events= + (status & MYSQL_WAIT_READ ? POLLIN : 0) | + (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | + (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); + if (status & MYSQL_WAIT_TIMEOUT) + timeout= 1000*mysql_get_timeout_value(mysql); + else + timeout= -1; + do { + res= poll(&pfd, 1, timeout); + /* + In a real event framework, we should re-compute the timeout on getting + EINTR to account for the time elapsed before the interruption. + */ + } while (res < 0 && errno == EINTR); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res < 0) + return MYSQL_WAIT_TIMEOUT; + else + { + int status= 0; + if (pfd.revents & POLLIN) + status|= MYSQL_WAIT_READ; + if (pfd.revents & POLLOUT) + status|= MYSQL_WAIT_WRITE; + if (pfd.revents & POLLPRI) + status|= MYSQL_WAIT_EXCEPT; + return status; + } +#endif +} + + +/* + If WRAP_NONBLOCK_ENABLED is defined, it is a variable that can be used to + enable or disable the use of non-blocking API wrappers. If true the + non-blocking API will be used, if false the normal blocking API will be + called directly. +*/ +#ifdef WRAP_NONBLOCK_ENABLED +#define USE_BLOCKING(name__, invoke_blocking__) \ + if (!(WRAP_NONBLOCK_ENABLED)) return name__ invoke_blocking__; +#define USE_BLOCKING_VOID_RETURN(name__, invoke__) \ + if (!(WRAP_NONBLOCK_ENABLED)) { name__ invoke__; return; } +#else +#define USE_BLOCKING(name__, invoke_blocking__) +#define USE_BLOCKING_VOID_RETURN(name__, invoke__) +#endif + +/* + I would preferably have declared the wrappers static. + However, if we do so, compilers will warn about definitions not used, and + with -Werror this breaks compilation :-( +*/ +#define MK_WRAPPER(ret_type__, name__, decl__, invoke__, invoke_blocking__, cont_arg__, mysql__) \ +ret_type__ wrap_ ## name__ decl__ \ +{ \ + ret_type__ res; \ + int status; \ + USE_BLOCKING(name__, invoke_blocking__) \ + status= name__ ## _start invoke__; \ + while (status) \ + { \ + status= wait_for_mysql(mysql__, status); \ + status= name__ ## _cont(&res, cont_arg__, status); \ + } \ + return res; \ +} + +#define MK_WRAPPER_VOID_RETURN(name__, decl__, invoke__, cont_arg__, mysql__) \ +void wrap_ ## name__ decl__ \ +{ \ + int status; \ + USE_BLOCKING_VOID_RETURN(name__, invoke__) \ + status= name__ ## _start invoke__; \ + while (status) \ + { \ + status= wait_for_mysql(mysql__, status); \ + status= name__ ## _cont(cont_arg__, status); \ + } \ +} + +MK_WRAPPER( + MYSQL *, + mysql_real_connect, + (MYSQL *mysql, const char *host, const char *user, const char *passwd, const char *db, unsigned int port, const char *unix_socket, unsigned long clientflag), + (&res, mysql, host, user, passwd, db, port, unix_socket, clientflag), + (mysql, host, user, passwd, db, port, unix_socket, clientflag), + mysql, + mysql) + + +MK_WRAPPER( + int, + mysql_real_query, + (MYSQL *mysql, const char *stmt_str, unsigned long length), + (&res, mysql, stmt_str, length), + (mysql, stmt_str, length), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_ROW, + mysql_fetch_row, + (MYSQL_RES *result), + (&res, result), + (result), + result, + result->handle) + +MK_WRAPPER( + int, + mysql_set_character_set, + (MYSQL *mysql, const char *csname), + (&res, mysql, csname), + (mysql, csname), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_select_db, + (MYSQL *mysql, const char *db), + (&res, mysql, db), + (mysql, db), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_send_query, + (MYSQL *mysql, const char *q, unsigned long length), + (&res, mysql, q, length), + (mysql, q, length), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_RES *, + mysql_store_result, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER_VOID_RETURN( + mysql_free_result, + (MYSQL_RES *result), + (result), + result, + result->handle) + +MK_WRAPPER_VOID_RETURN( + mysql_close, + (MYSQL *sock), + (sock), + sock, + sock) + +MK_WRAPPER( + my_bool, + mysql_change_user, + (MYSQL *mysql, const char *user, const char *passwd, const char *db), + (&res, mysql, user, passwd, db), + (mysql, user, passwd, db), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_query, + (MYSQL *mysql, const char *q), + (&res, mysql, q), + (mysql, q), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_shutdown, + (MYSQL *mysql, enum mysql_enum_shutdown_level shutdown_level), + (&res, mysql, shutdown_level), + (mysql, shutdown_level), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_dump_debug_info, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_refresh, + (MYSQL *mysql, unsigned int refresh_options), + (&res, mysql, refresh_options), + (mysql, refresh_options), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_kill, + (MYSQL *mysql, unsigned long pid), + (&res, mysql, pid), + (mysql, pid), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_set_server_option, + (MYSQL *mysql, enum enum_mysql_set_option option), + (&res, mysql, option), + (mysql, option), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_ping, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + const char *, + mysql_stat, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_RES *, + mysql_list_dbs, + (MYSQL *mysql, const char *wild), + (&res, mysql, wild), + (mysql, wild), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_RES *, + mysql_list_tables, + (MYSQL *mysql, const char *wild), + (&res, mysql, wild), + (mysql, wild), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_RES *, + mysql_list_processes, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + MYSQL_RES *, + mysql_list_fields, + (MYSQL *mysql, const char *table, const char *wild), + (&res, mysql, table, wild), + (mysql, table, wild), + mysql, + mysql) + +MK_WRAPPER( + my_bool, + mysql_read_query_result, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_stmt_prepare, + (MYSQL_STMT *stmt, const char *query, unsigned long length), + (&res, stmt, query, length), + (stmt, query, length), + stmt, + stmt->mysql) + +MK_WRAPPER( + int, + mysql_stmt_execute, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + int, + mysql_stmt_fetch, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + int, + mysql_stmt_store_result, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + my_bool, + mysql_stmt_close, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + my_bool, + mysql_stmt_reset, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + my_bool, + mysql_stmt_free_result, + (MYSQL_STMT *stmt), + (&res, stmt), + (stmt), + stmt, + stmt->mysql) + +MK_WRAPPER( + my_bool, + mysql_stmt_send_long_data, + (MYSQL_STMT *stmt, unsigned int param_number, const char *data, unsigned long length), + (&res, stmt, param_number, data, length), + (stmt, param_number, data, length), + stmt, + stmt->mysql) + +MK_WRAPPER( + my_bool, + mysql_commit, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + my_bool, + mysql_rollback, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +MK_WRAPPER( + my_bool, + mysql_autocommit, + (MYSQL *mysql, my_bool auto_mode), + (&res, mysql, auto_mode), + (mysql, auto_mode), + mysql, + mysql) + +MK_WRAPPER( + int, + mysql_next_result, + (MYSQL *mysql), + (&res, mysql), + (mysql), + mysql, + mysql) + +#undef USE_BLOCKING +#undef MK_WRAPPER +#undef MK_WRAPPER_VOID_RETURN + + +#define mysql_real_connect wrap_mysql_real_connect +#define mysql_real_query wrap_mysql_real_query +#define mysql_fetch_row wrap_mysql_fetch_row +#define mysql_set_character_set wrap_mysql_set_character_set +#define mysql_select_db wrap_mysql_select_db +#define mysql_send_query wrap_mysql_send_query +#define mysql_store_result wrap_mysql_store_result +#define mysql_free_result wrap_mysql_free_result +#define mysql_close wrap_mysql_close +#define mysql_change_user wrap_mysql_change_user +#define mysql_query wrap_mysql_query +#define mysql_shutdown wrap_mysql_shutdown +#define mysql_dump_debug_info wrap_mysql_dump_debug_info +#define mysql_refresh wrap_mysql_refresh +#define mysql_kill wrap_mysql_kill +#define mysql_set_server_option wrap_mysql_set_server_option +#define mysql_ping wrap_mysql_ping +#define mysql_stat wrap_mysql_stat +#define mysql_list_dbs wrap_mysql_list_dbs +#define mysql_list_tables wrap_mysql_list_tables +#define mysql_list_processes wrap_mysql_list_processes +#define mysql_list_fields wrap_mysql_list_fields +#define mysql_read_query_result wrap_mysql_read_query_result +#define mysql_stmt_prepare wrap_mysql_stmt_prepare +#define mysql_stmt_execute wrap_mysql_stmt_execute +#define mysql_stmt_fetch wrap_mysql_stmt_fetch +#define mysql_stmt_store_result wrap_mysql_stmt_store_result +#define mysql_stmt_close wrap_mysql_stmt_close +#define mysql_stmt_reset wrap_mysql_stmt_reset +#define mysql_stmt_free_result wrap_mysql_stmt_free_result +#define mysql_stmt_send_long_data wrap_mysql_stmt_send_long_data +#define mysql_commit wrap_mysql_commit +#define mysql_rollback wrap_mysql_rollback +#define mysql_autocommit wrap_mysql_autocommit +#define mysql_next_result wrap_mysql_next_result diff --git a/vio/viosocket.c b/vio/viosocket.c index f780764cbe4..878378bb837 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -21,6 +21,7 @@ */ #include "vio_priv.h" +#include "my_context.h" int vio_errno(Vio *vio __attribute__((unused))) { @@ -31,18 +32,34 @@ int vio_errno(Vio *vio __attribute__((unused))) size_t vio_read(Vio * vio, uchar* buf, size_t size) { size_t r; + extern ssize_t my_recv_async(struct mysql_async_context *b, int fd, + unsigned char *buf, size_t size, uint timeout); DBUG_ENTER("vio_read"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); /* Ensure nobody uses vio_read_buff and vio_read simultaneously */ DBUG_ASSERT(vio->read_end == vio->read_pos); + if (vio->async_context && vio->async_context->active) + r= my_recv_async(vio->async_context, vio->sd, buf, size, vio->read_timeout); + else + { + if (vio->async_context) + { + /* + If switching from non-blocking to blocking API usage, set the socket + back to blocking mode. + */ + my_bool old_mode; + vio_blocking(vio, TRUE, &old_mode); + } #ifdef __WIN__ - r = recv(vio->sd, buf, size,0); + r = recv(vio->sd, buf, size,0); #else - errno=0; /* For linux */ - r = read(vio->sd, buf, size); + errno=0; /* For linux */ + r = read(vio->sd, buf, size); #endif /* __WIN__ */ + } #ifndef DBUG_OFF if (r == (size_t) -1) { @@ -102,14 +119,32 @@ size_t vio_read_buff(Vio *vio, uchar* buf, size_t size) size_t vio_write(Vio * vio, const uchar* buf, size_t size) { size_t r; + extern ssize_t my_send_async(struct mysql_async_context *b, int fd, + const unsigned char *buf, size_t size, + uint timeout); DBUG_ENTER("vio_write"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); + if (vio->async_context && vio->async_context->active) + r= my_send_async(vio->async_context, vio->sd, buf, size, + vio->write_timeout); + else + { + if (vio->async_context) + { + /* + If switching from non-blocking to blocking API usage, set the socket + back to blocking mode. + */ + my_bool old_mode; + vio_blocking(vio, TRUE, &old_mode); + } #ifdef __WIN__ - r = send(vio->sd, buf, size,0); + r = send(vio->sd, buf, size,0); #else - r = write(vio->sd, buf, size); + r = write(vio->sd, buf, size); #endif /* __WIN__ */ + } #ifndef DBUG_OFF if (r == (size_t) -1) { @@ -359,12 +394,17 @@ void vio_in_addr(Vio *vio, struct in_addr *in) my_bool vio_poll_read(Vio *vio,uint timeout) { + extern my_bool my_poll_read_async(struct mysql_async_context *b, + uint timeout); #ifndef HAVE_POLL #if __WIN__ int res; struct fd_set fds; struct timeval tv; DBUG_ENTER("vio_poll"); + + if (vio->async_context && vio->async_context->active) + DBUG_RETURN(my_poll_read_async(vio->async_context, timeout)); fds.fd_count= 1; fds.fd_array[0]= vio->sd; tv.tv_sec= timeout; @@ -372,12 +412,16 @@ my_bool vio_poll_read(Vio *vio,uint timeout) res= select(1, &fds, NULL, NULL, &tv) ? 0 : 1; DBUG_RETURN(res); #else + if (vio->async_context && vio->async_context->active) + return my_poll_read_async(vio->async_context, timeout); return 0; #endif #else struct pollfd fds; int res; DBUG_ENTER("vio_poll"); + if (vio->async_context && vio->async_context->active) + DBUG_RETURN(my_poll_read_async(vio->async_context, timeout)); fds.fd=vio->sd; fds.events=POLLIN; fds.revents=0; @@ -425,6 +469,11 @@ void vio_timeout(Vio *vio, uint which, uint timeout) thr_alarm or just run without read/write timeout(s) */ #endif + /* Make timeout values available for async operations. */ + if (which) + vio->write_timeout= timeout; + else + vio->read_timeout= timeout; } diff --git a/vio/viossl.c b/vio/viossl.c index 61e4d9406a7..58e4089fcd8 100644 --- a/vio/viossl.c +++ b/vio/viossl.c @@ -21,6 +21,7 @@ */ #include "vio_priv.h" +#include "my_context.h" #ifdef HAVE_OPENSSL @@ -90,11 +91,16 @@ report_errors(SSL* ssl) size_t vio_ssl_read(Vio *vio, uchar* buf, size_t size) { size_t r; + extern int my_ssl_read_async(struct mysql_async_context *b, SSL *ssl, + void *buf, int size); DBUG_ENTER("vio_ssl_read"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u ssl: 0x%lx", vio->sd, (long) buf, (uint) size, (long) vio->ssl_arg)); - r= SSL_read((SSL*) vio->ssl_arg, buf, size); + if (vio->async_context && vio->async_context->active) + r= my_ssl_read_async(vio->async_context, (SSL *)vio->ssl_arg, buf, size); + else + r= SSL_read((SSL*) vio->ssl_arg, buf, size); #ifndef DBUG_OFF if (r == (size_t) -1) report_errors((SSL*) vio->ssl_arg); @@ -107,11 +113,16 @@ size_t vio_ssl_read(Vio *vio, uchar* buf, size_t size) size_t vio_ssl_write(Vio *vio, const uchar* buf, size_t size) { size_t r; + extern int my_ssl_write_async(struct mysql_async_context *b, SSL *ssl, + const void *buf, int size); DBUG_ENTER("vio_ssl_write"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - r= SSL_write((SSL*) vio->ssl_arg, buf, size); + if (vio->async_context && vio->async_context->active) + r= my_ssl_write_async(vio->async_context, (SSL *)vio->ssl_arg, buf, size); + else + r= SSL_write((SSL*) vio->ssl_arg, buf, size); #ifndef DBUG_OFF if (r == (size_t) -1) report_errors((SSL*) vio->ssl_arg); |