diff options
Diffstat (limited to 'innobase/que/que0que.c')
-rw-r--r-- | innobase/que/que0que.c | 186 |
1 files changed, 55 insertions, 131 deletions
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c index 40483988c12..127e7f84576 100644 --- a/innobase/que/que0que.c +++ b/innobase/que/que0que.c @@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri #include "log0log.h" #include "eval0proc.h" #include "eval0eval.h" -#include "odbc0odbc.h" #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256) #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256) @@ -83,7 +82,9 @@ que_graph_publish( que_t* graph, /* in: graph */ sess_t* sess) /* in: session */ { +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ UT_LIST_ADD_LAST(graphs, sess->graphs, graph); } @@ -190,7 +191,9 @@ que_thr_end_wait( { ibool was_active; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(thr); ut_ad((thr->state == QUE_THR_LOCK_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT) @@ -229,7 +232,9 @@ que_thr_end_wait_no_next_thr( ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the only possible state here */ +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(thr); ut_ad((thr->state == QUE_THR_LOCK_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT) @@ -279,15 +284,9 @@ que_fork_start_command( QUE_THR_RUNNING state, or NULL; the query thread should be executed by que_run_threads by the caller */ - que_fork_t* fork, /* in: a query fork */ - ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */ - ulint param) /* in: possible parameter to the command */ + que_fork_t* fork) /* in: a query fork */ { que_thr_t* thr; - - /* Set the command parameters in the fork root */ - fork->command = command; - fork->param = param; fork->state = QUE_FORK_ACTIVE; @@ -370,7 +369,9 @@ que_fork_error_handle( { que_thr_t* thr; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ ut_ad(trx->sess->state == SESS_ERROR); ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0); ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); @@ -484,7 +485,7 @@ que_graph_free_recursive( "que_thr struct appears corrupt; magic n %lu\n", (unsigned long) thr->magic_n); mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } thr->magic_n = QUE_THR_MAGIC_FREED; @@ -596,7 +597,7 @@ que_graph_free_recursive( "que_node struct appears corrupt; type %lu\n", (unsigned long) que_node_get_type(node)); mem_analyze_corruption((byte*)node); - ut_a(0); + ut_error; } } @@ -640,7 +641,9 @@ que_graph_try_free( { sess_t* sess; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ sess = (graph->trx)->sess; @@ -665,50 +668,21 @@ does nothing! */ void que_thr_handle_error( /*=================*/ - que_thr_t* thr, /* in: query thread */ - ulint err_no, /* in: error number */ - byte* err_str,/* in, own: error string or NULL; NOTE: the + que_thr_t* thr __attribute((unused)), + /* in: query thread */ + ulint err_no __attribute((unused)), + /* in: error number */ + byte* err_str __attribute((unused)), + /* in, own: error string or NULL; NOTE: the function will take care of freeing of the string! */ - ulint err_len)/* in: error string length */ + ulint err_len __attribute((unused))) + /* in: error string length */ { - UT_NOT_USED(thr); - UT_NOT_USED(err_no); - UT_NOT_USED(err_str); - UT_NOT_USED(err_len); - /* Does nothing */ } /******************************************************************** -Builds a command completed-message to the client. */ -static -ulint -que_build_srv_msg( -/*==============*/ - /* out: message data length */ - byte* buf, /* in: message buffer */ - que_fork_t* fork, /* in: query graph where execution completed */ - sess_t* sess) /* in: session */ -{ - ulint len; - - /* Currently, we only support stored procedures: */ - ut_ad(fork->fork_type == QUE_FORK_PROCEDURE); - - if (sess->state == SESS_ERROR) { - - return(0); - } - - sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS); - - len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA, - fork); - return(len); -} - -/******************************************************************** Performs an execution step on a thr node. */ static que_thr_t* @@ -804,10 +778,6 @@ que_thr_dec_refer_count( que_fork_t* fork; trx_t* trx; sess_t* sess; - ibool send_srv_msg = FALSE; - ibool release_stored_proc = FALSE; - ulint msg_len = 0; - byte msg_buf[ODBC_DATAGRAM_SIZE]; ulint fork_type; ibool stopped; @@ -828,8 +798,8 @@ que_thr_dec_refer_count( already canceled before we came here: continue running the thread */ - /* printf( - "!!!!!!!!!! Wait already ended: continue thr\n"); */ + /* fputs("!!!!!!!! Wait already ended: continue thr\n", + stderr); */ if (next_thr && *next_thr == NULL) { *next_thr = thr; @@ -882,40 +852,13 @@ que_thr_dec_refer_count( } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) { /* Do nothing */ - } else if (fork->common.parent == NULL - && fork->caller == NULL - && UT_LIST_GET_LEN(trx->signals) == 0) { - - ut_a(0); /* not used in MySQL */ - - /* Reply to the client */ - - /* que_thr_add_update_info(thr); */ - - fork->state = QUE_FORK_COMMAND_WAIT; - - msg_len = que_build_srv_msg(msg_buf, fork, sess); - - send_srv_msg = TRUE; - - if (fork->fork_type == QUE_FORK_PROCEDURE) { - - release_stored_proc = TRUE; - } - - ut_ad(trx->graph == fork); - - trx->graph = NULL; } else { - /* Subprocedure calls not implemented yet */ - ut_a(0); + ut_error; /* not used in MySQL */ } } if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) { - ut_ad(!send_srv_msg); - /* If the trx is signaled and its query thread count drops to zero, then we start processing a signal; from it we may get a new query thread to run */ @@ -929,26 +872,6 @@ que_thr_dec_refer_count( } mutex_exit(&kernel_mutex); - - if (send_srv_msg) { - /* Note that, as we do not own the kernel mutex at this point, - and neither do we own it all the time when doing the actual - communication operation within the next function, it is - possible that the messages will not get delivered in the right - sequential order. This is possible if the client communicates - an extra message to the server while the message below is still - undelivered. But then the client should notice that there - is an error in the order numbers of the messages. */ - - sess_command_completed_message(sess, msg_buf, msg_len); - } - - if (release_stored_proc) { - - /* Return the stored procedure graph to the dictionary cache */ - - dict_procedure_release_parsed_copy(fork); - } } /************************************************************************** @@ -966,7 +889,9 @@ que_thr_stop( que_t* graph; ibool ret = TRUE; +#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ graph = thr->graph; trx = graph->trx; @@ -1063,7 +988,7 @@ que_thr_move_to_run_state_for_mysql( mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } if (!thr->is_active) { @@ -1100,7 +1025,7 @@ que_thr_stop_for_mysql_no_error( mem_analyze_corruption((byte*)thr); - ut_a(0); + ut_error; } thr->state = QUE_THR_COMPLETED; @@ -1119,59 +1044,56 @@ que_node_print_info( /*================*/ que_node_t* node) /* in: query graph node */ { - ulint type; - char* str; - ulint addr; + ulint type; + const char* str; type = que_node_get_type(node); - addr = (ulint)node; - if (type == QUE_NODE_SELECT) { - str = (char *) "SELECT"; + str = "SELECT"; } else if (type == QUE_NODE_INSERT) { - str = (char *) "INSERT"; + str = "INSERT"; } else if (type == QUE_NODE_UPDATE) { - str = (char *) "UPDATE"; + str = "UPDATE"; } else if (type == QUE_NODE_WHILE) { - str = (char *) "WHILE"; + str = "WHILE"; } else if (type == QUE_NODE_ASSIGNMENT) { - str = (char *) "ASSIGNMENT"; + str = "ASSIGNMENT"; } else if (type == QUE_NODE_IF) { - str = (char *) "IF"; + str = "IF"; } else if (type == QUE_NODE_FETCH) { - str = (char *) "FETCH"; + str = "FETCH"; } else if (type == QUE_NODE_OPEN) { - str = (char *) "OPEN"; + str = "OPEN"; } else if (type == QUE_NODE_PROC) { - str = (char *) "STORED PROCEDURE"; + str = "STORED PROCEDURE"; } else if (type == QUE_NODE_FUNC) { - str = (char *) "FUNCTION"; + str = "FUNCTION"; } else if (type == QUE_NODE_LOCK) { - str = (char *) "LOCK"; + str = "LOCK"; } else if (type == QUE_NODE_THR) { - str = (char *) "QUERY THREAD"; + str = "QUERY THREAD"; } else if (type == QUE_NODE_COMMIT) { - str = (char *) "COMMIT"; + str = "COMMIT"; } else if (type == QUE_NODE_UNDO) { - str = (char *) "UNDO ROW"; + str = "UNDO ROW"; } else if (type == QUE_NODE_PURGE) { - str = (char *) "PURGE ROW"; + str = "PURGE ROW"; } else if (type == QUE_NODE_ROLLBACK) { - str = (char *) "ROLLBACK"; + str = "ROLLBACK"; } else if (type == QUE_NODE_CREATE_TABLE) { - str = (char *) "CREATE TABLE"; + str = "CREATE TABLE"; } else if (type == QUE_NODE_CREATE_INDEX) { - str = (char *) "CREATE INDEX"; + str = "CREATE INDEX"; } else if (type == QUE_NODE_FOR) { - str = (char *) "FOR LOOP"; + str = "FOR LOOP"; } else if (type == QUE_NODE_RETURN) { - str = (char *) "RETURN"; + str = "RETURN"; } else { - str = (char *) "UNKNOWN NODE TYPE"; + str = "UNKNOWN NODE TYPE"; } - printf("Node type %lu: %s, address %lx\n", (unsigned long) type, str, + fprintf(stderr, "Node type %lu: %s, address %lx\n", (unsigned long) type, str, (unsigned long) addr); } @@ -1202,7 +1124,7 @@ que_thr_step( #ifdef UNIV_DEBUG if (que_trace_on) { - printf("To execute: "); + fputs("To execute: ", stderr); que_node_print_info(node); } #endif @@ -1299,7 +1221,9 @@ que_run_threads( ulint loop_count; ut_ad(thr->state == QUE_THR_RUNNING); +#ifdef UNIV_SYNC_DEBUG ut_ad(!mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ /* cumul_resource counts how much resources the OS thread (NOT the query thread) has spent in this function */ |