summaryrefslogtreecommitdiff
path: root/sql/sql_prepare.cc
diff options
context:
space:
mode:
authorAlexey Botchkov <holyfoot@askmonty.org>2021-09-06 22:34:35 +0400
committerOleksandr Byelkin <sanja@mariadb.com>2021-10-19 17:35:06 +0200
commit0a0dfd63d9a0ae5adb139159e0aeca93c5c2d5c9 (patch)
tree66209a932d5575a6c3021ab1d215c16e9a3bead7 /sql/sql_prepare.cc
parent401ff6994d842a4072b7b155e5a958e178e6497a (diff)
downloadmariadb-git-0a0dfd63d9a0ae5adb139159e0aeca93c5c2d5c9.tar.gz
MDEV-19275 Provide SQL service to plugins.
SQL service added. It provides the limited set of client library functions to be used by plugin.
Diffstat (limited to 'sql/sql_prepare.cc')
-rw-r--r--sql/sql_prepare.cc411
1 files changed, 252 insertions, 159 deletions
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index f9f008602ed..19ae176ddcb 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -133,6 +133,7 @@ static const uint PARAMETER_FLAG_UNSIGNED= 128U << 8;
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */
#include "xa.h" // xa_recover_get_fields
+#include "sql_audit.h" // mysql_audit_release
/**
A result class used to send cursor rows using the binary protocol.
@@ -4049,19 +4050,22 @@ Execute_sql_statement(LEX_STRING sql_text)
executions without having to cleanup/reset THD in between.
*/
-bool
-Execute_sql_statement::execute_server_code(THD *thd)
+static bool execute_server_code(THD *thd,
+ const char *sql_text, size_t sql_len)
{
PSI_statement_locker *parent_locker;
bool error;
+ query_id_t save_query_id= thd->query_id;
+ query_id_t next_id= next_query_id();
- if (alloc_query(thd, m_sql_text.str, m_sql_text.length))
+ if (alloc_query(thd, sql_text, sql_len))
return TRUE;
Parser_state parser_state;
if (parser_state.init(thd, thd->query(), thd->query_length()))
return TRUE;
+ thd->query_id= next_id;
parser_state.m_lip.multi_statements= FALSE;
lex_start(thd);
@@ -4079,17 +4083,23 @@ Execute_sql_statement::execute_server_code(THD *thd)
/* report error issued during command execution */
if (likely(error == 0) && thd->spcont == NULL)
- general_log_write(thd, COM_STMT_EXECUTE,
+ general_log_write(thd, COM_QUERY,
thd->query(), thd->query_length());
end:
thd->lex->restore_set_statement_var();
+ thd->query_id= save_query_id;
delete_explain_query(thd->lex);
lex_end(thd->lex);
return error;
}
+bool Execute_sql_statement::execute_server_code(THD *thd)
+{
+ return ::execute_server_code(thd, m_sql_text.str, m_sql_text.length);
+}
+
/***************************************************************************
Prepared_statement
****************************************************************************/
@@ -4850,7 +4860,10 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
Statement stmt_backup;
bool error;
Query_arena *save_stmt_arena= thd->stmt_arena;
+ my_time_t save_query_start= thd->query_start();
+ ulong save_query_sec= thd->start_time_sec_part;
Item_change_list save_change_list;
+
thd->Item_change_list::move_elements_to(&save_change_list);
state= STMT_CONVENTIONAL_EXECUTION;
@@ -4858,6 +4871,7 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
if (!(lex= new (mem_root) st_lex_local))
return TRUE;
+ thd->set_time();
thd->set_n_backup_statement(this, &stmt_backup);
thd->set_n_backup_active_arena(this, &stmt_backup);
thd->stmt_arena= this;
@@ -4871,6 +4885,7 @@ Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
thd->stmt_arena= save_stmt_arena;
save_change_list.move_elements_to(thd);
+ thd->force_set_time(save_query_start, save_query_sec);
/* Items and memory will freed in destructor */
@@ -5582,14 +5597,6 @@ Ed_connection::store_result_set()
return ed_result_set;
}
-/*
- MENT-56
- Protocol_local and service_sql for plugins to enable 'local' SQL query execution.
-*/
-
-#ifndef EMBEDDED_LIBRARY
-// This part is mostly copied from libmysqld/lib_sql.cc
-// TODO: get rid of code duplications
#include <mysql.h>
#include "../libmysqld/embedded_priv.h"
@@ -5605,11 +5612,13 @@ public:
char **next_field;
MYSQL_FIELD *next_mysql_field;
MEM_ROOT *alloc;
+ THD *new_thd;
- Protocol_local(THD *thd_arg, ulong prealloc= 0) :
+ Protocol_local(THD *thd_arg, THD *new_thd_arg, ulong prealloc) :
Protocol_text(thd_arg, prealloc),
- cur_data(0), first_data(0), data_tail(&first_data), alloc(0)
- {}
+ cur_data(0), first_data(0), data_tail(&first_data), alloc(0),
+ new_thd(new_thd_arg)
+ {}
protected:
bool net_store_data(const uchar *from, size_t length);
@@ -5681,6 +5690,20 @@ MYSQL_DATA *Protocol_local::alloc_new_dataset()
}
+void Protocol_local::clear_data_list()
+{
+ while (first_data)
+ {
+ MYSQL_DATA *data= first_data;
+ first_data= data->embedded_info->next;
+ free_rows(data);
+ }
+ data_tail= &first_data;
+ free_rows(cur_data);
+ cur_data= 0;
+}
+
+
static char *dup_str_aux(MEM_ROOT *root, const char *from, uint length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs)
{
@@ -5974,7 +5997,6 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags)
{
List_iterator_fast<Item> it(*list);
Item *item;
-// Protocol_local prot(thd);
DBUG_ENTER("send_result_set_metadata");
// if (!thd->mysql) // bootstrap file handling
@@ -5985,7 +6007,7 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags)
for (uint pos= 0 ; (item= it++); pos++)
{
- if (/*prot.*/store_item_metadata(thd, item, pos))
+ if (store_item_metadata(thd, item, pos))
goto err;
}
@@ -5999,6 +6021,7 @@ bool Protocol_local::send_result_set_metadata(List<Item> *list, uint flags)
DBUG_RETURN(1); /* purecov: inspected */
}
+
static void
list_fields_send_default(THD *thd, Protocol_local *p, Field *fld, uint pos)
{
@@ -6086,19 +6109,6 @@ bool Protocol_local::store_null()
#include <sql_common.h>
#include <errmsg.h>
-struct local_results
-{
- struct st_mysql_data *cur_data;
- struct st_mysql_data *first_data;
- struct st_mysql_data **data_tail;
- void clear_data_list();
- struct st_mysql_data *alloc_new_dataset();
- char **next_field;
- MYSQL_FIELD *next_mysql_field;
- MEM_ROOT *alloc;
-};
-
-
static void embedded_get_error(MYSQL *mysql, MYSQL_DATA *data)
{
NET *net= &mysql->net;
@@ -6113,11 +6123,11 @@ static void embedded_get_error(MYSQL *mysql, MYSQL_DATA *data)
static my_bool loc_read_query_result(MYSQL *mysql)
{
- local_results *thd= (local_results *) mysql->thd;
+ Protocol_local *p= (Protocol_local *) mysql->thd;
- MYSQL_DATA *res= thd->first_data;
- DBUG_ASSERT(!thd->cur_data);
- thd->first_data= res->embedded_info->next;
+ MYSQL_DATA *res= p->first_data;
+ DBUG_ASSERT(!p->cur_data);
+ p->first_data= res->embedded_info->next;
if (res->embedded_info->last_errno &&
!res->embedded_info->fields_list)
{
@@ -6145,7 +6155,7 @@ static my_bool loc_read_query_result(MYSQL *mysql)
if (res->embedded_info->fields_list)
{
mysql->status=MYSQL_STATUS_GET_RESULT;
- thd->cur_data= res;
+ p->cur_data= res;
}
else
my_free(res);
@@ -6154,23 +6164,193 @@ static my_bool loc_read_query_result(MYSQL *mysql)
}
+static my_bool
+loc_advanced_command(MYSQL *mysql, enum enum_server_command command,
+ const uchar *header, ulong header_length,
+ const uchar *arg, ulong arg_length, my_bool skip_check,
+ MYSQL_STMT *stmt)
+{
+ my_bool result= 1;
+ Protocol_local *p= (Protocol_local *) mysql->thd;
+ NET *net= &mysql->net;
+
+ if (p->thd && p->thd->killed != NOT_KILLED)
+ {
+ if (p->thd->killed < KILL_CONNECTION)
+ p->thd->killed= NOT_KILLED;
+ else
+ return 1;
+ }
+
+ p->clear_data_list();
+ /* Check that we are calling the client functions in right order */
+ if (mysql->status != MYSQL_STATUS_READY)
+ {
+ set_mysql_error(mysql, CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);
+ goto end;
+ }
+
+ /* Clear result variables */
+ p->thd->clear_error(1);
+ mysql->affected_rows= ~(my_ulonglong) 0;
+ mysql->field_count= 0;
+ net_clear_error(net);
+
+ /*
+ We have to call free_old_query before we start to fill mysql->fields
+ for new query. In the case of embedded server we collect field data
+ during query execution (not during data retrieval as it is in remote
+ client). So we have to call free_old_query here
+ */
+ free_old_query(mysql);
+
+ if (header)
+ {
+ arg= header;
+ arg_length= header_length;
+ }
+
+ if (p->new_thd)
+ {
+ THD *thd_orig= current_thd;
+ set_current_thd(p->thd);
+ p->thd->thread_stack= (char*) &result;
+ p->thd->set_time();
+ result= execute_server_code(p->thd, (const char *)arg, arg_length);
+ p->thd->cleanup_after_query();
+ mysql_audit_release(p->thd);
+ p->end_statement();
+ set_current_thd(thd_orig);
+ }
+ else
+ {
+ Ed_connection con(p->thd);
+ MYSQL_LEX_STRING sql_text;
+ DBUG_ASSERT(current_thd == p->thd);
+ sql_text.str= (char *) arg;
+ sql_text.length= arg_length;
+ result= con.execute_direct(p, sql_text);
+ }
+ if (skip_check)
+ result= 0;
+ p->cur_data= 0;
+
+end:
+ return result;
+}
+
+
+/*
+ reads dataset from the next query result
+
+ SYNOPSIS
+ loc_read_rows()
+ mysql connection handle
+ other parameters are not used
+
+ NOTES
+ It just gets next MYSQL_DATA from the result's queue
+
+ RETURN
+ pointer to MYSQL_DATA with the coming recordset
+*/
+
+static MYSQL_DATA *
+loc_read_rows(MYSQL *mysql, MYSQL_FIELD *mysql_fields __attribute__((unused)),
+ unsigned int fields __attribute__((unused)))
+{
+ MYSQL_DATA *result= ((Protocol_local *)mysql->thd)->cur_data;
+ ((Protocol_local *)mysql->thd)->cur_data= 0;
+ if (result->embedded_info->last_errno)
+ {
+ embedded_get_error(mysql, result);
+ return NULL;
+ }
+ *result->embedded_info->prev_ptr= NULL;
+ return result;
+}
+
+
+/**************************************************************************
+ Get column lengths of the current row
+ If one uses mysql_use_result, res->lengths contains the length information,
+ else the lengths are calculated from the offset between pointers.
+**************************************************************************/
+
+static void loc_fetch_lengths(ulong *to, MYSQL_ROW column,
+ unsigned int field_count)
+{
+ MYSQL_ROW end;
+
+ for (end=column + field_count; column != end ; column++,to++)
+ *to= *column ? *(uint *)((*column) - sizeof(uint)) : 0;
+}
+
+
+static void loc_flush_use_result(MYSQL *mysql, my_bool)
+{
+ Protocol_local *p= (Protocol_local *) mysql->thd;
+ if (p->cur_data)
+ {
+ free_rows(p->cur_data);
+ p->cur_data= 0;
+ }
+ else if (p->first_data)
+ {
+ MYSQL_DATA *data= p->first_data;
+ p->first_data= data->embedded_info->next;
+ free_rows(data);
+ }
+}
+
+
+static void loc_on_close_free(MYSQL *mysql)
+{
+ Protocol_local *p= (Protocol_local *) mysql->thd;
+ THD *thd= p->new_thd;
+ delete p;
+ if (thd)
+ {
+ delete thd;
+ local_connection_thread_count--;
+ }
+ my_free(mysql->info_buffer);
+ mysql->info_buffer= 0;
+}
+
+
static MYSQL_METHODS local_methods=
{
loc_read_query_result, /* read_query_result */
- NULL/*loc_advanced_command*/, /* advanced_command */
- NULL/*loc_read_rows*/, /* read_rows */
- NULL/*loc_use_result*/, /* use_result */
- NULL/*loc_fetch_lengths*/, /* fetch_lengths */
- NULL/*loc_flush_use_result*/, /* flush_use_result */
- NULL/*loc_read_change_user_result*/ /* read_change_user_result */
+ loc_advanced_command, /* advanced_command */
+ loc_read_rows, /* read_rows */
+ mysql_store_result, /* use_result */
+ loc_fetch_lengths, /* fetch_lengths */
+ loc_flush_use_result, /* flush_use_result */
+ NULL, /* read_change_user_result */
+ loc_on_close_free /* on_close_free */
+#ifdef EMBEDDED_LIBRARY
+ ,NULL, /* list_fields */
+ NULL, /* read_prepare_result */
+ NULL, /* stmt_execute */
+ NULL, /* read_binary_rows */
+ NULL, /* unbuffered_fetch */
+ NULL, /* read_statistics */
+ NULL, /* next_result */
+ NULL /* read_rows_from_cursor */
+#endif
};
+Atomic_counter<uint32_t> local_connection_thread_count;
+
extern "C" MYSQL *mysql_real_connect_local(MYSQL *mysql,
- const char *host, const char *user, const char *passwd, const char *db)
+ const char *host, const char *user, const char *db,
+ unsigned long clientflag)
{
- //char name_buff[USERNAME_LENGTH];
-
+ THD *thd_orig= current_thd;
+ THD *new_thd;
+ Protocol_local *p;
DBUG_ENTER("mysql_real_connect_local");
/* Test whether we're already connected */
@@ -6191,137 +6371,50 @@ extern "C" MYSQL *mysql_real_connect_local(MYSQL *mysql,
if (!user || !user[0])
user=mysql->options.user;
- mysql->user= my_strdup(PSI_INSTRUMENT_ME, user, MYF(0));
-
+ mysql->user= NULL;
mysql->info_buffer= (char *) my_malloc(PSI_INSTRUMENT_ME,
MYSQL_ERRMSG_SIZE, MYF(0));
- //mysql->thd= create_embedded_thd(client_flag);
-
- //init_embedded_mysql(mysql, client_flag);
-
- //if (mysql_init_character_set(mysql))
- // goto error;
-
- //if (check_embedded_connection(mysql, db))
- // goto error;
-
- mysql->server_status= SERVER_STATUS_AUTOCOMMIT;
-
- //if (mysql->options.init_commands)
- //{
- // DYNAMIC_ARRAY *init_commands= mysql->options.init_commands;
- // char **ptr= (char**)init_commands->buffer;
- // char **end= ptr + init_commands->elements;
-//
- // for (; ptr<end; ptr++)
- // {
- // MYSQL_RES *res;
- // if (mysql_query(mysql,*ptr))
- // goto error;
- // if (mysql->fields)
- // {
- // if (!(res= (*mysql->methods->use_result)(mysql)))
- // goto error;
- // mysql_free_result(res);
- // }
- // }
- //}
-
- DBUG_PRINT("exit",("Mysql handler: %p", mysql));
- DBUG_RETURN(mysql);
-
-//error:
- DBUG_PRINT("error",("message: %u (%s)",
- mysql->net.last_errno,
- mysql->net.last_error));
+ if (!thd_orig || thd_orig->lock)
{
- /* Free alloced memory */
- my_bool free_me=mysql->free_me;
- free_old_query(mysql);
- mysql->free_me=0;
- mysql_close(mysql);
- mysql->free_me=free_me;
- }
- DBUG_RETURN(0);
-}
-
-
-extern "C" int execute_sql_command(const char *command,
- char *hosts, char *names, char *filters)
-{
- MYSQL_LEX_STRING sql_text;
- THD *thd= current_thd;
- THD *new_thd= 0;
- int result;
- my_bool qc_save= 0;
- Reprepare_observer *save_reprepare_observer= nullptr;
+ /*
+ When we start with the empty current_thd (that happens when plugins
+ are loaded during the server start) or when some tables are locked
+ with the current_thd already (that happens when INSTALL PLUGIN
+ calls the plugin_init or with queries), we create the new THD for
+ the local connection. So queries with this MYSQL will be run with
+ it rather than the current THD.
+ */
- if (!thd)
- {
new_thd= new THD(0);
- new_thd->thread_stack= (char*) &sql_text;
+ local_connection_thread_count++;
+ new_thd->thread_stack= (char*) &thd_orig;
new_thd->store_globals();
new_thd->security_ctx->skip_grants();
new_thd->query_cache_is_applicable= 0;
new_thd->variables.wsrep_on= 0;
+ /*
+ TOSO: decide if we should turn the auditing off
+ for such threads.
+ We can do it like this:
+ new_thd->audit_class_mask[0]= ~0;
+ */
bzero((char*) &new_thd->net, sizeof(new_thd->net));
- thd= new_thd;
+ set_current_thd(thd_orig);
+ thd_orig= new_thd;
}
else
- {
- if (thd->lock)
- /* Doesn't work if the thread opened/locked tables already. */
- return 2;
+ new_thd= NULL;
- qc_save= thd->query_cache_is_applicable;
- thd->query_cache_is_applicable= 0;
- save_reprepare_observer= thd->m_reprepare_observer;
- thd->m_reprepare_observer= nullptr;
- }
- sql_text.str= (char *) command;
- sql_text.length= strlen(command);
- {
- Protocol_local p(thd);
- Ed_connection con(thd);
- result= con.execute_direct(&p, sql_text);
- if (!result && p.first_data)
- {
- int nr= (int) p.first_data->rows;
- MYSQL_ROWS *rows= p.first_data->data;
+ p= new Protocol_local(thd_orig, new_thd, 0);
+ if (new_thd)
+ new_thd->protocol= p;
- while (nr--)
- {
- strcpy(hosts, rows->data[0]);
- hosts+= strlen(hosts) + 1;
- strcpy(names, rows->data[1]);
- names+= strlen(names) + 1;
- if (filters)
- {
- strcpy(filters, rows->data[2]);
- filters+= strlen(filters) + 1;
- }
- rows= rows->next;
- }
- }
- if (p.first_data)
- {
- if (p.alloc)
- free_root(p.alloc, MYF(0));
- my_free(p.first_data);
- }
- }
+ mysql->thd= p;
+ mysql->server_status= SERVER_STATUS_AUTOCOMMIT;
- if (new_thd)
- delete new_thd;
- else
- {
- thd->query_cache_is_applicable= qc_save;
- thd->m_reprepare_observer= save_reprepare_observer;
- }
- *hosts= 0;
- return result;
+ DBUG_PRINT("exit",("Mysql handler: %p", mysql));
+ DBUG_RETURN(mysql);
}
-#endif /*!EMBEDDED_LIBRARY*/