summaryrefslogtreecommitdiff
path: root/innobase/que/que0que.c
diff options
context:
space:
mode:
Diffstat (limited to 'innobase/que/que0que.c')
-rw-r--r--innobase/que/que0que.c186
1 files changed, 55 insertions, 131 deletions
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c
index 40483988c12..127e7f84576 100644
--- a/innobase/que/que0que.c
+++ b/innobase/que/que0que.c
@@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri
#include "log0log.h"
#include "eval0proc.h"
#include "eval0eval.h"
-#include "odbc0odbc.h"
#define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
#define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
@@ -83,7 +82,9 @@ que_graph_publish(
que_t* graph, /* in: graph */
sess_t* sess) /* in: session */
{
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
}
@@ -190,7 +191,9 @@ que_thr_end_wait(
{
ibool was_active;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT)
@@ -229,7 +232,9 @@ que_thr_end_wait_no_next_thr(
ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
only possible state here */
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT)
@@ -279,15 +284,9 @@ que_fork_start_command(
QUE_THR_RUNNING state, or NULL; the query
thread should be executed by que_run_threads
by the caller */
- que_fork_t* fork, /* in: a query fork */
- ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */
- ulint param) /* in: possible parameter to the command */
+ que_fork_t* fork) /* in: a query fork */
{
que_thr_t* thr;
-
- /* Set the command parameters in the fork root */
- fork->command = command;
- fork->param = param;
fork->state = QUE_FORK_ACTIVE;
@@ -370,7 +369,9 @@ que_fork_error_handle(
{
que_thr_t* thr;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
ut_ad(trx->sess->state == SESS_ERROR);
ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
@@ -484,7 +485,7 @@ que_graph_free_recursive(
"que_thr struct appears corrupt; magic n %lu\n",
(unsigned long) thr->magic_n);
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
thr->magic_n = QUE_THR_MAGIC_FREED;
@@ -596,7 +597,7 @@ que_graph_free_recursive(
"que_node struct appears corrupt; type %lu\n",
(unsigned long) que_node_get_type(node));
mem_analyze_corruption((byte*)node);
- ut_a(0);
+ ut_error;
}
}
@@ -640,7 +641,9 @@ que_graph_try_free(
{
sess_t* sess;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
sess = (graph->trx)->sess;
@@ -665,50 +668,21 @@ does nothing! */
void
que_thr_handle_error(
/*=================*/
- que_thr_t* thr, /* in: query thread */
- ulint err_no, /* in: error number */
- byte* err_str,/* in, own: error string or NULL; NOTE: the
+ que_thr_t* thr __attribute((unused)),
+ /* in: query thread */
+ ulint err_no __attribute((unused)),
+ /* in: error number */
+ byte* err_str __attribute((unused)),
+ /* in, own: error string or NULL; NOTE: the
function will take care of freeing of the
string! */
- ulint err_len)/* in: error string length */
+ ulint err_len __attribute((unused)))
+ /* in: error string length */
{
- UT_NOT_USED(thr);
- UT_NOT_USED(err_no);
- UT_NOT_USED(err_str);
- UT_NOT_USED(err_len);
-
/* Does nothing */
}
/********************************************************************
-Builds a command completed-message to the client. */
-static
-ulint
-que_build_srv_msg(
-/*==============*/
- /* out: message data length */
- byte* buf, /* in: message buffer */
- que_fork_t* fork, /* in: query graph where execution completed */
- sess_t* sess) /* in: session */
-{
- ulint len;
-
- /* Currently, we only support stored procedures: */
- ut_ad(fork->fork_type == QUE_FORK_PROCEDURE);
-
- if (sess->state == SESS_ERROR) {
-
- return(0);
- }
-
- sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS);
-
- len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA,
- fork);
- return(len);
-}
-
-/********************************************************************
Performs an execution step on a thr node. */
static
que_thr_t*
@@ -804,10 +778,6 @@ que_thr_dec_refer_count(
que_fork_t* fork;
trx_t* trx;
sess_t* sess;
- ibool send_srv_msg = FALSE;
- ibool release_stored_proc = FALSE;
- ulint msg_len = 0;
- byte msg_buf[ODBC_DATAGRAM_SIZE];
ulint fork_type;
ibool stopped;
@@ -828,8 +798,8 @@ que_thr_dec_refer_count(
already canceled before we came here: continue
running the thread */
- /* printf(
- "!!!!!!!!!! Wait already ended: continue thr\n"); */
+ /* fputs("!!!!!!!! Wait already ended: continue thr\n",
+ stderr); */
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
@@ -882,40 +852,13 @@ que_thr_dec_refer_count(
} else if (fork_type == QUE_FORK_MYSQL_INTERFACE) {
/* Do nothing */
- } else if (fork->common.parent == NULL
- && fork->caller == NULL
- && UT_LIST_GET_LEN(trx->signals) == 0) {
-
- ut_a(0); /* not used in MySQL */
-
- /* Reply to the client */
-
- /* que_thr_add_update_info(thr); */
-
- fork->state = QUE_FORK_COMMAND_WAIT;
-
- msg_len = que_build_srv_msg(msg_buf, fork, sess);
-
- send_srv_msg = TRUE;
-
- if (fork->fork_type == QUE_FORK_PROCEDURE) {
-
- release_stored_proc = TRUE;
- }
-
- ut_ad(trx->graph == fork);
-
- trx->graph = NULL;
} else {
- /* Subprocedure calls not implemented yet */
- ut_a(0);
+ ut_error; /* not used in MySQL */
}
}
if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
- ut_ad(!send_srv_msg);
-
/* If the trx is signaled and its query thread count drops to
zero, then we start processing a signal; from it we may get
a new query thread to run */
@@ -929,26 +872,6 @@ que_thr_dec_refer_count(
}
mutex_exit(&kernel_mutex);
-
- if (send_srv_msg) {
- /* Note that, as we do not own the kernel mutex at this point,
- and neither do we own it all the time when doing the actual
- communication operation within the next function, it is
- possible that the messages will not get delivered in the right
- sequential order. This is possible if the client communicates
- an extra message to the server while the message below is still
- undelivered. But then the client should notice that there
- is an error in the order numbers of the messages. */
-
- sess_command_completed_message(sess, msg_buf, msg_len);
- }
-
- if (release_stored_proc) {
-
- /* Return the stored procedure graph to the dictionary cache */
-
- dict_procedure_release_parsed_copy(fork);
- }
}
/**************************************************************************
@@ -966,7 +889,9 @@ que_thr_stop(
que_t* graph;
ibool ret = TRUE;
+#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
graph = thr->graph;
trx = graph->trx;
@@ -1063,7 +988,7 @@ que_thr_move_to_run_state_for_mysql(
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
if (!thr->is_active) {
@@ -1100,7 +1025,7 @@ que_thr_stop_for_mysql_no_error(
mem_analyze_corruption((byte*)thr);
- ut_a(0);
+ ut_error;
}
thr->state = QUE_THR_COMPLETED;
@@ -1119,59 +1044,56 @@ que_node_print_info(
/*================*/
que_node_t* node) /* in: query graph node */
{
- ulint type;
- char* str;
- ulint addr;
+ ulint type;
+ const char* str;
type = que_node_get_type(node);
- addr = (ulint)node;
-
if (type == QUE_NODE_SELECT) {
- str = (char *) "SELECT";
+ str = "SELECT";
} else if (type == QUE_NODE_INSERT) {
- str = (char *) "INSERT";
+ str = "INSERT";
} else if (type == QUE_NODE_UPDATE) {
- str = (char *) "UPDATE";
+ str = "UPDATE";
} else if (type == QUE_NODE_WHILE) {
- str = (char *) "WHILE";
+ str = "WHILE";
} else if (type == QUE_NODE_ASSIGNMENT) {
- str = (char *) "ASSIGNMENT";
+ str = "ASSIGNMENT";
} else if (type == QUE_NODE_IF) {
- str = (char *) "IF";
+ str = "IF";
} else if (type == QUE_NODE_FETCH) {
- str = (char *) "FETCH";
+ str = "FETCH";
} else if (type == QUE_NODE_OPEN) {
- str = (char *) "OPEN";
+ str = "OPEN";
} else if (type == QUE_NODE_PROC) {
- str = (char *) "STORED PROCEDURE";
+ str = "STORED PROCEDURE";
} else if (type == QUE_NODE_FUNC) {
- str = (char *) "FUNCTION";
+ str = "FUNCTION";
} else if (type == QUE_NODE_LOCK) {
- str = (char *) "LOCK";
+ str = "LOCK";
} else if (type == QUE_NODE_THR) {
- str = (char *) "QUERY THREAD";
+ str = "QUERY THREAD";
} else if (type == QUE_NODE_COMMIT) {
- str = (char *) "COMMIT";
+ str = "COMMIT";
} else if (type == QUE_NODE_UNDO) {
- str = (char *) "UNDO ROW";
+ str = "UNDO ROW";
} else if (type == QUE_NODE_PURGE) {
- str = (char *) "PURGE ROW";
+ str = "PURGE ROW";
} else if (type == QUE_NODE_ROLLBACK) {
- str = (char *) "ROLLBACK";
+ str = "ROLLBACK";
} else if (type == QUE_NODE_CREATE_TABLE) {
- str = (char *) "CREATE TABLE";
+ str = "CREATE TABLE";
} else if (type == QUE_NODE_CREATE_INDEX) {
- str = (char *) "CREATE INDEX";
+ str = "CREATE INDEX";
} else if (type == QUE_NODE_FOR) {
- str = (char *) "FOR LOOP";
+ str = "FOR LOOP";
} else if (type == QUE_NODE_RETURN) {
- str = (char *) "RETURN";
+ str = "RETURN";
} else {
- str = (char *) "UNKNOWN NODE TYPE";
+ str = "UNKNOWN NODE TYPE";
}
- printf("Node type %lu: %s, address %lx\n", (unsigned long) type, str,
+ fprintf(stderr, "Node type %lu: %s, address %lx\n", (unsigned long) type, str,
(unsigned long) addr);
}
@@ -1202,7 +1124,7 @@ que_thr_step(
#ifdef UNIV_DEBUG
if (que_trace_on) {
- printf("To execute: ");
+ fputs("To execute: ", stderr);
que_node_print_info(node);
}
#endif
@@ -1299,7 +1221,9 @@ que_run_threads(
ulint loop_count;
ut_ad(thr->state == QUE_THR_RUNNING);
+#ifdef UNIV_SYNC_DEBUG
ut_ad(!mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
/* cumul_resource counts how much resources the OS thread (NOT the
query thread) has spent in this function */