diff options
Diffstat (limited to 'innobase/que/que0que.c')
-rw-r--r-- | innobase/que/que0que.c | 340 |
1 files changed, 59 insertions, 281 deletions
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c index 7e4babd43ef..b2608b6d175 100644 --- a/innobase/que/que0que.c +++ b/innobase/que/que0que.c @@ -7,7 +7,7 @@ Created 5/27/1996 Heikki Tuuri *******************************************************/ #include "que0que.h" - + #ifdef UNIV_NONINL #include "que0que.ic" #endif @@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri #include "log0log.h" #include "eval0proc.h" #include "eval0eval.h" -#include "odbc0odbc.h" #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256) #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256) @@ -73,37 +72,6 @@ void que_thr_move_to_run_state( /*======================*/ que_thr_t* thr); /* in: an query thread */ -/************************************************************************** -Tries to parallelize query if it is not parallel enough yet. */ -static -que_thr_t* -que_try_parallelize( -/*================*/ - /* out: next thread to execute */ - que_thr_t* thr); /* in: query thread */ - -#ifdef notdefined -/******************************************************************** -Adds info about the number of inserted rows etc. to the message to the -client. */ -static -void -que_thr_add_update_info( -/*====================*/ - que_thr_t* thr) /* in: query thread */ -{ - que_fork_t* graph; - - graph = thr->graph; - - mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_INSERTS, - graph->n_inserts); - mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_UPDATES, - graph->n_updates); - mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_DELETES, - graph->n_deletes); -} -#endif /*************************************************************************** Adds a query graph to the session's list of graphs. */ @@ -114,7 +82,9 @@ que_graph_publish( que_t* graph, /* in: graph */ sess_t* sess) /* in: session */ { +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ UT_LIST_ADD_LAST(graphs, sess->graphs, graph); } @@ -221,7 +191,9 @@ que_thr_end_wait( { ibool was_active; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(thr); ut_ad((thr->state == QUE_THR_LOCK_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT) @@ -260,7 +232,9 @@ que_thr_end_wait_no_next_thr( ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the only possible state here */ +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(thr); ut_ad((thr->state == QUE_THR_LOCK_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT) @@ -310,15 +284,9 @@ que_fork_start_command( QUE_THR_RUNNING state, or NULL; the query thread should be executed by que_run_threads by the caller */ - que_fork_t* fork, /* in: a query fork */ - ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */ - ulint param) /* in: possible parameter to the command */ + que_fork_t* fork) /* in: a query fork */ { que_thr_t* thr; - - /* Set the command parameters in the fork root */ - fork->command = command; - fork->param = param; fork->state = QUE_FORK_ACTIVE; @@ -401,7 +369,9 @@ que_fork_error_handle( { que_thr_t* thr; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(trx->sess->state == SESS_ERROR); ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0); ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); @@ -515,7 +485,7 @@ que_graph_free_recursive( "que_thr struct appears corrupt; magic n %lu\n", (unsigned long) thr->magic_n); mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } thr->magic_n = QUE_THR_MAGIC_FREED; @@ -627,7 +597,7 @@ que_graph_free_recursive( "que_node struct appears corrupt; type %lu\n", (unsigned long) que_node_get_type(node)); mem_analyze_corruption((byte*)node); - ut_a(0); + ut_error; } } @@ -671,7 +641,9 @@ que_graph_try_free( { sess_t* sess; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ sess = (graph->trx)->sess; @@ -696,65 +668,20 @@ does nothing! */ void que_thr_handle_error( /*=================*/ - que_thr_t* thr, /* in: query thread */ - ulint err_no, /* in: error number */ - byte* err_str,/* in, own: error string or NULL; NOTE: the + que_thr_t* thr __attribute__((unused)), + /* in: query thread */ + ulint err_no __attribute__((unused)), + /* in: error number */ + byte* err_str __attribute__((unused)), + /* in, own: error string or NULL; NOTE: the function will take care of freeing of the string! */ - ulint err_len)/* in: error string length */ + ulint err_len __attribute__((unused))) + /* in: error string length */ { - UT_NOT_USED(thr); - UT_NOT_USED(err_no); - UT_NOT_USED(err_str); - UT_NOT_USED(err_len); - /* Does nothing */ } -/************************************************************************** -Tries to parallelize query if it is not parallel enough yet. */ -static -que_thr_t* -que_try_parallelize( -/*================*/ - /* out: next thread to execute */ - que_thr_t* thr) /* in: query thread */ -{ - ut_ad(thr); - - /* Does nothing yet */ - - return(thr); -} - -/******************************************************************** -Builds a command completed-message to the client. */ -static -ulint -que_build_srv_msg( -/*==============*/ - /* out: message data length */ - byte* buf, /* in: message buffer */ - que_fork_t* fork, /* in: query graph where execution completed */ - sess_t* sess) /* in: session */ -{ - ulint len; - - /* Currently, we only support stored procedures: */ - ut_ad(fork->fork_type == QUE_FORK_PROCEDURE); - - if (sess->state == SESS_ERROR) { - - return(0); - } - - sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS); - - len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA, - fork); - return(len); -} - /******************************************************************** Performs an execution step on a thr node. */ static @@ -851,10 +778,6 @@ que_thr_dec_refer_count( que_fork_t* fork; trx_t* trx; sess_t* sess; - ibool send_srv_msg = FALSE; - ibool release_stored_proc = FALSE; - ulint msg_len = 0; - byte msg_buf[ODBC_DATAGRAM_SIZE]; ulint fork_type; ibool stopped; @@ -875,8 +798,8 @@ que_thr_dec_refer_count( already canceled before we came here: continue running the thread */ - /* printf( - "!!!!!!!!!! Wait already ended: continue thr\n"); */ + /* fputs("!!!!!!!! Wait already ended: continue thr\n", + stderr); */ if (next_thr && *next_thr == NULL) { *next_thr = thr; @@ -929,40 +852,13 @@ que_thr_dec_refer_count( } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) { /* Do nothing */ - } else if (fork->common.parent == NULL - && fork->caller == NULL - && UT_LIST_GET_LEN(trx->signals) == 0) { - - ut_a(0); /* not used in MySQL */ - - /* Reply to the client */ - - /* que_thr_add_update_info(thr); */ - - fork->state = QUE_FORK_COMMAND_WAIT; - - msg_len = que_build_srv_msg(msg_buf, fork, sess); - - send_srv_msg = TRUE; - - if (fork->fork_type == QUE_FORK_PROCEDURE) { - - release_stored_proc = TRUE; - } - - ut_ad(trx->graph == fork); - - trx->graph = NULL; } else { - /* Subprocedure calls not implemented yet */ - ut_a(0); + ut_error; /* not used in MySQL */ } } if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) { - ut_ad(!send_srv_msg); - /* If the trx is signaled and its query thread count drops to zero, then we start processing a signal; from it we may get a new query thread to run */ @@ -976,26 +872,6 @@ que_thr_dec_refer_count( } mutex_exit(&kernel_mutex); - - if (send_srv_msg) { - /* Note that, as we do not own the kernel mutex at this point, - and neither do we own it all the time when doing the actual - communication operation within the next function, it is - possible that the messages will not get delivered in the right - sequential order. This is possible if the client communicates - an extra message to the server while the message below is still - undelivered. But then the client should notice that there - is an error in the order numbers of the messages. */ - - sess_command_completed_message(sess, msg_buf, msg_len); - } - - if (release_stored_proc) { - - /* Return the stored procedure graph to the dictionary cache */ - - dict_procedure_release_parsed_copy(fork); - } } /************************************************************************** @@ -1013,7 +889,9 @@ que_thr_stop( que_t* graph; ibool ret = TRUE; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ graph = thr->graph; trx = graph->trx; @@ -1110,7 +988,7 @@ que_thr_move_to_run_state_for_mysql( mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } if (!thr->is_active) { @@ -1147,7 +1025,7 @@ que_thr_stop_for_mysql_no_error( mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } thr->state = QUE_THR_COMPLETED; @@ -1158,69 +1036,67 @@ que_thr_stop_for_mysql_no_error( trx->n_active_thrs--; } +#ifdef UNIV_DEBUG /************************************************************************** Prints info of an SQL query graph node. */ - +static void que_node_print_info( /*================*/ que_node_t* node) /* in: query graph node */ { - ulint type; - char* str; - ulint addr; + ulint type; + const char* str; type = que_node_get_type(node); - addr = (ulint)node; - if (type == QUE_NODE_SELECT) { - str = (char *) "SELECT"; + str = "SELECT"; } else if (type == QUE_NODE_INSERT) { - str = (char *) "INSERT"; + str = "INSERT"; } else if (type == QUE_NODE_UPDATE) { - str = (char *) "UPDATE"; + str = "UPDATE"; } else if (type == QUE_NODE_WHILE) { - str = (char *) "WHILE"; + str = "WHILE"; } else if (type == QUE_NODE_ASSIGNMENT) { - str = (char *) "ASSIGNMENT"; + str = "ASSIGNMENT"; } else if (type == QUE_NODE_IF) { - str = (char *) "IF"; + str = "IF"; } else if (type == QUE_NODE_FETCH) { - str = (char *) "FETCH"; + str = "FETCH"; } else if (type == QUE_NODE_OPEN) { - str = (char *) "OPEN"; + str = "OPEN"; } else if (type == QUE_NODE_PROC) { - str = (char *) "STORED PROCEDURE"; + str = "STORED PROCEDURE"; } else if (type == QUE_NODE_FUNC) { - str = (char *) "FUNCTION"; + str = "FUNCTION"; } else if (type == QUE_NODE_LOCK) { - str = (char *) "LOCK"; + str = "LOCK"; } else if (type == QUE_NODE_THR) { - str = (char *) "QUERY THREAD"; + str = "QUERY THREAD"; } else if (type == QUE_NODE_COMMIT) { - str = (char *) "COMMIT"; + str = "COMMIT"; } else if (type == QUE_NODE_UNDO) { - str = (char *) "UNDO ROW"; + str = "UNDO ROW"; } else if (type == QUE_NODE_PURGE) { - str = (char *) "PURGE ROW"; + str = "PURGE ROW"; } else if (type == QUE_NODE_ROLLBACK) { - str = (char *) "ROLLBACK"; + str = "ROLLBACK"; } else if (type == QUE_NODE_CREATE_TABLE) { - str = (char *) "CREATE TABLE"; + str = "CREATE TABLE"; } else if (type == QUE_NODE_CREATE_INDEX) { - str = (char *) "CREATE INDEX"; + str = "CREATE INDEX"; } else if (type == QUE_NODE_FOR) { - str = (char *) "FOR LOOP"; + str = "FOR LOOP"; } else if (type == QUE_NODE_RETURN) { - str = (char *) "RETURN"; + str = "RETURN"; } else { - str = (char *) "UNKNOWN NODE TYPE"; + str = "UNKNOWN NODE TYPE"; } - printf("Node type %lu: %s, address %lx\n", (unsigned long) type, str, - (unsigned long) addr); + fprintf(stderr, "Node type %lu: %s, address %p\n", (ulong) type, str, node); } +#endif /* UNIV_DEBUG */ /************************************************************************** Performs an execution step on a query thread. */ @@ -1249,7 +1125,7 @@ que_thr_step( #ifdef UNIV_DEBUG if (que_trace_on) { - printf("To execute: "); + fputs("To execute: ", stderr); que_node_print_info(node); } #endif @@ -1331,85 +1207,6 @@ que_thr_step( return(thr); } -/*********************************************************************** -Checks if there is a need for a query thread switch or stopping the current -thread. */ - -que_thr_t* -que_thr_check_if_switch( -/*====================*/ - que_thr_t* thr, /* in: current query thread */ - ulint* cumul_resource) /* in: amount of resources used - by the current call of que_run_threads - (resources used by the OS thread!) */ -{ - que_thr_t* next_thr; - ibool stopped; - - if (que_thr_peek_stop(thr)) { - - mutex_enter(&kernel_mutex); - - stopped = que_thr_stop(thr); - - mutex_exit(&kernel_mutex); - - if (stopped) { - /* If a signal is processed, we may get a new query - thread next_thr to run */ - - next_thr = NULL; - - que_thr_dec_refer_count(thr, &next_thr); - - if (next_thr == NULL) { - - return(NULL); - } - - thr = next_thr; - } - } - - if (thr->resource > QUE_PARALLELIZE_LIMIT) { - - /* Try parallelization of the query thread */ - thr = que_try_parallelize(thr); - - thr->resource = 0; - } - - (*cumul_resource)++; - - if (*cumul_resource > QUE_ROUND_ROBIN_LIMIT) { - - /* It is time to round-robin query threads in the - server task queue */ - - if (srv_get_thread_type() == SRV_COM) { - /* This OS thread is a SRV_COM thread: we put - the query thread to the task queue and return - to allow the OS thread to receive more - messages from clients */ - - ut_ad(thr->is_active); - - srv_que_task_enqueue(thr); - - return(NULL); - } else { - /* Change the query thread if there is another - in the server task queue */ - - thr = srv_que_round_robin(thr); - } - - *cumul_resource = 0; - } - - return(thr); -} - /************************************************************************** Runs query threads. Note that the individual query thread which is run within this function may change if, e.g., the OS thread executing this @@ -1425,7 +1222,9 @@ que_run_threads( ulint loop_count; ut_ad(thr->state == QUE_THR_RUNNING); +#ifdef UNIV_SYNC_DEBUG ut_ad(!mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ /* cumul_resource counts how much resources the OS thread (NOT the query thread) has spent in this function */ @@ -1433,27 +1232,6 @@ que_run_threads( loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK; cumul_resource = 0; loop: - if (loop_count >= QUE_MAX_LOOPS_WITHOUT_CHECK) { - -/* In MySQL this thread switch is never needed! - - loop_count = 0; - - next_thr = que_thr_check_if_switch(thr, &cumul_resource); - - if (next_thr != thr) { - if (next_thr == NULL) { - - return; - } - - loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK; - } - - thr = next_thr; -*/ - } - /* Check that there is enough space in the log to accommodate possible log entries by this query step; if the operation can touch more than about 4 pages, checks must be made also within the query |