diff options
Diffstat (limited to 'innobase/usr/usr0sess.c')
-rw-r--r-- | innobase/usr/usr0sess.c | 1116 |
1 files changed, 14 insertions, 1102 deletions
diff --git a/innobase/usr/usr0sess.c b/innobase/usr/usr0sess.c index d74153e323f..359c1552421 100644 --- a/innobase/usr/usr0sess.c +++ b/innobase/usr/usr0sess.c @@ -12,327 +12,54 @@ Created 6/25/1996 Heikki Tuuri #include "usr0sess.ic" #endif -#include "ut0rnd.h" -#include "mach0data.h" -#include "ha0ha.h" #include "trx0trx.h" -#include "que0que.h" -#include "pars0pars.h" -#include "pars0sym.h" -#include "dict0dict.h" -#include "dict0mem.h" -#include "odbc0odbc.h" - -#define SESS_ERR_BUF_SIZE 8192 - -/* The session system global data structure */ -sess_sys_t* sess_sys = NULL; - -/************************************************************************* -Communicates an error message to the client. If sess->client_waits is not -TRUE, puts the session to error state and does not try to send the error -message. */ -static -void -sess_srv_msg_send_error( -/*====================*/ - sess_t* sess); /* in: session object */ -/************************************************************************* -Copies error info to a session. Sends to the transaction a signal which will -rollback the latest incomplete SQL statement and then send the error message -to the client. NOTE: This function will take care of the freeing of the error -string, thus the caller must supply a copy of the error string. */ -static -void -sess_error_low( -/*===========*/ - sess_t* sess, /* in: session object */ - ulint err_no, /* in: error number */ - char* err_str);/* in, own: error string or NULL; - NOTE: the function will take care of freeing of the - string! */ - -/************************************************************************* -Folds a session id to a ulint. Because this function is used also in -calculating a checksum for the id to write in the message, it is performs -also a XOR operation to mix the values more thoroughly. */ -UNIV_INLINE -ulint -sess_id_fold( -/*=========*/ - /* out: folded value; can be used also as the checksum - for id */ - dulint id) /* in: session id */ -{ - return(ut_fold_dulint(id) ^ 2945794411U); -} /************************************************************************* -Sets the session id in a client message. */ - -void -sess_cli_msg_set_sess( -/*==================*/ - byte* str, /* in/out: message string */ - dulint sess_id)/* in: session id */ -{ - ulint fold; - - mach_write_to_8(str + SESS_CLI_MSG_SESS_ID, sess_id); - - fold = sess_id_fold(sess_id); - - mach_write_to_4(str + SESS_CLI_MSG_SESS_ID_CHECK, fold); -} - -/************************************************************************* -Returns the session to which a message from a client is addressed. -NOTE: this function does not assume that the message is uncorrupted. */ +Closes a session, freeing the memory occupied by it. */ static -sess_t* -sess_cli_msg_get_sess( -/*==================*/ - /* out: session, NULL if not found */ - byte* str, /* in: message string */ - ulint len) /* in: message string length */ -{ - sess_t* sess; - ulint fold; - dulint id; - - ut_ad(mutex_own(&kernel_mutex)); - - if (len < SESS_CLI_MSG_SESS_ID_CHECK + 4) { - - return(NULL); - } - - id = mach_read_from_8(str + SESS_CLI_MSG_SESS_ID); - - fold = sess_id_fold(id); - - if (fold != mach_read_from_4(str + SESS_CLI_MSG_SESS_ID_CHECK)) { - - return(NULL); - } - - HASH_SEARCH(hash, sess_sys->hash, fold, sess, - UT_DULINT_EQ(id, sess->id)); - return(sess); -} - -/*************************************************************************** -Decrements the reference count of a session and closes it, if desired. */ -UNIV_INLINE -void -sess_refer_count_dec( -/*=================*/ - sess_t* sess) /* in: session */ -{ - ut_ad(mutex_own(&kernel_mutex)); - ut_ad(sess->refer_count > 0); - - sess->refer_count--; - - if (sess->disconnecting && (sess->refer_count == 0)) { - - sess_close(sess); - } -} - -/*************************************************************************** -Increments the reference count of a session. */ -UNIV_INLINE void -sess_refer_count_inc( -/*=================*/ - sess_t* sess) /* in: session */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - sess->refer_count++; -} - -/*************************************************************************** -Creates a session system at a database start. */ - -void -sess_sys_init_at_db_start(void) -/*===========================*/ -{ - sess_sys = mem_alloc(sizeof(sess_sys_t)); - - sess_sys->state = SESS_SYS_RUNNING; - sess_sys->free_sess_id = ut_dulint_create(0, 1); - sess_sys->hash = hash_create(SESS_HASH_SIZE); -} - -/*************************************************************************** -Gets the message type of a message from client. */ -UNIV_INLINE -ulint -sess_cli_msg_get_type( -/*==================*/ - /* out: message type */ - byte* str) /* in: message string */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - return(mach_read_from_4(str + SESS_CLI_MSG_TYPE)); -} - -/*************************************************************************** -Gets the message number of a message from client. */ -UNIV_INLINE -dulint -sess_cli_msg_get_msg_no( -/*====================*/ - /* out: message number */ - byte* str) /* in: message string */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - return(mach_read_from_8(str + SESS_CLI_MSG_NO)); -} - -/*************************************************************************** -Gets the continue field of a message from client. */ -UNIV_INLINE -ulint -sess_cli_msg_get_continue( -/*======================*/ - /* out: SESS_MSG_SINGLE_PART, ... */ - byte* str) /* in: message string */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - return(mach_read_from_4(str + SESS_CLI_MSG_CONTINUE)); -} - -/*************************************************************************** -Gets the size of a big message in kilobytes. */ -UNIV_INLINE -ulint -sess_cli_msg_get_cont_size( -/*=======================*/ - /* out: size in kilobytes */ - byte* str) /* in: message string */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - return(mach_read_from_4(str + SESS_CLI_MSG_CONT_SIZE)); -} - -/************************************************************************* -Checks the consistency of a message from a client. */ -UNIV_INLINE -ibool -sess_cli_msg_check_consistency( -/*===========================*/ - /* out: TRUE if ok */ - byte* str, /* in: message string */ - ulint len) /* in: message string length */ -{ - ulint fold; - - ut_ad(mutex_own(&kernel_mutex)); - - if (len < SESS_CLI_MSG_DATA) { - - return(FALSE); - } - - ut_ad(SESS_CLI_MSG_CHECKSUM == 0); - - fold = ut_fold_binary(str + 4, len - 4); - - if (mach_read_from_4(str + SESS_CLI_MSG_CHECKSUM) != fold) { - - return(FALSE); - } - - return(TRUE); -} +sess_close( +/*=======*/ + sess_t* sess); /* in, own: session object */ /************************************************************************* Opens a session. */ sess_t* -sess_open( -/*======*/ +sess_open(void) +/*===========*/ /* out, own: session object */ - com_endpoint_t* endpoint, /* in: communication endpoint used - for receiving messages from the client, - or NULL if no client */ - byte* addr_buf, /* in: client address (= user name) */ - ulint addr_len) /* in: client address length */ { sess_t* sess; - ulint fold; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); - +#endif /* UNIV_SYNC_DEBUG */ sess = mem_alloc(sizeof(sess_t)); - sess->id = sess_sys->free_sess_id; - UT_DULINT_INC(sess_sys->free_sess_id); - sess->state = SESS_ACTIVE; - sess->disconnecting = FALSE; - sess->msgs_sent = ut_dulint_zero; - sess->msgs_recv = ut_dulint_zero; - sess->client_waits = TRUE; - sess->err_no = 0; - sess->err_str = NULL; - sess->error_count = ut_dulint_zero; - - sess->big_msg = NULL; sess->trx = trx_create(sess); - sess->next_graph_id = 0; - UT_LIST_INIT(sess->graphs); - fold = sess_id_fold(sess->id); - - HASH_INSERT(sess_t, hash, sess_sys->hash, fold, sess); - - sess->endpoint = endpoint; - sess->addr_buf = mem_alloc(addr_len); - - ut_memcpy(sess->addr_buf, addr_buf, addr_len); - - sess->addr_len = addr_len; - return(sess); } /************************************************************************* Closes a session, freeing the memory occupied by it. */ +static void sess_close( /*=======*/ sess_t* sess) /* in, own: session object */ { - ulint fold; - +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); - ut_ad(sess->disconnecting); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(sess->trx == NULL); - ut_ad(sess->refer_count == 0); - - fold = ut_fold_dulint(sess->id); - HASH_DELETE(sess_t, hash, sess_sys->hash, fold, sess); - -/* sess_reply_to_client_rel_kernel(sess); */ - - if (sess->err_str != NULL) { - mem_free(sess->err_str); - } - mem_free(sess->addr_buf); mem_free(sess); } @@ -346,10 +73,10 @@ sess_try_close( /* out: TRUE if closed */ sess_t* sess) /* in, own: session object */ { +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); - - if (sess->disconnecting && (UT_LIST_GET_LEN(sess->graphs) == 0) - && (sess->refer_count == 0)) { +#endif /* UNIV_SYNC_DEBUG */ + if (UT_LIST_GET_LEN(sess->graphs) == 0) { sess_close(sess); return(TRUE); @@ -357,818 +84,3 @@ sess_try_close( return(FALSE); } - -/************************************************************************* -Initializes the first fields of a message to client. */ - -void -sess_srv_msg_init( -/*==============*/ - sess_t* sess, /* in: session object */ - byte* buf, /* in: message buffer, must be at least of size - SESS_SRV_MSG_DATA */ - ulint type) /* in: message type */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - sess->msgs_sent = ut_dulint_add(sess->msgs_sent, 1); - - mach_write_to_8(buf + SESS_SRV_MSG_SESS_ID, sess->id); - mach_write_to_4(buf + SESS_SRV_MSG_TYPE, type); - mach_write_to_8(buf + SESS_SRV_MSG_NO, sess->msgs_sent); - - ut_ad(com_endpoint_get_max_size(sess->endpoint) >= SESS_SRV_MSG_DATA); -} - -/************************************************************************* -Sends a message to the client. */ -static -ulint -sess_srv_msg_send_low( -/*==================*/ - /* out: 0 if success, else error number */ - sess_t* sess, /* in: session object */ - byte* buf, /* in: message buffer */ - ulint len, /* in: message length */ - ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should - be temporarily released in the call; otherwise - SESS_NOT_RELEASE_KERNEL */ -{ - ulint ret; - - ut_ad((rel_ker == SESS_NOT_RELEASE_KERNEL) - || (rel_ker == SESS_RELEASE_KERNEL)); - ut_ad(mutex_own(&kernel_mutex)); - ut_ad(len <= com_endpoint_get_max_size(sess->endpoint)); - ut_ad(len >= SESS_SRV_MSG_DATA); - - if (sess->client_waits == FALSE) { - sess_error_low(sess, SESS_ERR_EXTRANEOUS_SRV_MSG, NULL); - - return(1); - } - - /* The client will now receive an error message: if the session is - in the error state, we can reset it to the normal state */ - - if (sess->state == SESS_ERROR) { - sess->state = SESS_ACTIVE; - } - - /* We reset the client_waits flag to FALSE, regardless of whether the - message gets delivered to the client or not. This convention makes - things simpler. */ - - sess->client_waits = FALSE; - - if (rel_ker == SESS_RELEASE_KERNEL) { - - mutex_exit(&kernel_mutex); - } - - ret = com_sendto(sess->endpoint, buf, len, sess->addr_buf, - sess->addr_len); - if (rel_ker == SESS_RELEASE_KERNEL) { - - mutex_enter(&kernel_mutex); - } - - if (ret != 0) { - sess_error_low(sess, SESS_ERR_REPLY_FAILED, NULL); - } - - return(ret); -} - -/************************************************************************* -Sends a message to the client. If the session is in the error state, sends -the error message instead of buf. */ -static -ulint -sess_srv_msg_send( -/*==============*/ - /* out: 0 if success, else error number */ - sess_t* sess, /* in: session object */ - byte* buf, /* in: message buffer */ - ulint len, /* in: message length */ - ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should - be temporarily released in the call; otherwise - SESS_NOT_RELEASE_KERNEL */ -{ - ulint ret; - - ut_ad(mutex_own(&kernel_mutex)); - - if (sess->state == SESS_ERROR) { - - sess_srv_msg_send_error(sess); - - return(2); - } - - ret = sess_srv_msg_send_low(sess, buf, len, rel_ker); - - return(ret); -} - -/************************************************************************* -Sends a simple message to client. */ - -void -sess_srv_msg_send_simple( -/*=====================*/ - sess_t* sess, /* in: session object */ - ulint type, /* in: message type */ - ulint rel_kernel) /* in: SESS_RELEASE_KERNEL or - SESS_NOT_RELEASE_KERNEL */ -{ - byte buf[SESS_SRV_MSG_DATA]; - - ut_ad(mutex_own(&kernel_mutex)); - - sess_srv_msg_init(sess, buf, type); - - sess_srv_msg_send(sess, buf, SESS_SRV_MSG_DATA, rel_kernel); -} - -/************************************************************************* -Communicates an error message to the client. If sess->client_waits is not -TRUE, puts the session to error state and does not try to send the error -message. */ -static -void -sess_srv_msg_send_error( -/*====================*/ - sess_t* sess) /* in: session object */ -{ - ulint err_no; - byte* err_str; - ulint err_len; - ulint max_len; - byte buf[SESS_ERR_BUF_SIZE]; - ulint ret; - - ut_ad(sess->client_waits); - ut_ad(mutex_own(&kernel_mutex)); - ut_ad(sess->state == SESS_ERROR); - ut_ad(!UT_LIST_GET_FIRST((sess->trx)->signals)); - - if (!sess->client_waits) { - /* Cannot send the error message now: leave the session to - the error state and send it later */ - - return; - } - - err_no = sess->err_no; - err_str = (byte*)sess->err_str; - err_len = sess->err_len; - - max_len = ut_min(SESS_ERR_BUF_SIZE, - com_endpoint_get_max_size(sess->endpoint)); - - sess_srv_msg_init(sess, buf, SESS_SRV_ERROR); - - if (err_len + SESS_SRV_MSG_DATA > max_len) { - - err_len = max_len - SESS_SRV_MSG_DATA; - } - - ut_memcpy(buf + SESS_SRV_MSG_DATA, err_str, err_len); - - ret = sess_srv_msg_send_low(sess, buf, SESS_SRV_MSG_DATA + err_len, - SESS_NOT_RELEASE_KERNEL); -} - -/************************************************************************* -Copies error info to a session. Sends to the transaction a signal which will -rollback the latest incomplete SQL statement and then send the error message -to the client. NOTE: This function will take care of the freeing of the error -string, thus the caller must supply a copy of the error string. */ -static -void -sess_error_low( -/*===========*/ - sess_t* sess, /* in: session object */ - ulint err_no, /* in: error number */ - char* err_str)/* in, own: error string or NULL; - NOTE: the function will take care of freeing of the - string! */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - UT_DULINT_INC(sess->error_count); - - printf("Error string::: %s\n", err_str); - - if (sess->state == SESS_ERROR) { - /* Ignore the error because the session is already in the - error state */ - - if (err_str) { - mem_free(err_str); - } - - return; - } - - sess->err_no = err_no; - - if (sess->err_str) { - mem_free(sess->err_str); - } - - sess->err_str = err_str; - sess->err_len = ut_strlen(err_str); - sess->state = SESS_ERROR; - - if (sess->big_msg) { - - mem_free(sess->big_msg); - } - - /* Send a signal which will roll back the latest incomplete SQL - statement: the error message will be sent to the client by the error - handling mechanism after the rollback is completed. */ - - trx_sig_send(sess->trx, TRX_SIG_ERROR_OCCURRED, TRX_SIG_SELF, FALSE, - NULL, NULL, NULL); -} - -/************************************************************************* -Raises an SQL error. */ - -void -sess_raise_error_low( -/*=================*/ - trx_t* trx, /* in: transaction */ - ulint err_no, /* in: error number */ - ulint type, /* in: more info of the error, or 0 */ - dict_table_t* table, /* in: dictionary table or NULL */ - dict_index_t* index, /* in: table index or NULL */ - dtuple_t* tuple, /* in: tuple to insert or NULL */ - rec_t* rec, /* in: record or NULL */ - char* err_str)/* in: arbitrary null-terminated error string, - or NULL */ -{ - char* str; - ulint len; - - ut_ad(mutex_own(&kernel_mutex)); - - str = mem_alloc(64000); - - len = 0; - - len += sprintf(str + len, "Error number: %lu", (ulong) err_no); - - if (type) { - len += sprintf(str + len, ", type: %lu", (ulong) type); - } - - if (table) { - len += sprintf(str + len, ", table: %s", table->name); - } - - if (index) { - len += sprintf(str + len, ", index: %s", index->name); - } - - if (tuple) { - len += sprintf(str + len, ", tuple:"); - len += dtuple_sprintf(str + len, 8192, tuple); - } - - if (rec) { - len += sprintf(str + len, ", record:"); - len += rec_sprintf(str + len, 8192, rec); - } - - if (err_str) { - len += sprintf(str + len, ", %s", err_str); - } - - str[len] = '\0'; - - ut_a(len < 64000); - - if (trx->sess) { - sess_error_low(trx->sess, err_no, str); - } else { - mem_free(str); - } -} - -/*************************************************************************** -Processes a client message which is part of a bigger message. */ -static -ibool -sess_receive_msg_part( -/*==================*/ - /* TRUE if message completed */ - sess_t* sess, /* in: session */ - byte* str, /* in: message string */ - ulint len) /* in: message length */ -{ - ulint cont; - - cont = sess_cli_msg_get_continue(str); - - ut_ad(cont != SESS_MSG_SINGLE_PART); - - if (cont == SESS_MSG_FIRST_PART) { - if (sess->big_msg) { - sess_error_low(sess, SESS_ERR_MSG_LOST, NULL); - - return(FALSE); - } - - sess->big_msg_size = 1024 * sess_cli_msg_get_cont_size(str); - sess->big_msg = mem_alloc(sess->big_msg_size); - - if (sess->big_msg == NULL) { - sess_error_low(sess, SESS_ERR_OUT_OF_MEMORY, NULL); - - return(FALSE); - } - - ut_memcpy(sess->big_msg, str, len); - sess->big_msg_len = len; - - return(FALSE); - } else { - if (sess->big_msg == NULL) { - sess_error_low(sess, SESS_ERR_MSG_LOST, NULL); - - return(FALSE); - } - - ut_memcpy(sess->big_msg + sess->big_msg_len, - str + SESS_CLI_MSG_DATA, len - SESS_CLI_MSG_DATA); - - sess->big_msg_len += len - SESS_CLI_MSG_DATA; - - if (cont == SESS_MSG_MIDDLE_PART) { - - return(FALSE); - } - - return(TRUE); - } -} - -/*************************************************************************** -Processes a client message which requires SQL parsing. This function decodes -the client message built in SQLPrepare. NOTE: The kernel mutex is temporarily -released within this function. */ -static -void -sess_receive_prepare( -/*=================*/ - sess_t* sess, /* in: session */ - byte* cli_msg,/* in: client message */ - ulint len) /* in: message length */ -{ - dulint error_count; - que_t* graph; - byte msg[ODBC_DATAGRAM_SIZE]; - - UT_NOT_USED(len); - - ut_ad(mutex_own(&kernel_mutex)); - - error_count = sess->error_count; - - /* Make sure the session object is not freed during the parsing */ - - sess_refer_count_inc(sess); - - /* We release the kernel mutex before parsing the command: this is - to reduce contention on the kernel mutex */ - - mutex_exit(&kernel_mutex); - -/* printf("To parse query %s\n", (char*)(cli_msg + SESS_CLI_MSG_DATA)); */ - - graph = pars_sql((char*)(cli_msg + SESS_CLI_MSG_DATA)); - - mutex_enter(&kernel_mutex); - - if (graph == NULL) { - /* Error in parsing */ - sess_error_low(sess, SESS_ERR_SQL_ERROR, NULL); - - sess_refer_count_dec(sess); - - ut_error; - - return; - } - - if (!UT_DULINT_EQ(error_count, sess->error_count)) { - - /* An error, or an asyncronous signal on the session happened - when the kernel mutex was not reserved: discard graph */ - - graph->state = QUE_FORK_INVALID; - - que_graph_try_free(graph); - - sess_refer_count_dec(sess); - - ut_error; - - return; - } - - UT_LIST_ADD_LAST(graphs, sess->graphs, graph); - - graph->id = sess->next_graph_id; - sess->next_graph_id++; - - /* Tell the client that the preparation succeeded and communicate info - about the possible query parameters: the message will be decoded in - SQLPrepare */ - - ut_ad(sess->client_waits); - - sess_srv_msg_init(sess, msg, SESS_SRV_SUCCESS); - - mach_write_to_4(msg + SESS_SRV_MSG_DATA, graph->id); - - mutex_exit(&kernel_mutex); - - len = pars_write_query_param_info(msg + SESS_SRV_MSG_DATA + 4, graph); - - mutex_enter(&kernel_mutex); - - sess_srv_msg_send(sess, msg, SESS_SRV_MSG_DATA + 4 + len, - SESS_RELEASE_KERNEL); - sess_refer_count_dec(sess); -} - -/*************************************************************************** -Processes a client message which does not require SQL parsing. This function -decodes the client message built in SQLExecute. */ -static -void -sess_receive_command( -/*=================*/ - sess_t* sess, /* in: session */ - byte* cli_msg,/* in: client message */ - ulint len, /* in: message length */ - ulint type) /* in: message type */ -{ - proc_node_t* proc_node; - call_node_t* call_node; - dict_proc_t* dict_proc; - que_thr_t* thr; - que_t* graph; - ulint stat_id; - - UT_NOT_USED(len); - UT_NOT_USED(type); - - ut_ad(mutex_own(&kernel_mutex)); - - sess->client_waits = TRUE; - - stat_id = mach_read_from_4(cli_msg + SESS_CLI_MSG_DATA); - - /* Look for the statement from the list of query graphs */ - - graph = UT_LIST_GET_FIRST(sess->graphs); - - while (graph != NULL) { - - if (graph->id == stat_id) { - - break; - } - - graph = UT_LIST_GET_NEXT(graphs, graph); - } - - if (graph == NULL) { - /* Could not find the right graph: error */ - sess_error_low(sess, SESS_ERR_STMT_NOT_FOUND, NULL); - - return; - } - - if (graph->state != QUE_FORK_COMMAND_WAIT) { - sess_error_low(sess, SESS_ERR_STMT_NOT_READY, NULL); - - return; - } - -/* printf("To execute stat %lu\n", stat_id); */ - - if (graph->fork_type == QUE_FORK_PROCEDURE_CALL) { - /* It is a stored procedure call: retrieve a parsed copy of - the procedure from the dictionary cache */ - - mutex_exit(&kernel_mutex); - - call_node = que_fork_get_child(graph); - - graph = dict_procedure_reserve_parsed_copy( - call_node->procedure_def); - graph->trx = sess->trx; - - /* Retrieve the procedure input parameters from the message */ - - pars_proc_read_input_params_from_buf(graph, - cli_msg + SESS_CLI_MSG_DATA + 4); - mutex_enter(&kernel_mutex); - } else { - /* It is a create procedure command: add the procedure to the - dictionary cache */ - ut_ad(graph->fork_type == QUE_FORK_PROCEDURE); - - mutex_exit(&kernel_mutex); - - proc_node = que_fork_get_child(graph); - - dict_proc = dict_mem_procedure_create(proc_node->proc_id->name, - proc_node->sym_tab->sql_string, - graph); - - dict_procedure_add_to_cache(dict_proc); - - mutex_enter(&kernel_mutex); - - sess_srv_msg_send_simple(sess, SESS_SRV_SUCCESS, - SESS_RELEASE_KERNEL); - return; - } - - /* Choose a query thread for execution */ - thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0); - - ut_ad(thr); - - sess->trx->graph = graph; - - mutex_exit(&kernel_mutex); - - /* Run query threads with the kernel mutex released */ - - que_run_threads(thr); - - mutex_enter(&kernel_mutex); -} - -/*************************************************************************** -When a command has been completed, this function sends the message about it -to the client. */ - -void -sess_command_completed_message( -/*===========================*/ - sess_t* sess, /* in: session */ - byte* msg, /* in: message buffer */ - ulint len) /* in: message data length */ -{ - mutex_enter(&kernel_mutex); - - sess_srv_msg_send(sess, msg, SESS_SRV_MSG_DATA + len, - SESS_RELEASE_KERNEL); - mutex_exit(&kernel_mutex); -} - -/*************************************************************************** -Processes a break message from the client. */ -static -void -sess_receive_break( -/*===============*/ - sess_t* sess) /* in: session */ -{ - ut_ad(mutex_own(&kernel_mutex)); - - /* Rollback the latest incomplete SQL statement */ - - sess_error_low(sess, SESS_ERR_BREAK_BY_CLIENT, NULL); -} - -/*************************************************************************** -Processes a message from a client. NOTE: Releases the kernel mutex temporarily -when parsing an SQL string. */ - -void -sess_receive_msg_rel_kernel( -/*========================*/ - sess_t* sess, /* in: session */ - byte* str, /* in: message string */ - ulint len) /* in: message length */ -{ - dulint msg_no; - ulint msg_type; - ulint cont; - ibool is_big_msg = FALSE; - ibool client_waited; - - ut_ad(mutex_own(&kernel_mutex)); - ut_ad(!sess->disconnecting); - - client_waited = sess->client_waits; - - sess->client_waits = TRUE; - - if (sess->state == SESS_ERROR) { - - /* Send a buffered error message */ - sess_srv_msg_send_error(sess); - - return; - } - - if (FALSE == sess_cli_msg_check_consistency(str, len)) { - /* Message from the client was corrupted */ - - sess_error_low(sess, SESS_ERR_MSG_CORRUPTED, NULL); - - return; - } - - msg_no = sess_cli_msg_get_msg_no(str); - - UT_DULINT_INC(sess->msgs_recv); - - if (!UT_DULINT_EQ(msg_no, sess->msgs_recv)) { - - sess_error_low(sess, SESS_ERR_MSG_LOST, NULL); - - sess->msgs_recv = msg_no; - - return; - } - - msg_type = sess_cli_msg_get_type(str); - - if (msg_type == SESS_CLI_BREAK_EXECUTION) { - - sess_receive_break(sess); - - return; - } - - if (client_waited) { - /* Client sent an extraneous message which is not a break - command: an error */ - - sess_error_low(sess, SESS_ERR_EXTRANEOUS_MSG, NULL); - - return; - } - - /*-----------------------------------------------------------*/ - /* Handle big messages */ - - cont = sess_cli_msg_get_continue(str); - - if (cont == SESS_MSG_SINGLE_PART) { - if (sess->big_msg) { - - sess_error_low(sess, SESS_ERR_MSG_LOST, NULL); - - return; - } - } else { - ut_error; /* Not in use */ - - is_big_msg = sess_receive_msg_part(sess, str, len); - - if (is_big_msg) { - str = sess->big_msg; - len = sess->big_msg_len; - sess->big_msg = NULL; - } else { - return; - } - } - - /*-----------------------------------------------------------*/ - /* The session has received a complete message from the client */ - - ut_ad(!UT_LIST_GET_FIRST((sess->trx)->signals)); - - if (msg_type == SESS_CLI_PREPARE) { - /* Note that the kernel mutex is temporarily released when - the SQL string is parsed */ - - sess_receive_prepare(sess, str, len); - } else { - /* Note that the kernel mutex is temporarily released when the - command is executed */ - - sess_receive_command(sess, str, len, msg_type); - } - - if (is_big_msg) { - mem_free(str); - } -} - -/*********************************************************************** -Opens a new connection and creates a session. */ -static -ibool -sess_open_connection( -/*=================*/ - byte* str, /* in: message string */ - ulint len, /* in: string length */ - byte* addr, /* in: user address string */ - ulint alen) /* in: user address length */ -{ - dulint sess_id; - sess_t* sess; - - sess_id = mach_read_from_8(str + SESS_CLI_MSG_SESS_ID); - - if (!(UT_DULINT_EQ(sess_id, ut_dulint_zero)) - || !(sess_cli_msg_get_type(str) == SESS_CLI_CONNECT)) { - - /* It is not a valid connect message */ - - return(FALSE); - } - - ut_a(len == SESS_CLI_MSG_DATA); - - sess = sess_open(srv_sys->endpoint, addr, alen); - - sess_srv_msg_send_simple(sess, SESS_SRV_ACCEPT_CONNECT, - SESS_NOT_RELEASE_KERNEL); - return(TRUE); -} - -/*********************************************************************** -Starts a new connection and a session, or starts a query based on a client -message. This is called by a SRV_COM thread. */ - -void -sess_process_cli_msg( -/*=================*/ - byte* str, /* in: message string */ - ulint len, /* in: string length */ - byte* addr, /* in: address string */ - ulint alen) /* in: address length */ -{ - sess_t* sess; - ibool success; - - UT_NOT_USED(addr); - UT_NOT_USED(alen); - - mutex_enter(&kernel_mutex); - - sess = sess_cli_msg_get_sess(str, len); - - if (sess == NULL) { - /* There was no matching session */ - - if (sess_cli_msg_check_consistency(str, len)) { - - /* As the message is consistent, it may be a connect - message */ - - /* printf("%s\n", addr); */ - - success = sess_open_connection(str, len, addr, alen); - - if (success) { - mutex_exit(&kernel_mutex); - - return; - } - } - - /* Could not make sense of the message: write an error entry - to the system error log */ - - /* srv_err_log_insert( - "MESSAGE SENT TO AN UNKNOWN SESSION");*/ - ut_error; - - mutex_exit(&kernel_mutex); - - return; - } - - if (sess->disconnecting) { - - /* srv_err_log_insert( - "MESSAGE SENT TO A DISCONNECTING SESSION");*/ - ut_error; - - mutex_exit(&kernel_mutex); - - return; - } - - sess_receive_msg_rel_kernel(sess, str, len); - - mutex_exit(&kernel_mutex); -} |