diff options
author | Alexey Botchkov <holyfoot@askmonty.org> | 2021-09-06 22:34:35 +0400 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2021-10-19 17:35:06 +0200 |
commit | 0a0dfd63d9a0ae5adb139159e0aeca93c5c2d5c9 (patch) | |
tree | 66209a932d5575a6c3021ab1d215c16e9a3bead7 /sql/sql_prepare.cc | |
parent | 401ff6994d842a4072b7b155e5a958e178e6497a (diff) | |
download | mariadb-git-0a0dfd63d9a0ae5adb139159e0aeca93c5c2d5c9.tar.gz |
MDEV-19275 Provide SQL service to plugins.
SQL service added.
It provides the limited set of client library functions
to be used by plugin.
Diffstat (limited to 'sql/sql_prepare.cc')
-rw-r--r-- | sql/sql_prepare.cc | 411 |
1 files changed, 252 insertions, 159 deletions
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index f9f008602ed..19ae176ddcb 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -133,6 +133,7 @@ static const uint PARAMETER_FLAG_UNSIGNED= 128U << 8; #include "wsrep_trans_observer.h" #endif /* WITH_WSREP */ #include "xa.h" // xa_recover_get_fields +#include "sql_audit.h" // mysql_audit_release /** A result class used to send cursor rows using the binary protocol. @@ -4049,19 +4050,22 @@ Execute_sql_statement(LEX_STRING sql_text) executions without having to cleanup/reset THD in between. */ -bool -Execute_sql_statement::execute_server_code(THD *thd) +static bool execute_server_code(THD *thd, + const char *sql_text, size_t sql_len) { PSI_statement_locker *parent_locker; bool error; + query_id_t save_query_id= thd->query_id; + query_id_t next_id= next_query_id(); - if (alloc_query(thd, m_sql_text.str, m_sql_text.length)) + if (alloc_query(thd, sql_text, sql_len)) return TRUE; Parser_state parser_state; if (parser_state.init(thd, thd->query(), thd->query_length())) return TRUE; + thd->query_id= next_id; parser_state.m_lip.multi_statements= FALSE; lex_start(thd); @@ -4079,17 +4083,23 @@ Execute_sql_statement::execute_server_code(THD *thd) /* report error issued during command execution */ if (likely(error == 0) && thd->spcont == NULL) - general_log_write(thd, COM_STMT_EXECUTE, + general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); end: thd->lex->restore_set_statement_var(); + thd->query_id= save_query_id; delete_explain_query(thd->lex); lex_end(thd->lex); return error; } +bool Execute_sql_statement::execute_server_code(THD *thd) +{ + return ::execute_server_code(thd, m_sql_text.str, m_sql_text.length); +} + /*************************************************************************** Prepared_statement ****************************************************************************/ @@ -4850,7 +4860,10 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable) Statement stmt_backup; bool error; Query_arena *save_stmt_arena= thd->stmt_arena; + my_time_t save_query_start= thd->query_start(); + ulong save_query_sec= thd->start_time_sec_part; Item_change_list save_change_list; + thd->Item_change_list::move_elements_to(&save_change_list); state= STMT_CONVENTIONAL_EXECUTION; @@ -4858,6 +4871,7 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable) if (!(lex= new (mem_root) st_lex_local)) return TRUE; + thd->set_time(); thd->set_n_backup_statement(this, &stmt_backup); thd->set_n_backup_active_arena(this, &stmt_backup); thd->stmt_arena= this; @@ -4871,6 +4885,7 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable) thd->stmt_arena= save_stmt_arena; save_change_list.move_elements_to(thd); + thd->force_set_time(save_query_start, save_query_sec); /* Items and memory will freed in destructor */ @@ -5582,14 +5597,6 @@ Ed_connection::store_result_set() return ed_result_set; } -/* - MENT-56 - Protocol_local and service_sql for plugins to enable 'local' SQL query execution. -*/ - -#ifndef EMBEDDED_LIBRARY -// This part is mostly copied from libmysqld/lib_sql.cc -// TODO: get rid of code duplications #include <mysql.h> #include "../libmysqld/embedded_priv.h" @@ -5605,11 +5612,13 @@ public: char **next_field; MYSQL_FIELD *next_mysql_field; MEM_ROOT *alloc; + THD *new_thd; - Protocol_local(THD *thd_arg, ulong prealloc= 0) : + Protocol_local(THD *thd_arg, THD *new_thd_arg, ulong prealloc) : Protocol_text(thd_arg, prealloc), - cur_data(0), first_data(0), data_tail(&first_data), alloc(0) - {} + cur_data(0), first_data(0), data_tail(&first_data), alloc(0), + new_thd(new_thd_arg) + {} protected: bool net_store_data(const uchar *from, size_t length); @@ -5681,6 +5690,20 @@ MYSQL_DATA *Protocol_local::alloc_new_dataset() } +void Protocol_local::clear_data_list() +{ + while (first_data) + { + MYSQL_DATA *data= first_data; + first_data= data->embedded_info->next; + free_rows(data); + } + data_tail= &first_data; + free_rows(cur_data); + cur_data= 0; +} + + static char *dup_str_aux(MEM_ROOT *root, const char *from, uint length, CHARSET_INFO *fromcs, CHARSET_INFO *tocs) { @@ -5974,7 +5997,6 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags) { List_iterator_fast<Item> it(*list); Item *item; -// Protocol_local prot(thd); DBUG_ENTER("send_result_set_metadata"); // if (!thd->mysql) // bootstrap file handling @@ -5985,7 +6007,7 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags) for (uint pos= 0 ; (item= it++); pos++) { - if (/*prot.*/store_item_metadata(thd, item, pos)) + if (store_item_metadata(thd, item, pos)) goto err; } @@ -5999,6 +6021,7 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags) DBUG_RETURN(1); /* purecov: inspected */ } + static void list_fields_send_default(THD *thd, Protocol_local *p, Field *fld, uint pos) { @@ -6086,19 +6109,6 @@ bool Protocol_local::store_null() #include <sql_common.h> #include <errmsg.h> -struct local_results -{ - struct st_mysql_data *cur_data; - struct st_mysql_data *first_data; - struct st_mysql_data **data_tail; - void clear_data_list(); - struct st_mysql_data *alloc_new_dataset(); - char **next_field; - MYSQL_FIELD *next_mysql_field; - MEM_ROOT *alloc; -}; - - static void embedded_get_error(MYSQL *mysql, MYSQL_DATA *data) { NET *net= &mysql->net; @@ -6113,11 +6123,11 @@ static void embedded_get_error(MYSQL *mysql, MYSQL_DATA *data) static my_bool loc_read_query_result(MYSQL *mysql) { - local_results *thd= (local_results *) mysql->thd; + Protocol_local *p= (Protocol_local *) mysql->thd; - MYSQL_DATA *res= thd->first_data; - DBUG_ASSERT(!thd->cur_data); - thd->first_data= res->embedded_info->next; + MYSQL_DATA *res= p->first_data; + DBUG_ASSERT(!p->cur_data); + p->first_data= res->embedded_info->next; if (res->embedded_info->last_errno && !res->embedded_info->fields_list) { @@ -6145,7 +6155,7 @@ static my_bool loc_read_query_result(MYSQL *mysql) if (res->embedded_info->fields_list) { mysql->status=MYSQL_STATUS_GET_RESULT; - thd->cur_data= res; + p->cur_data= res; } else my_free(res); @@ -6154,23 +6164,193 @@ static my_bool loc_read_query_result(MYSQL *mysql) } +static my_bool +loc_advanced_command(MYSQL *mysql, enum enum_server_command command, + const uchar *header, ulong header_length, + const uchar *arg, ulong arg_length, my_bool skip_check, + MYSQL_STMT *stmt) +{ + my_bool result= 1; + Protocol_local *p= (Protocol_local *) mysql->thd; + NET *net= &mysql->net; + + if (p->thd && p->thd->killed != NOT_KILLED) + { + if (p->thd->killed < KILL_CONNECTION) + p->thd->killed= NOT_KILLED; + else + return 1; + } + + p->clear_data_list(); + /* Check that we are calling the client functions in right order */ + if (mysql->status != MYSQL_STATUS_READY) + { + set_mysql_error(mysql, CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate); + goto end; + } + + /* Clear result variables */ + p->thd->clear_error(1); + mysql->affected_rows= ~(my_ulonglong) 0; + mysql->field_count= 0; + net_clear_error(net); + + /* + We have to call free_old_query before we start to fill mysql->fields + for new query. In the case of embedded server we collect field data + during query execution (not during data retrieval as it is in remote + client). So we have to call free_old_query here + */ + free_old_query(mysql); + + if (header) + { + arg= header; + arg_length= header_length; + } + + if (p->new_thd) + { + THD *thd_orig= current_thd; + set_current_thd(p->thd); + p->thd->thread_stack= (char*) &result; + p->thd->set_time(); + result= execute_server_code(p->thd, (const char *)arg, arg_length); + p->thd->cleanup_after_query(); + mysql_audit_release(p->thd); + p->end_statement(); + set_current_thd(thd_orig); + } + else + { + Ed_connection con(p->thd); + MYSQL_LEX_STRING sql_text; + DBUG_ASSERT(current_thd == p->thd); + sql_text.str= (char *) arg; + sql_text.length= arg_length; + result= con.execute_direct(p, sql_text); + } + if (skip_check) + result= 0; + p->cur_data= 0; + +end: + return result; +} + + +/* + reads dataset from the next query result + + SYNOPSIS + loc_read_rows() + mysql connection handle + other parameters are not used + + NOTES + It just gets next MYSQL_DATA from the result's queue + + RETURN + pointer to MYSQL_DATA with the coming recordset +*/ + +static MYSQL_DATA * +loc_read_rows(MYSQL *mysql, MYSQL_FIELD *mysql_fields __attribute__((unused)), + unsigned int fields __attribute__((unused))) +{ + MYSQL_DATA *result= ((Protocol_local *)mysql->thd)->cur_data; + ((Protocol_local *)mysql->thd)->cur_data= 0; + if (result->embedded_info->last_errno) + { + embedded_get_error(mysql, result); + return NULL; + } + *result->embedded_info->prev_ptr= NULL; + return result; +} + + +/************************************************************************** + Get column lengths of the current row + If one uses mysql_use_result, res->lengths contains the length information, + else the lengths are calculated from the offset between pointers. +**************************************************************************/ + +static void loc_fetch_lengths(ulong *to, MYSQL_ROW column, + unsigned int field_count) +{ + MYSQL_ROW end; + + for (end=column + field_count; column != end ; column++,to++) + *to= *column ? *(uint *)((*column) - sizeof(uint)) : 0; +} + + +static void loc_flush_use_result(MYSQL *mysql, my_bool) +{ + Protocol_local *p= (Protocol_local *) mysql->thd; + if (p->cur_data) + { + free_rows(p->cur_data); + p->cur_data= 0; + } + else if (p->first_data) + { + MYSQL_DATA *data= p->first_data; + p->first_data= data->embedded_info->next; + free_rows(data); + } +} + + +static void loc_on_close_free(MYSQL *mysql) +{ + Protocol_local *p= (Protocol_local *) mysql->thd; + THD *thd= p->new_thd; + delete p; + if (thd) + { + delete thd; + local_connection_thread_count--; + } + my_free(mysql->info_buffer); + mysql->info_buffer= 0; +} + + static MYSQL_METHODS local_methods= { loc_read_query_result, /* read_query_result */ - NULL/*loc_advanced_command*/, /* advanced_command */ - NULL/*loc_read_rows*/, /* read_rows */ - NULL/*loc_use_result*/, /* use_result */ - NULL/*loc_fetch_lengths*/, /* fetch_lengths */ - NULL/*loc_flush_use_result*/, /* flush_use_result */ - NULL/*loc_read_change_user_result*/ /* read_change_user_result */ + loc_advanced_command, /* advanced_command */ + loc_read_rows, /* read_rows */ + mysql_store_result, /* use_result */ + loc_fetch_lengths, /* fetch_lengths */ + loc_flush_use_result, /* flush_use_result */ + NULL, /* read_change_user_result */ + loc_on_close_free /* on_close_free */ +#ifdef EMBEDDED_LIBRARY + ,NULL, /* list_fields */ + NULL, /* read_prepare_result */ + NULL, /* stmt_execute */ + NULL, /* read_binary_rows */ + NULL, /* unbuffered_fetch */ + NULL, /* read_statistics */ + NULL, /* next_result */ + NULL /* read_rows_from_cursor */ +#endif }; +Atomic_counter<uint32_t> local_connection_thread_count; + extern "C" MYSQL *mysql_real_connect_local(MYSQL *mysql, - const char *host, const char *user, const char *passwd, const char *db) + const char *host, const char *user, const char *db, + unsigned long clientflag) { - //char name_buff[USERNAME_LENGTH]; - + THD *thd_orig= current_thd; + THD *new_thd; + Protocol_local *p; DBUG_ENTER("mysql_real_connect_local"); /* Test whether we're already connected */ @@ -6191,137 +6371,50 @@ extern "C" MYSQL *mysql_real_connect_local(MYSQL *mysql, if (!user || !user[0]) user=mysql->options.user; - mysql->user= my_strdup(PSI_INSTRUMENT_ME, user, MYF(0)); - + mysql->user= NULL; mysql->info_buffer= (char *) my_malloc(PSI_INSTRUMENT_ME, MYSQL_ERRMSG_SIZE, MYF(0)); - //mysql->thd= create_embedded_thd(client_flag); - - //init_embedded_mysql(mysql, client_flag); - - //if (mysql_init_character_set(mysql)) - // goto error; - - //if (check_embedded_connection(mysql, db)) - // goto error; - - mysql->server_status= SERVER_STATUS_AUTOCOMMIT; - - //if (mysql->options.init_commands) - //{ - // DYNAMIC_ARRAY *init_commands= mysql->options.init_commands; - // char **ptr= (char**)init_commands->buffer; - // char **end= ptr + init_commands->elements; -// - // for (; ptr<end; ptr++) - // { - // MYSQL_RES *res; - // if (mysql_query(mysql,*ptr)) - // goto error; - // if (mysql->fields) - // { - // if (!(res= (*mysql->methods->use_result)(mysql))) - // goto error; - // mysql_free_result(res); - // } - // } - //} - - DBUG_PRINT("exit",("Mysql handler: %p", mysql)); - DBUG_RETURN(mysql); - -//error: - DBUG_PRINT("error",("message: %u (%s)", - mysql->net.last_errno, - mysql->net.last_error)); + if (!thd_orig || thd_orig->lock) { - /* Free alloced memory */ - my_bool free_me=mysql->free_me; - free_old_query(mysql); - mysql->free_me=0; - mysql_close(mysql); - mysql->free_me=free_me; - } - DBUG_RETURN(0); -} - - -extern "C" int execute_sql_command(const char *command, - char *hosts, char *names, char *filters) -{ - MYSQL_LEX_STRING sql_text; - THD *thd= current_thd; - THD *new_thd= 0; - int result; - my_bool qc_save= 0; - Reprepare_observer *save_reprepare_observer= nullptr; + /* + When we start with the empty current_thd (that happens when plugins + are loaded during the server start) or when some tables are locked + with the current_thd already (that happens when INSTALL PLUGIN + calls the plugin_init or with queries), we create the new THD for + the local connection. So queries with this MYSQL will be run with + it rather than the current THD. + */ - if (!thd) - { new_thd= new THD(0); - new_thd->thread_stack= (char*) &sql_text; + local_connection_thread_count++; + new_thd->thread_stack= (char*) &thd_orig; new_thd->store_globals(); new_thd->security_ctx->skip_grants(); new_thd->query_cache_is_applicable= 0; new_thd->variables.wsrep_on= 0; + /* + TOSO: decide if we should turn the auditing off + for such threads. + We can do it like this: + new_thd->audit_class_mask[0]= ~0; + */ bzero((char*) &new_thd->net, sizeof(new_thd->net)); - thd= new_thd; + set_current_thd(thd_orig); + thd_orig= new_thd; } else - { - if (thd->lock) - /* Doesn't work if the thread opened/locked tables already. */ - return 2; + new_thd= NULL; - qc_save= thd->query_cache_is_applicable; - thd->query_cache_is_applicable= 0; - save_reprepare_observer= thd->m_reprepare_observer; - thd->m_reprepare_observer= nullptr; - } - sql_text.str= (char *) command; - sql_text.length= strlen(command); - { - Protocol_local p(thd); - Ed_connection con(thd); - result= con.execute_direct(&p, sql_text); - if (!result && p.first_data) - { - int nr= (int) p.first_data->rows; - MYSQL_ROWS *rows= p.first_data->data; + p= new Protocol_local(thd_orig, new_thd, 0); + if (new_thd) + new_thd->protocol= p; - while (nr--) - { - strcpy(hosts, rows->data[0]); - hosts+= strlen(hosts) + 1; - strcpy(names, rows->data[1]); - names+= strlen(names) + 1; - if (filters) - { - strcpy(filters, rows->data[2]); - filters+= strlen(filters) + 1; - } - rows= rows->next; - } - } - if (p.first_data) - { - if (p.alloc) - free_root(p.alloc, MYF(0)); - my_free(p.first_data); - } - } + mysql->thd= p; + mysql->server_status= SERVER_STATUS_AUTOCOMMIT; - if (new_thd) - delete new_thd; - else - { - thd->query_cache_is_applicable= qc_save; - thd->m_reprepare_observer= save_reprepare_observer; - } - *hosts= 0; - return result; + DBUG_PRINT("exit",("Mysql handler: %p", mysql)); + DBUG_RETURN(mysql); } -#endif /*!EMBEDDED_LIBRARY*/ |