diff options
author | unknown <konstantin@mysql.com> | 2004-08-03 03:32:21 -0700 |
---|---|---|
committer | unknown <konstantin@mysql.com> | 2004-08-03 03:32:21 -0700 |
commit | eaf34dd8e3383b92f3d6e2600fc6f6f9365c63fc (patch) | |
tree | 90caa835d0b6af4148caf934c5d80616cb5c25ae | |
parent | 4467bcf26e0e5f9b205864b5f54f6234c62a2fe3 (diff) | |
download | mariadb-git-eaf34dd8e3383b92f3d6e2600fc6f6f9365c63fc.tar.gz |
Port of cursors to be pushed into 5.0 tree:
- client side part is simple and may be considered stable
- server side part now just joggles with THD state to save execution
state and has no additional locking wisdom.
Lot's of it are to be rewritten.
include/mysql.h:
Cursor patch to push into the main tree, client library part (considered
stable):
- new statement attribute STMT_ATTR_CURSOR_TYPE
- MYSQL_STMT::flags to store statement cursor type
- MYSQL_STMT::server_status to store server status (i. e. if the server
was able to open a cursor for this query).
include/mysql_com.h:
Cursor patch to push into the main tree, client library part (considered
stable):
- new COMmand, COM_FETCH, to fetch K rows from read-only cursor.
By design should support scrollable cursors as well.
- a few new server statuses:
SERVER_STATUS_CURSOR_EXISTS is sent by server in reply to COM_EXECUTE,
when cursor was successfully opened for this query
SERVER_STATUS_LAST_ROW_SENT is sent along with the last row to prevent one
more round trip just for finding out that all rows were fetched from
this cursor (this is server mem savier also).
- and finally, all possible values of STMT_ATTR_CURSOR_TYPE,
while now we support only CURSORT_TYPE_NO_CURSOR and
CURSOR_TYPE_READ_ONLY
libmysql/libmysql.c:
Cursor patch to push into the main tree, client library part (considered
stable):
- simple additions to mysql_stmt_fetch implementation to read data
from an opened cursor: we can read up to iteration count rows per
one request; read rows are buffered in the same way as rows of
mysql_stmt_store_result.
- now send stmt->flags to server to let him now if we wish to have
a cursor for this statement.
- support for setting/getting statement cursor type.
libmysqld/examples/Makefile.am:
Testing cursors was originally implemented in C++. Now when these tests
go into client_test, it's time to convert it to C++ as well.
libmysqld/lib_sql.cc:
- cleanup: send_fields flags are now named.
sql/ha_innodb.cc:
- cleanup: send_fields flags are now named.
sql/mysql_priv.h:
- cursors support: declaration for server-side handler of COM_FETCH
sql/protocol.cc:
- cleanup: send_fields flags are now named.
- we can't anymore assert that field_types[field_pos] is sensible:
if we have COM_EXCUTE(stmt1), COM_EXECUTE(stmt2), COM_FETCH(stmt1)
field_types[field_pos] will point to fields of stmt2.
sql/protocol.h:
- cleanup: send_fields flag_s_ are now named.
sql/protocol_cursor.cc:
- cleanup: send_fields flags are now named.
sql/repl_failsafe.cc:
- cleanup: send_fields flags are now named.
sql/slave.cc:
- cleanup: send_fields flags are now named.
sql/sp.cc:
- cleanup: send_fields flags are now named.
sql/sp_head.cc:
- cleanup: send_fields flags are now named.
sql/sql_acl.cc:
- cleanup: send_fields flags are now named.
sql/sql_class.cc:
- cleanup: send_fields flags are now named.
sql/sql_class.h:
- cleanup: send_fields flags are now named.
sql/sql_error.cc:
- cleanup: send_fields flags are now named.
sql/sql_handler.cc:
- cleanup: send_fields flags are now named.
sql/sql_help.cc:
- cleanup: send_fields flags are now named.
sql/sql_parse.cc:
Server side support for cursors:
- handle COM_FETCH
- enforce assumption that whenever we free thd->free_list,
we reset it to zero. This way it's much easier to handle free_list
in prepared statements implementation.
sql/sql_prepare.cc:
Server side support for cursors:
- implementation of mysql_stmt_fetch (fetch some rows from open cursor).
- management of cursors memory is quite tricky now.
- execute_stmt can't be reused anymore in mysql_stmt_execute and
mysql_sql_stmt_execute
sql/sql_repl.cc:
- cleanup: send_fields flags are now named.
sql/sql_select.cc:
Server side support for cursors:
- implementation of Cursor::open, Cursor::fetch (buggy when it comes to
non-equi joins), cursor cleanups.
- -4 -3 -0 constants indicating return value of sub_select and end_send are
to be renamed to something more readable:
it turned out to be not so simple, so it should come with the other patch.
sql/sql_select.h:
Server side support for cursors:
- declaration of Cursor class.
- JOIN::fetch_limit contains runtime value of rows fetched via cursor.
sql/sql_show.cc:
- cleanup: send_fields flags are now named.
sql/sql_table.cc:
- cleanup: send_fields flags are now named.
sql/sql_union.cc:
- if there was a cursor, don't cleanup unit: we'll need it to fetch
the rest of the rows.
tests/Makefile.am:
Now client_test is in C++.
tests/client_test.cc:
A few elementary tests for cursors.
BitKeeper/etc/ignore:
Added libmysqld/examples/client_test.cc to the ignore list
-rw-r--r-- | .bzrignore | 1 | ||||
-rw-r--r-- | include/mysql.h | 13 | ||||
-rw-r--r-- | include/mysql_com.h | 23 | ||||
-rw-r--r-- | libmysql/libmysql.c | 77 | ||||
-rw-r--r-- | libmysqld/examples/Makefile.am | 2 | ||||
-rw-r--r-- | libmysqld/lib_sql.cc | 4 | ||||
-rw-r--r-- | sql/ha_innodb.cc | 3 | ||||
-rw-r--r-- | sql/mysql_priv.h | 1 | ||||
-rw-r--r-- | sql/protocol.cc | 58 | ||||
-rw-r--r-- | sql/protocol.h | 9 | ||||
-rw-r--r-- | sql/protocol_cursor.cc | 4 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 6 | ||||
-rw-r--r-- | sql/slave.cc | 3 | ||||
-rw-r--r-- | sql/sp.cc | 3 | ||||
-rw-r--r-- | sql/sp_head.cc | 6 | ||||
-rw-r--r-- | sql/sql_acl.cc | 3 | ||||
-rw-r--r-- | sql/sql_class.cc | 14 | ||||
-rw-r--r-- | sql/sql_class.h | 26 | ||||
-rw-r--r-- | sql/sql_error.cc | 3 | ||||
-rw-r--r-- | sql/sql_handler.cc | 2 | ||||
-rw-r--r-- | sql/sql_help.cc | 6 | ||||
-rw-r--r-- | sql/sql_parse.cc | 9 | ||||
-rw-r--r-- | sql/sql_prepare.cc | 154 | ||||
-rw-r--r-- | sql/sql_repl.cc | 9 | ||||
-rw-r--r-- | sql/sql_select.cc | 380 | ||||
-rw-r--r-- | sql/sql_select.h | 57 | ||||
-rw-r--r-- | sql/sql_show.cc | 50 | ||||
-rw-r--r-- | sql/sql_table.cc | 6 | ||||
-rw-r--r-- | sql/sql_union.cc | 8 | ||||
-rw-r--r-- | tests/Makefile.am | 2 | ||||
-rw-r--r-- | tests/client_test.cc (renamed from tests/client_test.c) | 299 |
31 files changed, 1091 insertions, 150 deletions
diff --git a/.bzrignore b/.bzrignore index 7e1a3545374..6f5cd2de56a 100644 --- a/.bzrignore +++ b/.bzrignore @@ -800,3 +800,4 @@ vio/test-sslclient vio/test-sslserver vio/viotest-ssl libmysqld/sql_view.cc +libmysqld/examples/client_test.cc diff --git a/include/mysql.h b/include/mysql.h index 0f3fdc90548..2af1c657aeb 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -580,6 +580,12 @@ typedef struct st_mysql_stmt int (*read_row_func)(struct st_mysql_stmt *stmt, unsigned char **row); unsigned long stmt_id; /* Id for prepared statement */ + unsigned long flags; /* i.e. type of cursor to open */ + /* + Copied from mysql->server_status after execute/fetch to know + server-side cursor status for this statement. + */ + unsigned int server_status; unsigned int last_errno; /* error code */ unsigned int param_count; /* inpute parameters count */ unsigned int field_count; /* number of columns in result set */ @@ -608,7 +614,12 @@ enum enum_stmt_attr_type In the new API we do that only by request because it slows down mysql_stmt_store_result sufficiently. */ - STMT_ATTR_UPDATE_MAX_LENGTH + STMT_ATTR_UPDATE_MAX_LENGTH, + /* + unsigned long with combination of cursor flags (read only, for update, + etc) + */ + STMT_ATTR_CURSOR_TYPE }; diff --git a/include/mysql_com.h b/include/mysql_com.h index 01f26399953..fa73895000c 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -49,7 +49,7 @@ enum enum_server_command COM_TIME, COM_DELAYED_INSERT, COM_CHANGE_USER, COM_BINLOG_DUMP, COM_TABLE_DUMP, COM_CONNECT_OUT, COM_REGISTER_SLAVE, COM_PREPARE, COM_EXECUTE, COM_LONG_DATA, COM_CLOSE_STMT, - COM_RESET_STMT, COM_SET_OPTION, + COM_RESET_STMT, COM_SET_OPTION, COM_FETCH, COM_END /* Must be last */ }; @@ -132,6 +132,17 @@ enum enum_server_command #define SERVER_MORE_RESULTS_EXISTS 8 /* Multi query - next query exists */ #define SERVER_QUERY_NO_GOOD_INDEX_USED 16 #define SERVER_QUERY_NO_INDEX_USED 32 +/* + The server was able to fulfill client request and open read-only + non-scrollable cursor for the query. This flag comes in server + status with reply to COM_EXECUTE and COM_EXECUTE_DIRECT commands. +*/ +#define SERVER_STATUS_CURSOR_EXISTS 64 +/* + This flag is sent with last row of read-only cursor, in reply to + COM_FETCH command. +*/ +#define SERVER_STATUS_LAST_ROW_SENT 128 #define MYSQL_ERRMSG_SIZE 512 #define NET_READ_TIMEOUT 30 /* Timeout on read */ @@ -257,6 +268,16 @@ enum enum_shutdown_level { KILL_CONNECTION= 255 }; + +enum enum_cursor_type +{ + CURSOR_TYPE_NO_CURSOR= 0, + CURSOR_TYPE_READ_ONLY= 1, + CURSOR_TYPE_FOR_UPDATE= 2, + CURSOR_TYPE_SCROLLABLE= 4 +}; + + /* options for mysql_set_option */ enum enum_mysql_set_option { diff --git a/libmysql/libmysql.c b/libmysql/libmysql.c index fc7728c98e0..2dd35809a7b 100644 --- a/libmysql/libmysql.c +++ b/libmysql/libmysql.c @@ -1668,6 +1668,7 @@ myodbc_remove_escape(MYSQL *mysql,char *name) static int stmt_read_row_unbuffered(MYSQL_STMT *stmt, unsigned char **row); static int stmt_read_row_buffered(MYSQL_STMT *stmt, unsigned char **row); +static int stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row); static int stmt_read_row_no_data(MYSQL_STMT *stmt, unsigned char **row); /* @@ -2387,7 +2388,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length) mysql->last_used_con= mysql; int4store(buff, stmt->stmt_id); /* Send stmt id to server */ - buff[4]= (char) 0; /* no flags */ + buff[4]= (char) stmt->flags; int4store(buff+5, 1); /* iteration count */ if (cli_advanced_command(mysql, COM_EXECUTE, buff, sizeof(buff), packet, length, 1) || @@ -2397,6 +2398,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length) DBUG_RETURN(1); } stmt->affected_rows= mysql->affected_rows; + stmt->server_status= mysql->server_status; stmt->insert_id= mysql->insert_id; DBUG_RETURN(0); } @@ -2552,6 +2554,59 @@ error: return rc; } + +/* + Fetch statement row using server side cursor. + + SYNOPSIS + stmt_read_row_from_cursor() + + RETURN VALUE + 0 success + 1 error + MYSQL_NO_DATA end of data +*/ + +static int +stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row) +{ + if (stmt->data_cursor) + return stmt_read_row_buffered(stmt, row); + if (stmt->server_status & SERVER_STATUS_LAST_ROW_SENT) + stmt->server_status &= ~SERVER_STATUS_LAST_ROW_SENT; + else + { + MYSQL *mysql= stmt->mysql; + NET *net= &mysql->net; + MYSQL_DATA *result= &stmt->result; + char buff[4 /* statement id */ + + 4 /* number of rows to fetch */]; + + free_root(&result->alloc, MYF(MY_KEEP_PREALLOC)); + result->data= NULL; + result->rows= 0; + /* Send row request to the server */ + int4store(buff, stmt->stmt_id); + int4store(buff + 4, 1); /* number of rows to fetch */ + if (cli_advanced_command(mysql, COM_FETCH, buff, sizeof(buff), + NullS, 0, 1)) + { + set_stmt_errmsg(stmt, net->last_error, net->last_errno, net->sqlstate); + return 1; + } + stmt->server_status= mysql->server_status; + if (cli_read_binary_rows(stmt)) + return 1; + stmt->server_status= mysql->server_status; + + stmt->data_cursor= result->data; + return stmt_read_row_buffered(stmt, row); + } + *row= 0; + return MYSQL_NO_DATA; +} + + /* Default read row function to not SIGSEGV in client in case of wrong sequence of API calls. @@ -2593,6 +2648,9 @@ my_bool STDCALL mysql_stmt_attr_set(MYSQL_STMT *stmt, case STMT_ATTR_UPDATE_MAX_LENGTH: stmt->update_max_length= value ? *(const my_bool*) value : 0; break; + case STMT_ATTR_CURSOR_TYPE: + stmt->flags= value ? *(const unsigned long *) value : 0; + break; default: return TRUE; } @@ -2608,6 +2666,9 @@ my_bool STDCALL mysql_stmt_attr_get(MYSQL_STMT *stmt, case STMT_ATTR_UPDATE_MAX_LENGTH: *(unsigned long *) value= stmt->update_max_length; break; + case STMT_ATTR_CURSOR_TYPE: + *(unsigned long *) value= stmt->flags; + break; default: return TRUE; } @@ -2711,9 +2772,17 @@ int STDCALL mysql_stmt_execute(MYSQL_STMT *stmt) stmt->state= MYSQL_STMT_EXECUTE_DONE; if (stmt->field_count) { - stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled; - stmt->unbuffered_fetch_cancelled= FALSE; - stmt->read_row_func= stmt_read_row_unbuffered; + if (stmt->server_status & SERVER_STATUS_CURSOR_EXISTS) + { + mysql->status= MYSQL_STATUS_READY; + stmt->read_row_func= stmt_read_row_from_cursor; + } + else + { + stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled; + stmt->unbuffered_fetch_cancelled= FALSE; + stmt->read_row_func= stmt_read_row_unbuffered; + } } DBUG_RETURN(0); } diff --git a/libmysqld/examples/Makefile.am b/libmysqld/examples/Makefile.am index b3db54d305a..7e32df82e80 100644 --- a/libmysqld/examples/Makefile.am +++ b/libmysqld/examples/Makefile.am @@ -27,7 +27,7 @@ mysql_SOURCES = mysql.cc readline.cc completion_hash.cc \ mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD) client_test_LINK = $(CXXLINK) -client_test_SOURCES = client_test.c +client_test_SOURCES = client_test.cc clean: rm -f $(client_sources) diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index 0adf9aeb86a..1f37b115f6c 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -568,7 +568,7 @@ err: C_MODE_END -bool Protocol::send_fields(List<Item> *list, uint flag) +bool Protocol::send_fields(List<Item> *list, uint flags) { List_iterator_fast<Item> it(*list); Item *item; @@ -615,7 +615,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag) if (INTERNAL_NUM_FIELD(client_field)) client_field->flags|= NUM_FLAG; - if (flag & 2) + if (flags & Protocol::SEND_DEFAULTS) { char buff[80]; String tmp(buff, sizeof(buff), default_charset_info), *res; diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc index fdaf5198b7f..700b8fafe19 100644 --- a/sql/ha_innodb.cc +++ b/sql/ha_innodb.cc @@ -4891,7 +4891,8 @@ innodb_show_status( field_list.push_back(new Item_empty_string("Status", flen)); - if (protocol->send_fields(&field_list, 1)) { + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { my_free(str, MYF(0)); diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index b0acf64c4ff..bae1dcf5ecf 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -694,6 +694,7 @@ int mysql_stmt_prepare(THD *thd, char *packet, uint packet_length, LEX_STRING *name=NULL); void mysql_stmt_execute(THD *thd, char *packet, uint packet_length); void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name); +void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length); void mysql_stmt_free(THD *thd, char *packet); void mysql_stmt_reset(THD *thd, char *packet); void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length); diff --git a/sql/protocol.cc b/sql/protocol.cc index 065fcd3d4af..75383001014 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -487,6 +487,7 @@ void Protocol::init(THD *thd_arg) flag Bit mask with the following functions: 1 send number of rows 2 send default values + 4 don't write eof packet DESCRIPTION Sum fields has table name empty and field_name. @@ -497,7 +498,7 @@ void Protocol::init(THD *thd_arg) */ #ifndef EMBEDDED_LIBRARY -bool Protocol::send_fields(List<Item> *list, uint flag) +bool Protocol::send_fields(List<Item> *list, uint flags) { List_iterator_fast<Item> it(*list); Item *item; @@ -508,7 +509,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag) CHARSET_INFO *thd_charset= thd->variables.character_set_results; DBUG_ENTER("send_fields"); - if (flag & 1) + if (flags & SEND_NUM_ROWS) { // Packet with number of elements char *pos=net_store_length(buff, (uint) list->elements); (void) my_net_write(&thd->net, buff,(uint) (pos-buff)); @@ -594,7 +595,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag) } } local_packet->length((uint) (pos - local_packet->ptr())); - if (flag & 2) + if (flags & SEND_DEFAULTS) item->send(&prot, &tmp); // Send default value if (prot.write()) break; /* purecov: inspected */ @@ -603,7 +604,8 @@ bool Protocol::send_fields(List<Item> *list, uint flag) #endif } - my_net_write(&thd->net, eof_buff, 1); + if (flags & SEND_EOF) + my_net_write(&thd->net, eof_buff, 1); DBUG_RETURN(prepare_for_send(list)); err: @@ -962,12 +964,6 @@ void Protocol_prep::prepare_for_resend() bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs) { CHARSET_INFO *tocs= thd->variables.character_set_results; -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DECIMAL || - (field_types[field_pos] >= MYSQL_TYPE_ENUM && - field_types[field_pos] <= MYSQL_TYPE_GEOMETRY)); -#endif field_pos++; return store_string_aux(from, length, fromcs, tocs); } @@ -975,12 +971,6 @@ bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs) bool Protocol_prep::store(const char *from,uint length, CHARSET_INFO *fromcs, CHARSET_INFO *tocs) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DECIMAL || - (field_types[field_pos] >= MYSQL_TYPE_ENUM && - field_types[field_pos] <= MYSQL_TYPE_GEOMETRY)); -#endif field_pos++; return store_string_aux(from, length, fromcs, tocs); } @@ -998,10 +988,6 @@ bool Protocol_prep::store_null() bool Protocol_prep::store_tiny(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_TINY); -#endif char buff[1]; field_pos++; buff[0]= (uchar) from; @@ -1011,11 +997,6 @@ bool Protocol_prep::store_tiny(longlong from) bool Protocol_prep::store_short(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_SHORT || - field_types[field_pos] == MYSQL_TYPE_YEAR); -#endif field_pos++; char *to= packet->prep_append(2, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1027,11 +1008,6 @@ bool Protocol_prep::store_short(longlong from) bool Protocol_prep::store_long(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_INT24 || - field_types[field_pos] == MYSQL_TYPE_LONG); -#endif field_pos++; char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1043,10 +1019,6 @@ bool Protocol_prep::store_long(longlong from) bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_LONGLONG); -#endif field_pos++; char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1058,10 +1030,6 @@ bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag) bool Protocol_prep::store(float from, uint32 decimals, String *buffer) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_FLOAT); -#endif field_pos++; char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1073,10 +1041,6 @@ bool Protocol_prep::store(float from, uint32 decimals, String *buffer) bool Protocol_prep::store(double from, uint32 decimals, String *buffer) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DOUBLE); -#endif field_pos++; char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1100,12 +1064,6 @@ bool Protocol_prep::store(Field *field) bool Protocol_prep::store(TIME *tm) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DATETIME || - field_types[field_pos] == MYSQL_TYPE_DATE || - field_types[field_pos] == MYSQL_TYPE_TIMESTAMP); -#endif char buff[12],*pos; uint length; field_pos++; @@ -1140,10 +1098,6 @@ bool Protocol_prep::store_date(TIME *tm) bool Protocol_prep::store_time(TIME *tm) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_TIME); -#endif char buff[13], *pos; uint length; field_pos++; diff --git a/sql/protocol.h b/sql/protocol.h index 079c06ae155..8dc2f506c6c 100644 --- a/sql/protocol.h +++ b/sql/protocol.h @@ -50,7 +50,12 @@ public: Protocol(THD *thd_arg) { init(thd_arg); } virtual ~Protocol() {} void init(THD* thd_arg); - virtual bool send_fields(List<Item> *list, uint flag); + + static const uint SEND_NUM_ROWS= 1; + static const uint SEND_DEFAULTS= 2; + static const uint SEND_EOF= 4; + virtual bool send_fields(List<Item> *list, uint flags); + bool send_records_num(List<Item> *list, ulonglong records); bool store(I_List<i_string> *str_list); bool store(const char *from, CHARSET_INFO *cs); @@ -163,7 +168,7 @@ public: prev_record= &data; return Protocol_simple::prepare_for_send(item_list); } - bool send_fields(List<Item> *list, uint flag); + bool send_fields(List<Item> *list, uint flags); bool write(); uint get_field_count() { return field_count; } }; diff --git a/sql/protocol_cursor.cc b/sql/protocol_cursor.cc index 749b66785d4..d2c99dcaebc 100644 --- a/sql/protocol_cursor.cc +++ b/sql/protocol_cursor.cc @@ -26,7 +26,7 @@ #include "mysql_priv.h" #include <mysql.h> -bool Protocol_cursor::send_fields(List<Item> *list, uint flag) +bool Protocol_cursor::send_fields(List<Item> *list, uint flags) { List_iterator_fast<Item> it(*list); Item *item; @@ -67,7 +67,7 @@ bool Protocol_cursor::send_fields(List<Item> *list, uint flag) if (INTERNAL_NUM_FIELD(client_field)) client_field->flags|= NUM_FLAG; - if (flag & 2) + if (flags & Protocol::SEND_DEFAULTS) { char buff[80]; String tmp(buff, sizeof(buff), default_charset_info), *res; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 4feb24f06b2..bb70b793d3b 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -461,7 +461,8 @@ int show_new_master(THD* thd) field_list.push_back(new Item_empty_string("Log_name", 20)); field_list.push_back(new Item_return_int("Log_pos", 10, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); protocol->prepare_for_resend(); protocol->store(lex_mi->log_file_name, &my_charset_bin); @@ -651,7 +652,8 @@ int show_slave_hosts(THD* thd) field_list.push_back(new Item_return_int("Master_id", 10, MYSQL_TYPE_LONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); pthread_mutex_lock(&LOCK_slave_list); diff --git a/sql/slave.cc b/sql/slave.cc index 8843e1561ec..0defbe35163 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2335,7 +2335,8 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); if (mi->host[0]) diff --git a/sql/sp.cc b/sql/sp.cc index 4164a27ca5f..83ed95c7c4c 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -563,7 +563,8 @@ db_show_routine_status(THD *thd, int type, const char *wild) } } /* Print header */ - if (thd->protocol->send_fields(&field_list,1)) + if (thd->protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { res= SP_INTERNAL_ERROR; goto err_case; diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 32c25c98425..10e6d8f6db4 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -898,7 +898,8 @@ sp_head::show_create_procedure(THD *thd) // 1024 is for not to confuse old clients field_list.push_back(new Item_empty_string("Create Procedure", max(buffer.length(), 1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { res= 1; goto done; @@ -964,7 +965,8 @@ sp_head::show_create_function(THD *thd) field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len)); field_list.push_back(new Item_empty_string("Create Function", max(buffer.length(),1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) { res= 1; goto done; diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 41c629e5c12..07c3e59fd81 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -3163,7 +3163,8 @@ int mysql_show_grants(THD *thd,LEX_USER *lex_user) strxmov(buff,"Grants for ",lex_user->user.str,"@", lex_user->host.str,NullS); field_list.push_back(field); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); rw_wrlock(&LOCK_grant); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 103a5080caf..da64479abf2 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -675,7 +675,8 @@ int THD::send_explain_fields(select_result *result) item->maybe_null=1; field_list.push_back(new Item_return_int("rows",10, MYSQL_TYPE_LONGLONG)); field_list.push_back(new Item_empty_string("Extra",255)); - return (result->send_fields(field_list,1)); + return (result->send_fields(field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)); } #ifdef SIGNAL_WITH_VIO_CLOSE @@ -722,9 +723,9 @@ sql_exchange::sql_exchange(char *name,bool flag) escaped= &default_escaped; } -bool select_send::send_fields(List<Item> &list,uint flag) +bool select_send::send_fields(List<Item> &list, uint flags) { - return thd->protocol->send_fields(&list,flag); + return thd->protocol->send_fields(&list, flags); } /* Send data to client. Returns 0 if ok */ @@ -1354,7 +1355,8 @@ Statement::Statement(THD *thd) allow_sum_func(0), lex(&main_lex), query(0), - query_length(0) + query_length(0), + cursor(0) { name.str= NULL; } @@ -1372,7 +1374,8 @@ Statement::Statement() allow_sum_func(0), /* initialized later */ lex(&main_lex), query(0), /* these two are set */ - query_length(0) /* in alloc_query() */ + query_length(0), /* in alloc_query() */ + cursor(0) { } @@ -1391,6 +1394,7 @@ void Statement::set_statement(Statement *stmt) lex= stmt->lex; query= stmt->query; query_length= stmt->query_length; + cursor= stmt->cursor; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 00d32543da0..eccaf072008 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -487,6 +487,9 @@ public: void set_item_arena(Item_arena *set); }; + +class Cursor; + /* State of a single command executed against this connection. One connection can contain a lot of simultaneously running statements, @@ -543,6 +546,7 @@ public: */ char *query; uint32 query_length; // current query length + Cursor *cursor; public: /* We build without RTTI, so dynamic_cast can't be used. */ @@ -1054,6 +1058,7 @@ public: { DBUG_ASSERT(current_arena!=0); cleanup_items(current_arena->free_list); + /* no need to reset free_list as it won't be used anymore */ free_items(free_list); close_thread_tables(this); // to close derived tables free_root(&mem_root, MYF(0)); @@ -1108,7 +1113,7 @@ public: unit= u; return 0; } - virtual bool send_fields(List<Item> &list,uint flag)=0; + virtual bool send_fields(List<Item> &list, uint flags)=0; virtual bool send_data(List<Item> &items)=0; virtual bool initialize_tables (JOIN *join=0) { return 0; } virtual void send_error(uint errcode,const char *err); @@ -1120,7 +1125,7 @@ public: class select_send :public select_result { public: select_send() {} - bool send_fields(List<Item> &list,uint flag); + bool send_fields(List<Item> &list, uint flags); bool send_data(List<Item> &items); bool send_eof(); }; @@ -1138,7 +1143,7 @@ public: select_to_file(sql_exchange *ex) :exchange(ex), file(-1),row_count(0L) { path[0]=0; } ~select_to_file(); - bool send_fields(List<Item> &list, uint flag) { return 0; } + bool send_fields(List<Item> &list, uint flags) { return 0; } void send_error(uint errcode,const char *err); }; @@ -1185,8 +1190,7 @@ class select_insert :public select_result { } ~select_insert(); int prepare(List<Item> &list, SELECT_LEX_UNIT *u); - bool send_fields(List<Item> &list, uint flag) - { return 0; } + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items); void send_error(uint errcode,const char *err); bool send_eof(); @@ -1273,8 +1277,7 @@ class select_union :public select_result { select_union(TABLE *table_par); ~select_union(); int prepare(List<Item> &list, SELECT_LEX_UNIT *u); - bool send_fields(List<Item> &list, uint flag) - { return 0; } + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items); bool send_eof(); bool flush(); @@ -1288,7 +1291,7 @@ protected: Item_subselect *item; public: select_subselect(Item_subselect *item); - bool send_fields(List<Item> &list, uint flag) { return 0; }; + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items)=0; bool send_eof() { return 0; }; @@ -1457,8 +1460,7 @@ public: multi_delete(THD *thd, TABLE_LIST *dt, uint num_of_tables); ~multi_delete(); int prepare(List<Item> &list, SELECT_LEX_UNIT *u); - bool send_fields(List<Item> &list, - uint flag) { return 0; } + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items); bool initialize_tables (JOIN *join); void send_error(uint errcode,const char *err); @@ -1486,7 +1488,7 @@ public: List<Item> *values, enum_duplicates handle_duplicates); ~multi_update(); int prepare(List<Item> &list, SELECT_LEX_UNIT *u); - bool send_fields(List<Item> &list, uint flag) { return 0; } + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items); bool initialize_tables (JOIN *join); void send_error(uint errcode,const char *err); @@ -1515,7 +1517,7 @@ public: select_dumpvar(void) { var_list.empty(); local_vars.empty(); vars.empty(); row_count=0;} ~select_dumpvar() {} int prepare(List<Item> &list, SELECT_LEX_UNIT *u); - bool send_fields(List<Item> &list, uint flag) {return 0;} + bool send_fields(List<Item> &list, uint flags) { return 0; } bool send_data(List<Item> &items); bool send_eof(); }; diff --git a/sql/sql_error.cc b/sql/sql_error.cc index 8aa7bdf9a7f..d68d62a8820 100644 --- a/sql/sql_error.cc +++ b/sql/sql_error.cc @@ -185,7 +185,8 @@ my_bool mysqld_show_warnings(THD *thd, ulong levels_to_show) field_list.push_back(new Item_return_int("Code",4, MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Message",MYSQL_ERRMSG_SIZE)); - if (thd->protocol->send_fields(&field_list,1)) + if (thd->protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); MYSQL_ERROR *err; diff --git a/sql/sql_handler.cc b/sql/sql_handler.cc index 8d06d3f0017..0df3d617d7f 100644 --- a/sql/sql_handler.cc +++ b/sql/sql_handler.cc @@ -252,7 +252,7 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables, insert_fields(thd, tables, tables->db, tables->alias, &it, 0); select_limit+=offset_limit; - protocol->send_fields(&list,1); + protocol->send_fields(&list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); HANDLER_TABLES_HACK(thd); MYSQL_LOCK *lock=mysql_lock_tables(thd,&tables->table,1); diff --git a/sql/sql_help.cc b/sql/sql_help.cc index 08a8fc626cc..85d5271d4c3 100644 --- a/sql/sql_help.cc +++ b/sql/sql_help.cc @@ -426,7 +426,8 @@ int send_answer_1(Protocol *protocol, String *s1, String *s2, String *s3) field_list.push_back(new Item_empty_string("description",1000)); field_list.push_back(new Item_empty_string("example",1000)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); @@ -468,7 +469,8 @@ int send_header_2(Protocol *protocol, bool for_category) field_list.push_back(new Item_empty_string("source_category_name",64)); field_list.push_back(new Item_empty_string("name",64)); field_list.push_back(new Item_empty_string("is_it_category",1)); - DBUG_RETURN(protocol->send_fields(&field_list,1)); + DBUG_RETURN(protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)); } /* diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 7f42d6052d9..21d2a51bfeb 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1452,6 +1452,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, mysql_stmt_execute(thd, packet, packet_length); break; } + case COM_FETCH: + { + mysql_stmt_fetch(thd, packet, packet_length); + break; + } case COM_LONG_DATA: { mysql_stmt_get_longdata(thd, packet, packet_length); @@ -1545,7 +1550,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd, send_error(thd,ER_NO_DB_ERROR); break; } - thd->free_list=0; pend= strend(packet); thd->convert_string(&conv_name, system_charset_info, packet, (uint) (pend-packet), thd->charset()); @@ -1567,6 +1571,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, break; mysqld_list_fields(thd,&table_list,fields); free_items(thd->free_list); + thd->free_list=0; /* free_list should never point to garbage */ break; } #endif @@ -4443,6 +4448,7 @@ void mysql_parse(THD *thd, char *inBuf, uint length) } thd->proc_info="freeing items"; free_items(thd->free_list); /* Free strings used by items */ + thd->free_list= 0; /* free_list should never point to garbage */ lex_end(lex); } DBUG_VOID_RETURN; @@ -4470,6 +4476,7 @@ bool mysql_test_parse_for_slave(THD *thd, char *inBuf, uint length) all_tables_not_ok(thd,(TABLE_LIST*) lex->select_lex.table_list.first)) error= 1; /* Ignore question */ free_items(thd->free_list); /* Free strings used by items */ + thd->free_list= 0; /* free_list should never point to garbage */ lex_end(lex); DBUG_RETURN(error); diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index ed5e75a3622..9d9a0dd73e2 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -75,6 +75,8 @@ Long data handling: #ifdef EMBEDDED_LIBRARY /* include MYSQL_BIND headers */ #include <mysql.h> +#else +#include <mysql_com.h> #endif /****************************************************************************** @@ -107,7 +109,7 @@ public: }; static void execute_stmt(THD *thd, Prepared_statement *stmt, - String *expanded_query, bool set_context); + String *expanded_query); /****************************************************************************** Implementation @@ -166,7 +168,8 @@ static bool send_prep_stmt(Prepared_statement *stmt, uint columns) return my_net_write(net, buff, sizeof(buff)) || (stmt->param_count && stmt->thd->protocol_simple.send_fields((List<Item> *) - &stmt->lex->param_list, 0)) || + &stmt->lex->param_list, + Protocol::SEND_EOF)) || net_flush(net); return 0; } @@ -1098,7 +1101,8 @@ static int mysql_test_select(Prepared_statement *stmt, if (!text_protocol) { if (send_prep_stmt(stmt, lex->select_lex.item_list.elements) || - thd->protocol_simple.send_fields(&lex->select_lex.item_list, 0) + thd->protocol_simple.send_fields(&lex->select_lex.item_list, + Protocol::SEND_EOF) #ifndef EMBEDDED_LIBRARY || net_flush(&thd->net) #endif @@ -1476,6 +1480,12 @@ static int send_prepare_results(Prepared_statement *stmt, bool text_protocol) case SQLCOM_SHOW_GRANTS: case SQLCOM_DROP_TABLE: case SQLCOM_RENAME_TABLE: + case SQLCOM_ALTER_TABLE: + case SQLCOM_COMMIT: + case SQLCOM_CREATE_INDEX: + case SQLCOM_DROP_INDEX: + case SQLCOM_ROLLBACK: + case SQLCOM_TRUNCATE: break; default: @@ -1756,6 +1766,7 @@ static void reset_stmt_params(Prepared_statement *stmt) void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) { ulong stmt_id= uint4korr(packet); + ulong flags= (ulong) ((uchar) packet[4]); /* Query text for binary log, or empty string if the query is not put into binary log. @@ -1782,6 +1793,28 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) DBUG_VOID_RETURN; } + if (flags & (ulong) CURSOR_TYPE_READ_ONLY) + { + if (stmt->lex->result) + { + /* + If lex->result is set in the parser, this is not a SELECT + statement: we can't open a cursor for it. + */ + flags= 0; + } + else + { + if (!stmt->cursor && + !(stmt->cursor= new (&stmt->mem_root) Cursor())) + { + send_error(thd, ER_OUT_OF_RESOURCES); + DBUG_VOID_RETURN; + } + /* If lex->result is set, mysql_execute_command will use it */ + stmt->lex->result= &stmt->cursor->result; + } + } #ifndef EMBEDDED_LIBRARY if (stmt->param_count) { @@ -1800,16 +1833,55 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) if (stmt->param_count && stmt->set_params_data(stmt, &expanded_query)) goto set_params_data_err; #endif + thd->stmt_backup.set_statement(thd); + thd->set_statement(stmt); thd->current_arena= stmt; + reset_stmt_for_execute(thd, stmt->lex); + /* From now cursors assume that thd->mem_root is clean */ + if (expanded_query.length() && + alloc_query(thd, (char *)expanded_query.ptr(), + expanded_query.length()+1)) + { + my_error(ER_OUTOFMEMORY, 0, expanded_query.length()); + goto err; + } + thd->protocol= &thd->protocol_prep; // Switch to binary protocol - execute_stmt(thd, stmt, &expanded_query, true); + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(),QUERY_PRIOR); + mysql_execute_command(thd); + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), WAIT_PRIOR); thd->protocol= &thd->protocol_simple; // Use normal protocol + + if (flags & (ulong) CURSOR_TYPE_READ_ONLY) + { + if (stmt->cursor->is_open()) + stmt->cursor->init_from_thd(thd); + thd->set_item_arena(&thd->stmt_backup); + } + else + { + thd->lex->unit.cleanup(); + cleanup_items(stmt->free_list); + reset_stmt_params(stmt); + close_thread_tables(thd); /* to close derived tables */ + /* + Free items that were created during this execution of the PS by + query optimizer. + */ + free_items(thd->free_list); + thd->free_list= 0; + } + + thd->set_statement(&thd->stmt_backup); thd->current_arena= 0; DBUG_VOID_RETURN; set_params_data_err: reset_stmt_params(stmt); my_error(ER_WRONG_ARGUMENTS, MYF(0), "mysql_stmt_execute"); +err: send_error(thd); DBUG_VOID_RETURN; } @@ -1845,7 +1917,6 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) DBUG_VOID_RETURN; } - thd->free_list= NULL; thd->stmt_backup.set_statement(thd); thd->set_statement(stmt); if (stmt->set_params_from_vars(stmt, @@ -1856,7 +1927,7 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) send_error(thd); } thd->current_arena= stmt; - execute_stmt(thd, stmt, &expanded_query, false); + execute_stmt(thd, stmt, &expanded_query); thd->current_arena= 0; DBUG_VOID_RETURN; } @@ -1872,20 +1943,13 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) placeholders replaced with actual values. Otherwise empty string. NOTES - Caller must set parameter values and thd::protocol. - thd->free_list is assumed to be garbage. + Caller must set parameter values and thd::protocol. */ static void execute_stmt(THD *thd, Prepared_statement *stmt, - String *expanded_query, bool set_context) + String *expanded_query) { DBUG_ENTER("execute_stmt"); - if (set_context) - { - thd->free_list= NULL; - thd->stmt_backup.set_statement(thd); - thd->set_statement(stmt); - } reset_stmt_for_execute(thd, stmt->lex); if (expanded_query->length() && @@ -1899,22 +1963,77 @@ static void execute_stmt(THD *thd, Prepared_statement *stmt, if (!(specialflag & SPECIAL_NO_PRIOR)) my_pthread_setprio(pthread_self(),QUERY_PRIOR); mysql_execute_command(thd); - thd->lex->unit.cleanup(); if (!(specialflag & SPECIAL_NO_PRIOR)) my_pthread_setprio(pthread_self(), WAIT_PRIOR); + thd->lex->unit.cleanup(); cleanup_items(stmt->free_list); reset_stmt_params(stmt); close_thread_tables(thd); // to close derived tables thd->set_statement(&thd->stmt_backup); /* Free Items that were created during this execution of the PS. */ free_items(thd->free_list); + /* + In the rest of prepared statements code we assume that free_list + never points to garbage: keep this predicate true. + */ thd->free_list= 0; DBUG_VOID_RETURN; } /* + COM_FETCH handler: fetches requested amount of rows from cursor + SYNOPSIS +*/ + +void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length) +{ + /* assume there is always place for 8-16 bytes */ + ulong stmt_id= uint4korr(packet); + ulong num_rows= uint4korr(packet+=4); + Statement *stmt; + int error; + + DBUG_ENTER("mysql_stmt_fetch"); + + if (!(stmt= thd->stmt_map.find(stmt_id)) || + !stmt->cursor || + !stmt->cursor->is_open()) + { + my_error(ER_UNKNOWN_STMT_HANDLER, MYF(0), stmt_id, "fetch"); + send_error(thd); + DBUG_VOID_RETURN; + } + + thd->stmt_backup.set_statement(thd); + thd->stmt_backup.set_item_arena(thd); + thd->set_statement(stmt); + stmt->cursor->init_thd(thd); + + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), QUERY_PRIOR); + + thd->protocol= &thd->protocol_prep; // Switch to binary protocol + error= stmt->cursor->fetch(num_rows); + thd->protocol= &thd->protocol_simple; // Use normal protocol + + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), WAIT_PRIOR); + + /* Restore THD state */ + stmt->cursor->reset_thd(thd); + thd->set_statement(&thd->stmt_backup); + thd->set_item_arena(&thd->stmt_backup); + + if (error && error != -4) + send_error(thd, ER_OUT_OF_RESOURCES); + + DBUG_VOID_RETURN; +} + + +/* Reset a prepared statement in case there was a recoverable error. SYNOPSIS mysql_stmt_reset() @@ -2084,8 +2203,11 @@ void Prepared_statement::setup_set_params() } } + Prepared_statement::~Prepared_statement() { + if (cursor) + cursor->Cursor::~Cursor(); free_items(free_list); } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 98bf4e86aba..9e38a65d412 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1297,7 +1297,8 @@ int show_binlog_events(THD* thd) Format_description_log_event(3); /* MySQL 4.0 by default */ Log_event::init_show_field_list(&field_list); - if (protocol-> send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); if (mysql_bin_log.is_open()) @@ -1426,7 +1427,8 @@ int show_binlog_info(THD* thd) field_list.push_back(new Item_empty_string("Binlog_Do_DB",255)); field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); protocol->prepare_for_resend(); @@ -1476,7 +1478,8 @@ int show_binlogs(THD* thd) } field_list.push_back(new Item_empty_string("Log_name", 255)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); mysql_bin_log.lock_index(); index_file=mysql_bin_log.get_index_file(); diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 756b5f3c017..fce77969bcf 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -1105,7 +1105,8 @@ JOIN::exec() (zero_result_cause?zero_result_cause:"No tables used")); else { - result->send_fields(fields_list,1); + result->send_fields(fields_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); if (!having || having->val_int()) { if (do_send_rows && (procedure ? (procedure->send_row(fields_list) || @@ -1512,12 +1513,45 @@ JOIN::exec() DBUG_VOID_RETURN; } } + /* XXX: When can we have here thd->net.report_error not zero? */ + if (thd->net.report_error) + { + error= thd->net.report_error; + DBUG_VOID_RETURN; + } curr_join->having= curr_join->tmp_having; - thd->proc_info="Sending data"; - error= thd->net.report_error || - do_select(curr_join, curr_fields_list, NULL, procedure); - thd->limit_found_rows= curr_join->send_records; - thd->examined_row_count= curr_join->examined_rows; + curr_join->fields= curr_fields_list; + curr_join->procedure= procedure; + + if (unit == &thd->lex->unit && + (unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) && + thd->cursor && tables != const_tables) + { + /* + We are here if this is JOIN::exec for the last select of the main unit + and the client requested to open a cursor. + We check that not all tables are constant because this case is not + handled by do_select() separately, and this case is not implemented + for cursors yet. + */ + DBUG_ASSERT(error == 0); + /* + curr_join is used only for reusable joins - that is, + to perform SELECT for each outer row (like in subselects). + This join is main, so we know for sure that curr_join == join. + */ + DBUG_ASSERT(curr_join == this); + /* Open cursor for the last join sweep */ + error= thd->cursor->open(this); + } + else + { + thd->proc_info="Sending data"; + error= do_select(curr_join, curr_fields_list, NULL, procedure); + thd->limit_found_rows= curr_join->send_records; + thd->examined_row_count= curr_join->examined_rows; + } + DBUG_VOID_RETURN; } @@ -1566,6 +1600,306 @@ JOIN::cleanup() } +/************************* Cursor ******************************************/ + +void +Cursor::init_from_thd(THD *thd) +{ + /* + We need to save and reset thd->mem_root, otherwise it'll be freed + later in mysql_parse. + */ + mem_root= thd->mem_root; + init_sql_alloc(&thd->mem_root, + thd->variables.query_alloc_block_size, + thd->variables.query_prealloc_size); + + /* + The same is true for open tables and lock: save tables and zero THD + pointers to prevent table close in close_thread_tables (This is a part + of the temporary solution to make cursors work with minimal changes to + the current source base). + */ + derived_tables= thd->derived_tables; + open_tables= thd->open_tables; + lock= thd->lock; + query_id= thd->query_id; + free_list= thd->free_list; + reset_thd(thd); + /* + XXX: thd->locked_tables is not changed. + What problems can we have with it if cursor is open? + */ + /* + TODO: grab thd->free_list here? + */ +} + + +void +Cursor::init_thd(THD *thd) +{ + thd->mem_root= mem_root; + + DBUG_ASSERT(thd->derived_tables == 0); + thd->derived_tables= derived_tables; + + DBUG_ASSERT(thd->open_tables == 0); + thd->open_tables= open_tables; + + DBUG_ASSERT(thd->lock== 0); + thd->lock= lock; + thd->query_id= query_id; + thd->free_list= free_list; +} + + +void +Cursor::reset_thd(THD *thd) +{ + thd->derived_tables= 0; + thd->open_tables= 0; + thd->lock= 0; + thd->free_list= 0; +} + + +int +Cursor::open(JOIN *join_arg) +{ + join= join_arg; + + THD *thd= join->thd; + + /* First non-constant table */ + JOIN_TAB *join_tab= join->join_tab + join->const_tables; + + /* + Send fields description to the client; server_status is sent + in 'EOF' packet, which ends send_fields(). + */ + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS); + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + + /* Prepare JOIN for reading rows. */ + + Next_select_func end_select= join->sort_and_group || join->procedure && + join->procedure->flags & PROC_GROUP ? + end_send_group : end_send; + + join->join_tab[join->tables-1].next_select= end_select; + join->send_records= 0; + join->fetch_limit= join->unit->offset_limit_cnt; + + /* Disable JOIN CACHE as it is not working with cursors yet */ + for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab) + { + if (tab->next_select == sub_select_cache) + tab->next_select= sub_select; + } + + DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0); + DBUG_ASSERT(join_tab->not_used_in_distinct == 0); + /* + null_row is set only if row not found and it's outer join: should never + happen for the first table in join_tab list + */ + DBUG_ASSERT(join_tab->table->null_row == 0); + + return join_tab->read_first_record(join_tab); +} + + +/* + DESCRIPTION + Fetch next num_rows rows from the cursor and sent them to the client + PRECONDITION: + Cursor is open + RETURN VALUES: + -4 there are more rows, send_eof sent to the client + 0 no more rows, send_eof was sent to the client, cursor is closed + other fatal fetch error, cursor is closed (error is not reported) +*/ + +int +Cursor::fetch(ulong num_rows) +{ + THD *thd= join->thd; + JOIN_TAB *join_tab= join->join_tab + join->const_tables;; + COND *on_expr= join_tab->on_expr; + COND *select_cond= join_tab->select_cond; + READ_RECORD *info= &join_tab->read_record; + + int error= 0; + + join->fetch_limit+= num_rows; + + /* + Run while there are new rows in the first table; + For each row, satisfying ON and WHERE clauses (those parts of them which + can be evaluated early), call next_select. + */ + do + { + int no_more_rows; + + join->examined_rows++; + + if (thd->killed) /* Aborted by user */ + { + my_error(ER_SERVER_SHUTDOWN,MYF(0)); + return -1; + } + + if (on_expr == 0 || on_expr->val_int()) + { + if (select_cond == 0 || select_cond->val_int()) + { + /* + TODO: call table->unlock_row() to unlock row failed selection, + when this feature will be used. + */ + error= join_tab->next_select(join, join_tab + 1, 0); + DBUG_ASSERT(error <= 0); + if (error) + { + /* real error or LIMIT/FETCH LIMIT worked */ + if (error == -4) + { + /* + FETCH LIMIT, read ahead one row, and close cursor + if there is no more rows XXX: to be fixed to support + non-equi-joins! + */ + if ((no_more_rows= info->read_record(info))) + error= no_more_rows > 0 ? -1: 0; + } + break; + } + } + } + /* read next row; break loop if there was an error */ + if ((no_more_rows= info->read_record(info))) + { + if (no_more_rows > 0) + error= -1; + else + { + enum { END_OF_RECORDS= 1 }; + error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS); + } + break; + } + } + while (thd->net.report_error == 0); + + if (thd->net.report_error) + error= -1; + + switch (error) { + /* Fetch limit worked, possibly more rows are there */ + case -4: + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + /* save references to memory, allocated during fetch */ + mem_root= thd->mem_root; + free_list= thd->free_list; + break; + /* Limit clause worked: this is the same as 'no more rows' */ + case -3: /* LIMIT clause worked */ + error= 0; + /* fallthrough */ + case 0: /* No more rows */ + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + close(); + thd->server_status|= SERVER_STATUS_LAST_ROW_SENT; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT; + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + default: + close(); + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + } + return error; +} + + +void +Cursor::close() +{ + THD *thd= join->thd; + join->join_free(0); + if (unit) + { + /* In case of UNIONs JOIN is freed inside unit->cleanup() */ + unit->cleanup(); + } + else + { + join->cleanup(); + delete join; + } + /* XXX: Another hack: closing tables used in the cursor */ + { + DBUG_ASSERT(lock || open_tables || derived_tables); + + TABLE *tmp_open_tables= thd->open_tables; + TABLE *tmp_derived_tables= thd->derived_tables; + MYSQL_LOCK *tmp_lock= thd->lock; + + thd->open_tables= open_tables; + thd->derived_tables= derived_tables; + thd->lock= lock; + close_thread_tables(thd); + + thd->open_tables= tmp_derived_tables; + thd->derived_tables= tmp_derived_tables; + thd->lock= tmp_lock; + } +} + + +Cursor::~Cursor() +{ + if (is_open()) + close(); + free_items(free_list); + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + free_root(&mem_root, MYF(0)); +} + +/*********************************************************************/ + + int mysql_select(THD *thd, Item ***rref_pointer_array, TABLE_LIST *tables, uint wild_num, List<Item> &fields, @@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array, join->exec(); + if (thd->cursor && thd->cursor->is_open()) + { + /* + A cursor was opened for the last sweep in exec(). + We are here only if this is mysql_select for top-level SELECT_LEX_UNIT + and there were no error. + */ + free_join= 0; + } + if (thd->lex->describe & DESCRIBE_EXTENDED) { select_lex->where= join->conds_history; @@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables, if (having && having->val_int() == 0) send_row=0; } - if (!(result->send_fields(fields,1))) + if (!(result->send_fields(fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))) { if (send_row) { @@ -7035,7 +7380,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure) { int error= 0; JOIN_TAB *join_tab; - int (*end_select)(JOIN *, struct st_join_table *,bool); + Next_select_func end_select; DBUG_ENTER("do_select"); join->procedure=procedure; @@ -7043,7 +7388,8 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure) Tell the client how many fields there are in a row */ if (!table) - join->result->send_fields(*fields,1); + join->result->send_fields(*fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); else { VOID(table->file->extra(HA_EXTRA_WRITE_CACHE)); @@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), } DBUG_RETURN(-3); // Abort nicely } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows for + this fetch request are sent. + */ + DBUG_RETURN(-4); + } } else { @@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), join->do_send_rows=0; join->unit->select_limit_cnt = HA_POS_ERROR; } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows + for this fetch request are sent. + */ + DBUG_RETURN(-4); + } } } else diff --git a/sql/sql_select.h b/sql/sql_select.h index 8ffe50e6db2..284e4315917 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -81,6 +81,10 @@ enum join_type { JT_UNKNOWN,JT_SYSTEM,JT_CONST,JT_EQ_REF,JT_REF,JT_MAYBE_REF, class JOIN; +typedef int (*Next_select_func)(JOIN *,struct st_join_table *,bool); +typedef int (*Read_record_func)(struct st_join_table *tab); + + typedef struct st_join_table { TABLE *table; KEYUSE *keyuse; /* pointer to first used key */ @@ -95,8 +99,8 @@ typedef struct st_join_table { st_join_table *first_upper; /* first inner table for embedding outer join */ st_join_table *first_unmatched; /* used for optimization purposes only */ const char *info; - int (*read_first_record)(struct st_join_table *tab); - int (*next_select)(JOIN *,struct st_join_table *,bool); + Read_record_func read_first_record; + Next_select_func next_select; READ_RECORD read_record; double worst_seeks; key_map const_keys; /* Keys with constant part */ @@ -149,6 +153,16 @@ class JOIN :public Sql_alloc bool do_send_rows; table_map const_table_map,found_const_table_map,outer_join; ha_rows send_records,found_records,examined_rows,row_limit, select_limit; + /* + Used to fetch no more than given amount of rows per one + fetch operation of server side cursor. + The value is checked in end_send and end_send_group in fashion, similar + to offset_limit_cnt: + - fetch_limit= HA_POS_ERROR if there is no cursor. + - when we open a cursor, we set fetch_limit to 0, + - on each fetch iteration we add num_rows to fetch to fetch_limit + */ + ha_rows fetch_limit; POSITION positions[MAX_TABLES+1],best_positions[MAX_TABLES+1]; double best_read; List<Item> *fields; @@ -239,6 +253,7 @@ class JOIN :public Sql_alloc do_send_rows= 1; send_records= 0; found_records= 0; + fetch_limit= HA_POS_ERROR; examined_rows= 0; exec_tmp_table1= 0; exec_tmp_table2= 0; @@ -319,6 +334,44 @@ class JOIN :public Sql_alloc }; +/* + Server-side cursor (now stands only for basic read-only cursor) + See class implementation in sql_select.cc +*/ + +class Cursor: public Sql_alloc, public Item_arena +{ + JOIN *join; + SELECT_LEX_UNIT *unit; + + TABLE *open_tables; + MYSQL_LOCK *lock; + TABLE *derived_tables; + /* List of items created during execution */ + ulong query_id; +public: + select_send result; + + /* Temporary implementation as now we replace THD state by value */ + /* Save THD state into cursor */ + void init_from_thd(THD *thd); + /* Restore THD from cursor to continue cursor execution */ + void init_thd(THD *thd); + /* bzero cursor state in THD */ + void reset_thd(THD *thd); + + int open(JOIN *join); + int fetch(ulong num_rows); + void reset() { join= 0; } + bool is_open() const { return join != 0; } + void close(); + + void set_unit(SELECT_LEX_UNIT *unit_arg) { unit= unit_arg; } + Cursor() :join(0), unit(0) {} + ~Cursor(); +}; + + typedef struct st_select_check { uint const_ref,reg_ref; } SELECT_CHECK; diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 5064a95d468..c11207eac24 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -66,7 +66,8 @@ mysqld_show_dbs(THD *thd,const char *wild) strxmov(end," (",wild,")",NullS); field_list.push_back(field); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,NullS,mysql_data_home,wild,1)) DBUG_RETURN(1); @@ -107,7 +108,8 @@ int mysqld_show_open_tables(THD *thd,const char *wild) field_list.push_back(new Item_return_int("In_use", 1, MYSQL_TYPE_TINY)); field_list.push_back(new Item_return_int("Name_locked", 4, MYSQL_TYPE_TINY)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (!(open_list=list_open_tables(thd,wild)) && thd->is_fatal_error) @@ -160,7 +162,8 @@ int mysqld_show_tables(THD *thd,const char *db,const char *wild) field_list.push_back(field); if (show_type) field_list.push_back(new Item_empty_string("table_type", 10)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,db,path,wild,0)) DBUG_RETURN(-1); @@ -208,7 +211,8 @@ int mysqld_show_storage_engines(THD *thd) field_list.push_back(new Item_empty_string("Support",10)); field_list.push_back(new Item_empty_string("Comment",80)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); const char *default_type_name= @@ -282,7 +286,8 @@ int mysqld_show_privileges(THD *thd) field_list.push_back(new Item_empty_string("Context",15)); field_list.push_back(new Item_empty_string("Comment",NAME_LEN)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); show_privileges_st *privilege= sys_privileges; @@ -357,7 +362,8 @@ int mysqld_show_column_types(THD *thd) field_list.push_back(new Item_empty_string("Default",NAME_LEN)); field_list.push_back(new Item_empty_string("Comment",NAME_LEN)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); /* TODO: Change the loop to not use 'i' */ @@ -530,7 +536,8 @@ int mysqld_extend_show_tables(THD *thd,const char *db,const char *wild) item->maybe_null=1; field_list.push_back(item=new Item_empty_string("Comment",80)); item->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,db,path,wild,0)) @@ -721,7 +728,7 @@ mysqld_show_fields(THD *thd, TABLE_LIST *table_list,const char *wild, } // Send first number of fields and records if (protocol->send_records_num(&field_list, (ulonglong)file->records) || - protocol->send_fields(&field_list,0)) + protocol->send_fields(&field_list, Protocol::SEND_EOF)) DBUG_RETURN(1); restore_record(table,default_values); // Get empty record @@ -859,7 +866,8 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list) field_list.push_back(new Item_empty_string("Create Table", max(buffer.length(),1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); buffer.length(0); @@ -943,7 +951,8 @@ int mysqld_show_create_db(THD *thd, char *dbname, field_list.push_back(new Item_empty_string("Database",NAME_LEN)); field_list.push_back(new Item_empty_string("Create Database",1024)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); @@ -985,7 +994,8 @@ mysqld_show_logs(THD *thd) field_list.push_back(new Item_empty_string("Type",10)); field_list.push_back(new Item_empty_string("Status",10)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); #ifdef HAVE_BERKELEY_DB @@ -1034,7 +1044,8 @@ mysqld_show_keys(THD *thd, TABLE_LIST *table_list) field_list.push_back(new Item_empty_string("Comment",255)); item->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); KEY *key_info=table->key_info; @@ -1121,7 +1132,8 @@ mysqld_list_fields(THD *thd, TABLE_LIST *table_list, const char *wild) field_list.push_back(new Item_field(field)); } restore_record(table,default_values); // Get empty record - if (thd->protocol->send_fields(&field_list,2)) + if (thd->protocol->send_fields(&field_list, Protocol::SEND_DEFAULTS | + Protocol::SEND_EOF)) DBUG_VOID_RETURN; net_flush(&thd->net); DBUG_VOID_RETURN; @@ -1615,7 +1627,8 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) field->maybe_null=1; field_list.push_back(field=new Item_empty_string("Info",max_query_length)); field->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_VOID_RETURN; VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list @@ -1751,7 +1764,8 @@ int mysqld_show_collations(THD *thd, const char *wild) field_list.push_back(new Item_empty_string("Compiled",30)); field_list.push_back(new Item_return_int("Sortlen",3, FIELD_TYPE_SHORT)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ ) @@ -1804,7 +1818,8 @@ int mysqld_show_charsets(THD *thd, const char *wild) field_list.push_back(new Item_empty_string("Default collation",60)); field_list.push_back(new Item_return_int("Maxlen",3, FIELD_TYPE_SHORT)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ ) @@ -1838,7 +1853,8 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables, field_list.push_back(new Item_empty_string("Variable_name",30)); field_list.push_back(new Item_empty_string("Value",256)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); /* purecov: inspected */ null_lex_str.str= 0; // For sys_var->value_ptr() null_lex_str.length= 0; diff --git a/sql/sql_table.cc b/sql/sql_table.cc index ea57536d7c1..93fb7930da7 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1738,7 +1738,8 @@ static int mysql_admin_table(THD* thd, TABLE_LIST* tables, item->maybe_null = 1; field_list.push_back(item = new Item_empty_string("Msg_text", 255)); item->maybe_null = 1; - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); mysql_ha_close(thd, tables, /*dont_send_ok*/ 1, /*dont_lock*/ 1); @@ -3449,7 +3450,8 @@ int mysql_checksum_table(THD *thd, TABLE_LIST *tables, HA_CHECK_OPT *check_opt) item->maybe_null= 1; field_list.push_back(item=new Item_int("Checksum",(longlong) 1,21)); item->maybe_null= 1; - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); for (table= tables; table; table= table->next_local) diff --git a/sql/sql_union.cc b/sql/sql_union.cc index 35f8a390308..d6b776571f2 100644 --- a/sql/sql_union.cc +++ b/sql/sql_union.cc @@ -31,7 +31,13 @@ int mysql_union(THD *thd, LEX *lex, select_result *result, int res, res_cln; if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK))) res= unit->exec(); - res_cln= unit->cleanup(); + if (res == 0 && thd->cursor && thd->cursor->is_open()) + { + thd->cursor->set_unit(unit); + res_cln= 0; + } + else + res_cln= unit->cleanup(); DBUG_RETURN(res?res:res_cln); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 5d0e4627b69..0a86b330f18 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -35,7 +35,7 @@ INCLUDES = -I$(top_srcdir)/include $(openssl_includes) LIBS = @CLIENT_LIBS@ LDADD = @CLIENT_EXTRA_LDFLAGS@ ../libmysql/libmysqlclient.la client_test_LDADD= $(LDADD) $(CXXLDFLAGS) -client_test_SOURCES= client_test.c +client_test_SOURCES= client_test.cc insert_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) select_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) diff --git a/tests/client_test.c b/tests/client_test.cc index 13f5a3ac852..36adbc381e1 100644 --- a/tests/client_test.c +++ b/tests/client_test.cc @@ -693,6 +693,240 @@ static void client_use_result() } +/* + Accepts arbitrary number of queries and runs them against the database. + Used to fill tables for each test. +*/ + +void fill_tables(const char **query_list, unsigned query_count) +{ + int rc; + for (const char **query= query_list; query < query_list + query_count; + ++query) + { + rc= mysql_query(mysql, *query); + if (rc) + { + fprintf(stderr, + "fill_tables failed: query is\n" + "%s,\n" + "error: %s\n", *query, mysql_error(mysql)); + exit(1); + } + } +} + +/* + All state of fetch from one statement: statement handle, out buffers, + fetch position. + See fetch_n for for the only use case. +*/ + +struct Stmt_fetch +{ + enum { MAX_COLUMN_LENGTH= 255 }; + + Stmt_fetch() {} + ~Stmt_fetch(); + + void init(unsigned stmt_no_arg, const char *query_arg); + int fetch_row(); + + const char *query; + unsigned stmt_no; + MYSQL_STMT *stmt; + bool is_open; + MYSQL_BIND *bind_array; + char **out_data; + unsigned long *out_data_length; + unsigned column_count; + unsigned row_count; +}; + +/* + Create statement handle, prepare it with statement, execute and allocate + fetch buffers. +*/ + +void Stmt_fetch::init(unsigned stmt_no_arg, const char *query_arg) +{ + unsigned long type= CURSOR_TYPE_READ_ONLY; + int rc; + unsigned i; + MYSQL_RES *metadata; + + /* Save query and statement number for error messages */ + stmt_no= stmt_no_arg; + query= query_arg; + + stmt= mysql_stmt_init(mysql); + + rc= mysql_stmt_prepare(stmt, query, strlen(query)); + if (rc) + { + fprintf(stderr, + "mysql_stmt_prepare of stmt %d failed:\n" + "query: %s\n" + "error: %s\n", + stmt_no, query, mysql_stmt_error(stmt)); + exit(1); + } + + /* + The attribute is sent to server on execute and asks to open read-only + for result set + */ + mysql_stmt_attr_set(stmt, STMT_ATTR_CURSOR_TYPE, (const void *) &type); + + rc= mysql_stmt_execute(stmt); + if (rc) + { + fprintf(stderr, + "mysql_stmt_execute of stmt %d failed:\n" + "query: %s\n" + "error: %s\n", + stmt_no, query, mysql_stmt_error(stmt)); + exit(1); + } + + /* Find out total number of columns in result set */ + metadata= mysql_stmt_result_metadata(stmt); + column_count= mysql_num_fields(metadata); + mysql_free_result(metadata); + + /* + Now allocate bind handles and buffers for output data: + calloc memory to reduce number of MYSQL_BIND members we need to + set up. + */ + + bind_array= (MYSQL_BIND *) calloc(1, sizeof(MYSQL_BIND) * column_count); + out_data= (char **) calloc(1, sizeof(*out_data) * column_count); + out_data_length= (unsigned long *) calloc(1, + sizeof(*out_data_length) * column_count); + + for (i= 0; i < column_count; ++i) + { + out_data[i]= (char *) calloc(1, MAX_COLUMN_LENGTH); + bind_array[i].buffer_type= MYSQL_TYPE_STRING; + bind_array[i].buffer= out_data[i]; + bind_array[i].buffer_length= MAX_COLUMN_LENGTH; + bind_array[i].length= out_data_length + i; + } + + mysql_stmt_bind_result(stmt, bind_array); + + row_count= 0; + is_open= true; + + /* Ready for reading rows */ +} + + +/* Fetch and print one row from cursor */ + +int Stmt_fetch::fetch_row() +{ + int rc; + unsigned i; + + if ((rc= mysql_stmt_fetch(stmt)) == 0) + { + ++row_count; + printf("Stmt %d fetched row %d:\n", stmt_no, row_count); + for (i= 0; i < column_count; ++i) + { + out_data[i][out_data_length[i]]= '\0'; + printf("column %d: %s\n", i+1, out_data[i]); + } + } + else + is_open= false; + return rc; +} + + +Stmt_fetch::~Stmt_fetch() +{ + unsigned i; + + for (i= 0; i < column_count; ++i) + free(out_data[i]); + free(out_data); + free(bind_array); + mysql_stmt_close(stmt); +} + +/* We need these to compile without libstdc++ */ + +void *operator new[] (size_t sz) +{ + return (void *) malloc (sz ? sz : 1); +} + +void operator delete[] (void *ptr) throw () +{ + if (ptr) + free(ptr); +} + +/* + For given array of queries, open query_count cursors and fetch + from them in simultaneous manner. + In case there was an error in one of the cursors, continue + reading from the rest. +*/ + +bool fetch_n(const char **query_list, unsigned query_count) +{ + unsigned open_statements= query_count; + unsigned i; + int rc, error_count= 0; + Stmt_fetch *stmt_array= new Stmt_fetch[query_count]; + Stmt_fetch *stmt; + + for (i= 0; i < query_count; ++i) + { + /* Init will exit(1) in case of error */ + stmt_array[i].init(i, query_list[i]); + } + + while (open_statements) + { + for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt) + { + if (stmt->is_open && (rc= stmt->fetch_row())) + { + --open_statements; + /* + We try to fetch from the rest of the statements in case of + error + */ + if (rc != MYSQL_NO_DATA) + { + fprintf(stderr, + "Got error reading rows from statement %d,\n" + "query is: %s,\n" + "error message: %s", stmt - stmt_array, stmt->query, + mysql_stmt_error(stmt->stmt)); + ++error_count; + } + } + } + } + if (error_count) + fprintf(stderr, "Fetch FAILED"); + else + { + unsigned total_row_count= 0; + for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt) + total_row_count+= stmt->row_count; + printf("Success, total rows fetched: %d\n", total_row_count); + } + delete [] stmt_array; + return error_count != 0; +} + /* Separate thread query to test some cases */ static my_bool thread_query(char *query) @@ -10043,7 +10277,7 @@ static void test_view() int rc, i; MYSQL_BIND bind[1]; char str_data[50]; - long length = 0L; + ulong length = 0L; long is_null = 0L; const char *query= "SELECT COUNT(*) FROM v1 WHERE `SERVERNAME`=?"; @@ -10141,7 +10375,7 @@ static void test_view_2where() int rc, i; MYSQL_BIND bind[8]; char parms[8][100]; - long length[8]; + ulong length[8]; const char *query= "SELECT `RELID` ,`REPORT` ,`HANDLE` ,`LOG_GROUP` ,`USERNAME` ,`VARIANT` ,`TYPE` ,`VERSION` ,`ERFDAT` ,`ERFTIME` ,`ERFNAME` ,`AEDAT` ,`AETIME` ,`AENAME` ,`DEPENDVARS` ,`INACTIVE` FROM `V_LTDX` WHERE `MANDT` = ? AND `RELID` = ? AND `REPORT` = ? AND `HANDLE` = ? AND `LOG_GROUP` = ? AND `USERNAME` IN ( ? , ? ) AND `TYPE` = ?"; myheader("test_view_2where"); @@ -10189,7 +10423,7 @@ static void test_view_star() int rc, i; MYSQL_BIND bind[8]; char parms[8][100]; - long length[8]; + ulong length[8]; const char *query= "SELECT * FROM vt1 WHERE a IN (?,?)"; myheader("test_view_star"); @@ -10338,7 +10572,7 @@ static void test_view_insert_fields() { MYSQL_STMT *stmt; char parm[11][1000]; - long l[11]; + ulong l[11]; int rc, i; MYSQL_BIND bind[11]; const char *query= "INSERT INTO `v1` ( `K1C4` ,`K2C4` ,`K3C4` ,`K4N4` ,`F1C4` ,`F2I4` ,`F3N5` ,`F7F8` ,`F6N4` ,`F5C8` ,`F9D8` ) VALUES( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? )"; @@ -10390,6 +10624,61 @@ static void test_view_insert_fields() } + +static void test_basic_cursors() +{ + myheader("test_basic_cursors"); + const char *basic_tables[]= + { + "DROP TABLE IF EXISTS t1, t2", + + "CREATE TABLE t1 " + "(id INTEGER NOT NULL PRIMARY KEY, " + " name VARCHAR(20) NOT NULL)", + + "INSERT INTO t1 (id, name) VALUES " + " (2, 'Ja'), (3, 'Ede'), " + " (4, 'Haag'), (5, 'Kabul'), " + " (6, 'Almere'), (7, 'Utrecht'), " + " (8, 'Qandahar'), (9, 'Amsterdam'), " + " (10, 'Amersfoort'), (11, 'Constantine')", + + "CREATE TABLE t2 " + "(id INTEGER NOT NULL PRIMARY KEY, " + " name VARCHAR(20) NOT NULL)", + + "INSERT INTO t2 (id, name) VALUES " + " (4, 'Guam'), (5, 'Aruba'), " + " (6, 'Angola'), (7, 'Albania'), " + " (8, 'Anguilla'), (9, 'Argentina'), " + " (10, 'Azerbaijan'), (11, 'Afghanistan'), " + " (12, 'Burkina Faso'), (13, 'Faroe Islands')" + }; + + fill_tables(basic_tables, sizeof(basic_tables)/sizeof(*basic_tables)); + + const char *queries[]= + { + "SELECT * FROM t1", + "SELECT * FROM t2" + }; + + fetch_n(queries, sizeof(queries)/sizeof(*queries)); +} + + +static void test_cursors_with_union() +{ + myheader("test_cursors_with_union"); + + const char *queries[]= + { + "SELECT t1.name FROM t1 UNION SELECT t2.name FROM t2", + "SELECT t1.id FROM t1 WHERE t1.id < 5" + }; + fetch_n(queries, sizeof(queries)/sizeof(*queries)); +} + /* Read and parse arguments and MySQL options from my.cnf */ @@ -10694,6 +10983,8 @@ int main(int argc, char **argv) test_view_insert(); /* inserting in VIEW without field list */ test_left_join_view(); /* left join on VIEW with WHERE condition */ test_view_insert_fields(); /* insert into VIOEW with fields list */ + test_basic_cursors(); + test_cursors_with_union(); /* XXX: PLEASE RUN THIS PROGRAM UNDER VALGRIND AND VERIFY THAT YOUR TEST DOESN'T CONTAIN WARNINGS/ERRORS BEFORE YOU PUSH. |