summaryrefslogtreecommitdiff
path: root/innobase/que/que0que.c
diff options
context:
space:
mode:
Diffstat (limited to 'innobase/que/que0que.c')
-rw-r--r--innobase/que/que0que.c340
1 files changed, 59 insertions, 281 deletions
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c
index 7e4babd43ef..b2608b6d175 100644
--- a/innobase/que/que0que.c
+++ b/innobase/que/que0que.c
@@ -7,7 +7,7 @@ Created 5/27/1996 Heikki Tuuri
*******************************************************/
#include "que0que.h"
-
+
#ifdef UNIV_NONINL
#include "que0que.ic"
#endif
@@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri
#include "log0log.h"
#include "eval0proc.h"
#include "eval0eval.h"
-#include "odbc0odbc.h"
#define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
#define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
@@ -73,37 +72,6 @@ void
que_thr_move_to_run_state(
/*======================*/
que_thr_t* thr); /* in: an query thread */
-/**************************************************************************
-Tries to parallelize query if it is not parallel enough yet. */
-static
-que_thr_t*
-que_try_parallelize(
-/*================*/
- /* out: next thread to execute */
- que_thr_t* thr); /* in: query thread */
-
-#ifdef notdefined
-/********************************************************************
-Adds info about the number of inserted rows etc. to the message to the
-client. */
-static
-void
-que_thr_add_update_info(
-/*====================*/
- que_thr_t* thr) /* in: query thread */
-{
- que_fork_t* graph;
-
- graph = thr->graph;
-
- mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_INSERTS,
- graph->n_inserts);
- mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_UPDATES,
- graph->n_updates);
- mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_DELETES,
- graph->n_deletes);
-}
-#endif
/***************************************************************************
Adds a query graph to the session's list of graphs. */
@@ -114,7 +82,9 @@ que_graph_publish(
que_t* graph, /* in: graph */
sess_t* sess) /* in: session */
{
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
}
@@ -221,7 +191,9 @@ que_thr_end_wait(
{
ibool was_active;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT)
@@ -260,7 +232,9 @@ que_thr_end_wait_no_next_thr(
ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
only possible state here */
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT)
@@ -310,15 +284,9 @@ que_fork_start_command(
QUE_THR_RUNNING state, or NULL; the query
thread should be executed by que_run_threads
by the caller */
- que_fork_t* fork, /* in: a query fork */
- ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */
- ulint param) /* in: possible parameter to the command */
+ que_fork_t* fork) /* in: a query fork */
{
que_thr_t* thr;
-
- /* Set the command parameters in the fork root */
- fork->command = command;
- fork->param = param;
fork->state = QUE_FORK_ACTIVE;
@@ -401,7 +369,9 @@ que_fork_error_handle(
{
que_thr_t* thr;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(trx->sess->state == SESS_ERROR);
ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
@@ -515,7 +485,7 @@ que_graph_free_recursive(
"que_thr struct appears corrupt; magic n %lu\n",
(unsigned long) thr->magic_n);
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
thr->magic_n = QUE_THR_MAGIC_FREED;
@@ -627,7 +597,7 @@ que_graph_free_recursive(
"que_node struct appears corrupt; type %lu\n",
(unsigned long) que_node_get_type(node));
mem_analyze_corruption((byte*)node);
- ut_a(0);
+ ut_error;
}
}
@@ -671,7 +641,9 @@ que_graph_try_free(
{
sess_t* sess;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
sess = (graph->trx)->sess;
@@ -696,65 +668,20 @@ does nothing! */
void
que_thr_handle_error(
/*=================*/
- que_thr_t* thr, /* in: query thread */
- ulint err_no, /* in: error number */
- byte* err_str,/* in, own: error string or NULL; NOTE: the
+ que_thr_t* thr __attribute__((unused)),
+ /* in: query thread */
+ ulint err_no __attribute__((unused)),
+ /* in: error number */
+ byte* err_str __attribute__((unused)),
+ /* in, own: error string or NULL; NOTE: the
function will take care of freeing of the
string! */
- ulint err_len)/* in: error string length */
+ ulint err_len __attribute__((unused)))
+ /* in: error string length */
{
- UT_NOT_USED(thr);
- UT_NOT_USED(err_no);
- UT_NOT_USED(err_str);
- UT_NOT_USED(err_len);
-
/* Does nothing */
}
-/**************************************************************************
-Tries to parallelize query if it is not parallel enough yet. */
-static
-que_thr_t*
-que_try_parallelize(
-/*================*/
- /* out: next thread to execute */
- que_thr_t* thr) /* in: query thread */
-{
- ut_ad(thr);
-
- /* Does nothing yet */
-
- return(thr);
-}
-
-/********************************************************************
-Builds a command completed-message to the client. */
-static
-ulint
-que_build_srv_msg(
-/*==============*/
- /* out: message data length */
- byte* buf, /* in: message buffer */
- que_fork_t* fork, /* in: query graph where execution completed */
- sess_t* sess) /* in: session */
-{
- ulint len;
-
- /* Currently, we only support stored procedures: */
- ut_ad(fork->fork_type == QUE_FORK_PROCEDURE);
-
- if (sess->state == SESS_ERROR) {
-
- return(0);
- }
-
- sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS);
-
- len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA,
- fork);
- return(len);
-}
-
/********************************************************************
Performs an execution step on a thr node. */
static
@@ -851,10 +778,6 @@ que_thr_dec_refer_count(
que_fork_t* fork;
trx_t* trx;
sess_t* sess;
- ibool send_srv_msg = FALSE;
- ibool release_stored_proc = FALSE;
- ulint msg_len = 0;
- byte msg_buf[ODBC_DATAGRAM_SIZE];
ulint fork_type;
ibool stopped;
@@ -875,8 +798,8 @@ que_thr_dec_refer_count(
already canceled before we came here: continue
running the thread */
- /* printf(
- "!!!!!!!!!! Wait already ended: continue thr\n"); */
+ /* fputs("!!!!!!!! Wait already ended: continue thr\n",
+ stderr); */
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
@@ -929,40 +852,13 @@ que_thr_dec_refer_count(
} else if (fork_type == QUE_FORK_MYSQL_INTERFACE) {
/* Do nothing */
- } else if (fork->common.parent == NULL
- && fork->caller == NULL
- && UT_LIST_GET_LEN(trx->signals) == 0) {
-
- ut_a(0); /* not used in MySQL */
-
- /* Reply to the client */
-
- /* que_thr_add_update_info(thr); */
-
- fork->state = QUE_FORK_COMMAND_WAIT;
-
- msg_len = que_build_srv_msg(msg_buf, fork, sess);
-
- send_srv_msg = TRUE;
-
- if (fork->fork_type == QUE_FORK_PROCEDURE) {
-
- release_stored_proc = TRUE;
- }
-
- ut_ad(trx->graph == fork);
-
- trx->graph = NULL;
} else {
- /* Subprocedure calls not implemented yet */
- ut_a(0);
+ ut_error; /* not used in MySQL */
}
}
if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
- ut_ad(!send_srv_msg);
-
/* If the trx is signaled and its query thread count drops to
zero, then we start processing a signal; from it we may get
a new query thread to run */
@@ -976,26 +872,6 @@ que_thr_dec_refer_count(
}
mutex_exit(&kernel_mutex);
-
- if (send_srv_msg) {
- /* Note that, as we do not own the kernel mutex at this point,
- and neither do we own it all the time when doing the actual
- communication operation within the next function, it is
- possible that the messages will not get delivered in the right
- sequential order. This is possible if the client communicates
- an extra message to the server while the message below is still
- undelivered. But then the client should notice that there
- is an error in the order numbers of the messages. */
-
- sess_command_completed_message(sess, msg_buf, msg_len);
- }
-
- if (release_stored_proc) {
-
- /* Return the stored procedure graph to the dictionary cache */
-
- dict_procedure_release_parsed_copy(fork);
- }
}
/**************************************************************************
@@ -1013,7 +889,9 @@ que_thr_stop(
que_t* graph;
ibool ret = TRUE;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
graph = thr->graph;
trx = graph->trx;
@@ -1110,7 +988,7 @@ que_thr_move_to_run_state_for_mysql(
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
if (!thr->is_active) {
@@ -1147,7 +1025,7 @@ que_thr_stop_for_mysql_no_error(
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
thr->state = QUE_THR_COMPLETED;
@@ -1158,69 +1036,67 @@ que_thr_stop_for_mysql_no_error(
trx->n_active_thrs--;
}
+#ifdef UNIV_DEBUG
/**************************************************************************
Prints info of an SQL query graph node. */
-
+static
void
que_node_print_info(
/*================*/
que_node_t* node) /* in: query graph node */
{
- ulint type;
- char* str;
- ulint addr;
+ ulint type;
+ const char* str;
type = que_node_get_type(node);
- addr = (ulint)node;
-
if (type == QUE_NODE_SELECT) {
- str = (char *) "SELECT";
+ str = "SELECT";
} else if (type == QUE_NODE_INSERT) {
- str = (char *) "INSERT";
+ str = "INSERT";
} else if (type == QUE_NODE_UPDATE) {
- str = (char *) "UPDATE";
+ str = "UPDATE";
} else if (type == QUE_NODE_WHILE) {
- str = (char *) "WHILE";
+ str = "WHILE";
} else if (type == QUE_NODE_ASSIGNMENT) {
- str = (char *) "ASSIGNMENT";
+ str = "ASSIGNMENT";
} else if (type == QUE_NODE_IF) {
- str = (char *) "IF";
+ str = "IF";
} else if (type == QUE_NODE_FETCH) {
- str = (char *) "FETCH";
+ str = "FETCH";
} else if (type == QUE_NODE_OPEN) {
- str = (char *) "OPEN";
+ str = "OPEN";
} else if (type == QUE_NODE_PROC) {
- str = (char *) "STORED PROCEDURE";
+ str = "STORED PROCEDURE";
} else if (type == QUE_NODE_FUNC) {
- str = (char *) "FUNCTION";
+ str = "FUNCTION";
} else if (type == QUE_NODE_LOCK) {
- str = (char *) "LOCK";
+ str = "LOCK";
} else if (type == QUE_NODE_THR) {
- str = (char *) "QUERY THREAD";
+ str = "QUERY THREAD";
} else if (type == QUE_NODE_COMMIT) {
- str = (char *) "COMMIT";
+ str = "COMMIT";
} else if (type == QUE_NODE_UNDO) {
- str = (char *) "UNDO ROW";
+ str = "UNDO ROW";
} else if (type == QUE_NODE_PURGE) {
- str = (char *) "PURGE ROW";
+ str = "PURGE ROW";
} else if (type == QUE_NODE_ROLLBACK) {
- str = (char *) "ROLLBACK";
+ str = "ROLLBACK";
} else if (type == QUE_NODE_CREATE_TABLE) {
- str = (char *) "CREATE TABLE";
+ str = "CREATE TABLE";
} else if (type == QUE_NODE_CREATE_INDEX) {
- str = (char *) "CREATE INDEX";
+ str = "CREATE INDEX";
} else if (type == QUE_NODE_FOR) {
- str = (char *) "FOR LOOP";
+ str = "FOR LOOP";
} else if (type == QUE_NODE_RETURN) {
- str = (char *) "RETURN";
+ str = "RETURN";
} else {
- str = (char *) "UNKNOWN NODE TYPE";
+ str = "UNKNOWN NODE TYPE";
}
- printf("Node type %lu: %s, address %lx\n", (unsigned long) type, str,
- (unsigned long) addr);
+ fprintf(stderr, "Node type %lu: %s, address %p\n", (ulong) type, str, node);
}
+#endif /* UNIV_DEBUG */
/**************************************************************************
Performs an execution step on a query thread. */
@@ -1249,7 +1125,7 @@ que_thr_step(
#ifdef UNIV_DEBUG
if (que_trace_on) {
- printf("To execute: ");
+ fputs("To execute: ", stderr);
que_node_print_info(node);
}
#endif
@@ -1331,85 +1207,6 @@ que_thr_step(
return(thr);
}
-/***********************************************************************
-Checks if there is a need for a query thread switch or stopping the current
-thread. */
-
-que_thr_t*
-que_thr_check_if_switch(
-/*====================*/
- que_thr_t* thr, /* in: current query thread */
- ulint* cumul_resource) /* in: amount of resources used
- by the current call of que_run_threads
- (resources used by the OS thread!) */
-{
- que_thr_t* next_thr;
- ibool stopped;
-
- if (que_thr_peek_stop(thr)) {
-
- mutex_enter(&kernel_mutex);
-
- stopped = que_thr_stop(thr);
-
- mutex_exit(&kernel_mutex);
-
- if (stopped) {
- /* If a signal is processed, we may get a new query
- thread next_thr to run */
-
- next_thr = NULL;
-
- que_thr_dec_refer_count(thr, &next_thr);
-
- if (next_thr == NULL) {
-
- return(NULL);
- }
-
- thr = next_thr;
- }
- }
-
- if (thr->resource > QUE_PARALLELIZE_LIMIT) {
-
- /* Try parallelization of the query thread */
- thr = que_try_parallelize(thr);
-
- thr->resource = 0;
- }
-
- (*cumul_resource)++;
-
- if (*cumul_resource > QUE_ROUND_ROBIN_LIMIT) {
-
- /* It is time to round-robin query threads in the
- server task queue */
-
- if (srv_get_thread_type() == SRV_COM) {
- /* This OS thread is a SRV_COM thread: we put
- the query thread to the task queue and return
- to allow the OS thread to receive more
- messages from clients */
-
- ut_ad(thr->is_active);
-
- srv_que_task_enqueue(thr);
-
- return(NULL);
- } else {
- /* Change the query thread if there is another
- in the server task queue */
-
- thr = srv_que_round_robin(thr);
- }
-
- *cumul_resource = 0;
- }
-
- return(thr);
-}
-
/**************************************************************************
Runs query threads. Note that the individual query thread which is run
within this function may change if, e.g., the OS thread executing this
@@ -1425,7 +1222,9 @@ que_run_threads(
ulint loop_count;
ut_ad(thr->state == QUE_THR_RUNNING);
+#ifdef UNIV_SYNC_DEBUG
ut_ad(!mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
/* cumul_resource counts how much resources the OS thread (NOT the
query thread) has spent in this function */
@@ -1433,27 +1232,6 @@ que_run_threads(
loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
cumul_resource = 0;
loop:
- if (loop_count >= QUE_MAX_LOOPS_WITHOUT_CHECK) {
-
-/* In MySQL this thread switch is never needed!
-
- loop_count = 0;
-
- next_thr = que_thr_check_if_switch(thr, &cumul_resource);
-
- if (next_thr != thr) {
- if (next_thr == NULL) {
-
- return;
- }
-
- loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
- }
-
- thr = next_thr;
-*/
- }
-
/* Check that there is enough space in the log to accommodate
possible log entries by this query step; if the operation can touch
more than about 4 pages, checks must be made also within the query