summaryrefslogtreecommitdiff
path: root/innobase
diff options
context:
space:
mode:
authorunknown <heikki@hundin.mysql.fi>2004-06-17 11:57:45 +0300
committerunknown <heikki@hundin.mysql.fi>2004-06-17 11:57:45 +0300
commita00824b53bf80562418ca4648b8ef31f6d6d57df (patch)
treeba9a44c75175f84c171bb8f340978de12ae2db1b /innobase
parent6a99971f1b3d2f93f93427ad877251328668dfec (diff)
downloadmariadb-git-a00824b53bf80562418ca4648b8ef31f6d6d57df.tar.gz
Cset exclude: marko@hundin.mysql.fi|ChangeSet|20040525171209|56870
BitKeeper/deleted/.del-srv0que.c~d1feebb77b5a9b96: Exclude BitKeeper/deleted/.del-srv0que.h~f12ecb4b5afe203e: Exclude innobase/include/que0que.h: Exclude innobase/include/trx0roll.h: Exclude innobase/include/trx0trx.h: Exclude innobase/include/usr0sess.h: Exclude innobase/que/que0que.c: Exclude innobase/srv/Makefile.am: Exclude innobase/include/Makefile.am: Exclude innobase/srv/srv0srv.c: Exclude innobase/trx/trx0purge.c: Exclude innobase/trx/trx0roll.c: Exclude innobase/trx/trx0trx.c: Exclude innobase/usr/usr0sess.c: Exclude
Diffstat (limited to 'innobase')
-rw-r--r--innobase/include/Makefile.am2
-rw-r--r--innobase/include/que0que.h22
-rw-r--r--innobase/include/trx0roll.h21
-rw-r--r--innobase/include/trx0trx.h31
-rw-r--r--innobase/include/usr0sess.h6
-rw-r--r--innobase/que/que0que.c107
-rw-r--r--innobase/srv/Makefile.am2
-rw-r--r--innobase/srv/srv0srv.c1
-rw-r--r--innobase/trx/trx0purge.c3
-rw-r--r--innobase/trx/trx0roll.c77
-rw-r--r--innobase/trx/trx0trx.c165
-rw-r--r--innobase/usr/usr0sess.c2
12 files changed, 338 insertions, 101 deletions
diff --git a/innobase/include/Makefile.am b/innobase/include/Makefile.am
index 5ec70da97a2..102d25566da 100644
--- a/innobase/include/Makefile.am
+++ b/innobase/include/Makefile.am
@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \
row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic \
row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic \
row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h \
- row0vers.ic srv0srv.h srv0srv.ic srv0start.h \
+ row0vers.ic srv0que.h srv0srv.h srv0srv.ic srv0start.h \
sync0arr.h sync0arr.ic sync0rw.h \
sync0rw.ic sync0sync.h sync0sync.ic sync0types.h \
thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h \
diff --git a/innobase/include/que0que.h b/innobase/include/que0que.h
index a438116781f..bcd7aed7e88 100644
--- a/innobase/include/que0que.h
+++ b/innobase/include/que0que.h
@@ -152,6 +152,17 @@ que_run_threads(
/*============*/
que_thr_t* thr); /* in: query thread which is run initially */
/**************************************************************************
+After signal handling is finished, returns control to a query graph error
+handling routine. (Currently, just returns the control to the root of the
+graph so that the graph can communicate an error message to the client.) */
+
+void
+que_fork_error_handle(
+/*==================*/
+ trx_t* trx, /* in: trx */
+ que_t* fork); /* in: query graph which was run before signal
+ handling started, NULL not allowed */
+/**************************************************************************
Handles an SQL error noticed during query thread execution. At the moment,
does nothing! */
@@ -170,15 +181,18 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure
completion. */
-que_thr_t*
+void
que_thr_end_wait(
/*=============*/
- /* out: next query thread to run;
- NULL if none */
- que_thr_t* thr); /* in: query thread in the
+ que_thr_t* thr, /* in: query thread in the
QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */
+ que_thr_t** next_thr); /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
/**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */
diff --git a/innobase/include/trx0roll.h b/innobase/include/trx0roll.h
index e9c74dc6651..0d7126c9c57 100644
--- a/innobase/include/trx0roll.h
+++ b/innobase/include/trx0roll.h
@@ -91,12 +91,16 @@ trx_undo_rec_release(
/*************************************************************************
Starts a rollback operation. */
-que_thr_t*
+void
trx_rollback(
/*=========*/
- /* out: next query thread to run */
trx_t* trx, /* in: transaction */
- trx_sig_t* sig); /* in: signal starting the rollback */
+ trx_sig_t* sig, /* in: signal starting the rollback */
+ que_thr_t** next_thr);/* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
/***********************************************************************
Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert
@@ -108,12 +112,17 @@ trx_rollback_or_clean_all_without_sess(void);
/********************************************************************
Finishes a transaction rollback. */
-que_thr_t*
+void
trx_finish_rollback_off_kernel(
/*===========================*/
- /* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */
- trx_t* trx); /* in: transaction */
+ trx_t* trx, /* in: transaction */
+ que_thr_t** next_thr);/* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if this parameter is
+ NULL, it is ignored */
/********************************************************************
Builds an undo 'query' graph for a transaction. The actual rollback is
performed by executing this query graph like a query subprocedure call.
diff --git a/innobase/include/trx0trx.h b/innobase/include/trx0trx.h
index 068333778f3..07d5e5a8215 100644
--- a/innobase/include/trx0trx.h
+++ b/innobase/include/trx0trx.h
@@ -194,10 +194,9 @@ trx_end_lock_wait(
/********************************************************************
Sends a signal to a trx object. */
-que_thr_t*
+ibool
trx_sig_send(
/*=========*/
- /* out: next query thread to run */
/* out: TRUE if the signal was
successfully delivered */
trx_t* trx, /* in: trx handle */
@@ -207,17 +206,27 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
- trx_savept_t* savept); /* in: possible rollback savepoint, or
+ trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */
+ que_thr_t** next_thr); /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the parameter
+ is NULL, it is ignored */
/********************************************************************
Send the reply message when a signal in the queue of the trx has
been handled. */
-que_thr_t*
+void
trx_sig_reply(
/*==========*/
- /* out: next query thread to run */
- trx_sig_t* sig); /* in: signal */
+ trx_sig_t* sig, /* in: signal */
+ que_thr_t** next_thr); /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
/********************************************************************
Removes the signal object from a trx signal queue. */
@@ -229,11 +238,15 @@ trx_sig_remove(
/********************************************************************
Starts handling of a trx signal. */
-que_thr_t*
+void
trx_sig_start_handle(
/*=================*/
- /* out: next query thread to run, or NULL */
- trx_t* trx); /* in: trx handle */
+ trx_t* trx, /* in: trx handle */
+ que_thr_t** next_thr); /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
/********************************************************************
Ends signal handling. If the session is in the error state, and
trx->graph_before_signal_handling != NULL, returns control to the error
diff --git a/innobase/include/usr0sess.h b/innobase/include/usr0sess.h
index aeff7191e68..c7bcfb20fed 100644
--- a/innobase/include/usr0sess.h
+++ b/innobase/include/usr0sess.h
@@ -38,6 +38,7 @@ sess_try_close(
/* The session handle. All fields are protected by the kernel mutex */
struct sess_struct{
+ ulint state; /* state of the session */
trx_t* trx; /* transaction object permanently
assigned for the session: the
transaction instance designated by the
@@ -48,6 +49,11 @@ struct sess_struct{
session */
};
+/* Session states */
+#define SESS_ACTIVE 1
+#define SESS_ERROR 2 /* session contains an error message
+ which has not yet been communicated
+ to the client */
#ifndef UNIV_NONINL
#include "usr0sess.ic"
#endif
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c
index 0a2e607807a..b90a5eb3a61 100644
--- a/innobase/que/que0que.c
+++ b/innobase/que/que0que.c
@@ -12,6 +12,7 @@ Created 5/27/1996 Heikki Tuuri
#include "que0que.ic"
#endif
+#include "srv0que.h"
#include "usr0sess.h"
#include "trx0trx.h"
#include "trx0roll.h"
@@ -174,15 +175,19 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure
completion. */
-que_thr_t*
+void
que_thr_end_wait(
/*=============*/
- /* out: next query thread to run;
- NULL if none */
- que_thr_t* thr) /* in: query thread in the
+ que_thr_t* thr, /* in: query thread in the
QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if NULL is passed
+ as the parameter, it is ignored */
{
ibool was_active;
@@ -201,8 +206,17 @@ que_thr_end_wait(
que_thr_move_to_run_state(thr);
- return(was_active ? NULL : thr);
-}
+ if (was_active) {
+
+ return;
+ }
+
+ if (next_thr && *next_thr == NULL) {
+ *next_thr = thr;
+ } else {
+ srv_que_task_enqueue_low(thr);
+ }
+}
/**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */
@@ -239,6 +253,8 @@ que_thr_end_wait_no_next_thr(
for the lock to be released: */
srv_release_mysql_thread_if_suspended(thr);
+
+ /* srv_que_task_enqueue_low(thr); */
}
/**************************************************************************
@@ -339,6 +355,48 @@ que_fork_start_command(
return(NULL);
}
+/**************************************************************************
+After signal handling is finished, returns control to a query graph error
+handling routine. (Currently, just returns the control to the root of the
+graph so that the graph can communicate an error message to the client.) */
+
+void
+que_fork_error_handle(
+/*==================*/
+ trx_t* trx __attribute__((unused)), /* in: trx */
+ que_t* fork) /* in: query graph which was run before signal
+ handling started, NULL not allowed */
+{
+ que_thr_t* thr;
+
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(trx->sess->state == SESS_ERROR);
+ ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
+ ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
+
+ thr = UT_LIST_GET_FIRST(fork->thrs);
+
+ while (thr != NULL) {
+ ut_ad(!thr->is_active);
+ ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
+ ut_ad(thr->state != QUE_THR_LOCK_WAIT);
+
+ thr->run_node = thr;
+ thr->prev_node = thr->child;
+ thr->state = QUE_THR_COMPLETED;
+
+ thr = UT_LIST_GET_NEXT(thrs, thr);
+ }
+
+ thr = UT_LIST_GET_FIRST(fork->thrs);
+
+ que_thr_move_to_run_state(thr);
+
+ srv_que_task_enqueue_low(thr);
+}
+
/********************************************************************
Tests if all the query threads in the same fork have a given state. */
UNIV_INLINE
@@ -707,18 +765,22 @@ this function may only be called from inside que_run_threads or
que_thr_check_if_switch! These restrictions exist to make the rollback code
easier to maintain. */
static
-que_thr_t*
+void
que_thr_dec_refer_count(
/*====================*/
- /* out: next query thread to run */
- que_thr_t* thr) /* in: query thread */
+ que_thr_t* thr, /* in: query thread */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
{
que_fork_t* fork;
trx_t* trx;
sess_t* sess;
ulint fork_type;
- que_thr_t* next_thr = NULL;
-
+ ibool stopped;
+
fork = thr->common.parent;
trx = thr->graph->trx;
sess = trx->sess;
@@ -729,7 +791,9 @@ que_thr_dec_refer_count(
if (thr->state == QUE_THR_RUNNING) {
- if (!que_thr_stop(thr)) {
+ stopped = que_thr_stop(thr);
+
+ if (!stopped) {
/* The reason for the thr suspension or wait was
already canceled before we came here: continue
running the thread */
@@ -737,9 +801,15 @@ que_thr_dec_refer_count(
/* fputs("!!!!!!!! Wait already ended: continue thr\n",
stderr); */
+ if (next_thr && *next_thr == NULL) {
+ *next_thr = thr;
+ } else {
+ srv_que_task_enqueue_low(thr);
+ }
+
mutex_exit(&kernel_mutex);
- return(thr);
+ return;
}
}
@@ -755,7 +825,7 @@ que_thr_dec_refer_count(
mutex_exit(&kernel_mutex);
- return(next_thr);
+ return;
}
fork_type = fork->fork_type;
@@ -771,7 +841,7 @@ que_thr_dec_refer_count(
ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
ut_ad(trx->handling_signals == TRUE);
- next_thr = trx_finish_rollback_off_kernel(fork, trx);
+ trx_finish_rollback_off_kernel(fork, trx, next_thr);
} else if (fork_type == QUE_FORK_PURGE) {
@@ -793,7 +863,7 @@ que_thr_dec_refer_count(
zero, then we start processing a signal; from it we may get
a new query thread to run */
- next_thr = trx_sig_start_handle(trx);
+ trx_sig_start_handle(trx, next_thr);
}
if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
@@ -802,8 +872,6 @@ que_thr_dec_refer_count(
}
mutex_exit(&kernel_mutex);
-
- return(next_thr);
}
/**************************************************************************
@@ -1175,7 +1243,6 @@ loop:
/*-------------------------*/
next_thr = que_thr_step(thr);
/*-------------------------*/
- ut_a(next_thr == thr || next_thr == NULL);
/* Test the effect on performance of adding extra mutex
reservations */
@@ -1192,7 +1259,7 @@ loop:
loop_count++;
if (next_thr != thr) {
- next_thr = que_thr_dec_refer_count(thr);
+ que_thr_dec_refer_count(thr, &next_thr);
if (next_thr == NULL) {
diff --git a/innobase/srv/Makefile.am b/innobase/srv/Makefile.am
index 37fd73a4bf0..752683b82b8 100644
--- a/innobase/srv/Makefile.am
+++ b/innobase/srv/Makefile.am
@@ -19,6 +19,6 @@ include ../include/Makefile.i
noinst_LIBRARIES = libsrv.a
-libsrv_a_SOURCES = srv0srv.c srv0start.c
+libsrv_a_SOURCES = srv0srv.c srv0que.c srv0start.c
EXTRA_PROGRAMS =
diff --git a/innobase/srv/srv0srv.c b/innobase/srv/srv0srv.c
index afc6b996b89..a78bd0d864c 100644
--- a/innobase/srv/srv0srv.c
+++ b/innobase/srv/srv0srv.c
@@ -34,6 +34,7 @@ Created 10/8/1995 Heikki Tuuri
#include "sync0sync.h"
#include "thr0loc.h"
#include "que0que.h"
+#include "srv0que.h"
#include "log0recv.h"
#include "pars0pars.h"
#include "usr0sess.h"
diff --git a/innobase/trx/trx0purge.c b/innobase/trx/trx0purge.c
index 9eae5c37335..a8b6b9fcc21 100644
--- a/innobase/trx/trx0purge.c
+++ b/innobase/trx/trx0purge.c
@@ -23,6 +23,7 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h"
#include "row0upd.h"
#include "trx0rec.h"
+#include "srv0que.h"
#include "os0thread.h"
/* The global data structure coordinating a purge */
@@ -1059,6 +1060,8 @@ trx_purge(void)
mutex_exit(&kernel_mutex);
+/* srv_que_task_enqueue(thr2); */
+
if (srv_print_thread_releases) {
fputs("Starting purge\n", stderr);
diff --git a/innobase/trx/trx0roll.c b/innobase/trx/trx0roll.c
index eccc9cab7f1..eed5e79a20f 100644
--- a/innobase/trx/trx0roll.c
+++ b/innobase/trx/trx0roll.c
@@ -20,6 +20,7 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0rec.h"
#include "que0que.h"
#include "usr0sess.h"
+#include "srv0que.h"
#include "srv0start.h"
#include "row0undo.h"
#include "row0mysql.h"
@@ -931,15 +932,21 @@ trx_undo_rec_release(
/*************************************************************************
Starts a rollback operation. */
-que_thr_t*
+void
trx_rollback(
/*=========*/
- /* out: next query thread to run */
trx_t* trx, /* in: transaction */
- trx_sig_t* sig) /* in: signal starting the rollback */
+ trx_sig_t* sig, /* in: signal starting the rollback */
+ que_thr_t** next_thr)/* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the passed value is
+ NULL, the parameter is ignored */
{
que_t* roll_graph;
que_thr_t* thr;
+/* que_thr_t* thr2; */
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
@@ -981,7 +988,18 @@ trx_rollback(
thr = que_fork_start_command(roll_graph);
ut_ad(thr);
- return(thr);
+
+/* thr2 = que_fork_start_command(roll_graph);
+
+ ut_ad(thr2); */
+
+ if (next_thr && (*next_thr == NULL)) {
+ *next_thr = thr;
+/* srv_que_task_enqueue_low(thr2); */
+ } else {
+ srv_que_task_enqueue_low(thr);
+/* srv_que_task_enqueue_low(thr2); */
+ }
}
/********************************************************************
@@ -1053,14 +1071,17 @@ trx_finish_error_processing(
/*************************************************************************
Finishes a partial rollback operation. */
static
-que_thr_t*
+void
trx_finish_partial_rollback_off_kernel(
/*===================================*/
- /* out: next query thread to run */
- trx_t* trx) /* in: transaction */
+ trx_t* trx, /* in: transaction */
+ que_thr_t** next_thr)/* in/out: next query thread to run;
+ if the value which is passed in is a pointer
+ to a NULL pointer, then the calling function
+ can start running a new query thread; if this
+ parameter is NULL, it is ignored */
{
trx_sig_t* sig;
- que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
@@ -1071,26 +1092,29 @@ trx_finish_partial_rollback_off_kernel(
/* Remove the signal from the signal queue and send reply message
to it */
- next_thr = trx_sig_reply(sig);
+ trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig);
trx->que_state = TRX_QUE_RUNNING;
- return(next_thr);
}
/********************************************************************
Finishes a transaction rollback. */
-que_thr_t*
+void
trx_finish_rollback_off_kernel(
/*===========================*/
- /* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */
- trx_t* trx) /* in: transaction */
+ trx_t* trx, /* in: transaction */
+ que_thr_t** next_thr)/* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if this parameter is
+ NULL, it is ignored */
{
trx_sig_t* sig;
trx_sig_t* next_sig;
- que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
@@ -1105,13 +1129,15 @@ trx_finish_rollback_off_kernel(
if (sig->type == TRX_SIG_ROLLBACK_TO_SAVEPT) {
- return(trx_finish_partial_rollback_off_kernel(trx));
+ trx_finish_partial_rollback_off_kernel(trx, next_thr);
+
+ return;
} else if (sig->type == TRX_SIG_ERROR_OCCURRED) {
trx_finish_error_processing(trx);
- return(NULL);
+ return;
}
if (lock_print_waits) {
@@ -1125,23 +1151,19 @@ trx_finish_rollback_off_kernel(
send reply messages to them */
trx->que_state = TRX_QUE_RUNNING;
-
- next_thr = NULL;
+
while (sig != NULL) {
next_sig = UT_LIST_GET_NEXT(signals, sig);
if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
- ut_a(next_thr == NULL);
- next_thr = trx_sig_reply(sig);
+ trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig);
}
sig = next_sig;
}
-
- return(next_thr);
}
/*************************************************************************
@@ -1174,6 +1196,7 @@ trx_rollback_step(
que_thr_t* thr) /* in: query thread */
{
roll_node_t* node;
+ ibool success;
ulint sig_no;
trx_savept_t* savept;
@@ -1200,13 +1223,19 @@ trx_rollback_step(
/* Send a rollback signal to the transaction */
- trx_sig_send(thr_get_trx(thr), sig_no, TRX_SIG_SELF,
- thr, savept);
+ success = trx_sig_send(thr_get_trx(thr),
+ sig_no, TRX_SIG_SELF,
+ thr, savept, NULL);
thr->state = QUE_THR_SIG_REPLY_WAIT;
mutex_exit(&kernel_mutex);
+ if (!success) {
+ /* Error in delivering the rollback signal */
+ que_thr_handle_error(thr, DB_ERROR, NULL, 0);
+ }
+
return(NULL);
}
diff --git a/innobase/trx/trx0trx.c b/innobase/trx/trx0trx.c
index 54993465f26..335e1f69228 100644
--- a/innobase/trx/trx0trx.c
+++ b/innobase/trx/trx0trx.c
@@ -895,15 +895,18 @@ trx_assign_read_view(
/********************************************************************
Commits a transaction. NOTE that the kernel mutex is temporarily released. */
static
-que_thr_t*
+void
trx_handle_commit_sig_off_kernel(
/*=============================*/
- /* out: next query thread to run */
- trx_t* trx) /* in: transaction */
+ trx_t* trx, /* in: transaction */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
{
trx_sig_t* sig;
trx_sig_t* next_sig;
- que_thr_t* next_thr = NULL;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
@@ -925,8 +928,7 @@ trx_handle_commit_sig_off_kernel(
if (sig->type == TRX_SIG_COMMIT) {
- ut_a(next_thr == NULL);
- next_thr = trx_sig_reply(sig);
+ trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig);
}
@@ -934,8 +936,6 @@ trx_handle_commit_sig_off_kernel(
}
trx->que_state = TRX_QUE_RUNNING;
-
- return(next_thr);
}
/***************************************************************
@@ -997,6 +997,39 @@ trx_lock_wait_to_suspended(
trx->que_state = TRX_QUE_RUNNING;
}
+/***************************************************************
+Moves the query threads in the sig reply wait list of trx to the SUSPENDED
+state. */
+static
+void
+trx_sig_reply_wait_to_suspended(
+/*============================*/
+ trx_t* trx) /* in: transaction */
+{
+ trx_sig_t* sig;
+ que_thr_t* thr;
+
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(mutex_own(&kernel_mutex));
+#endif /* UNIV_SYNC_DEBUG */
+
+ sig = UT_LIST_GET_FIRST(trx->reply_signals);
+
+ while (sig != NULL) {
+ thr = sig->receiver;
+
+ ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT);
+
+ thr->state = QUE_THR_SUSPENDED;
+
+ sig->receiver = NULL;
+
+ UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
+
+ sig = UT_LIST_GET_FIRST(trx->reply_signals);
+ }
+}
+
/*********************************************************************
Checks the compatibility of a new signal with the other signals in the
queue. */
@@ -1076,10 +1109,11 @@ trx_sig_is_compatible(
/********************************************************************
Sends a signal to a trx object. */
-que_thr_t*
+ibool
trx_sig_send(
/*=========*/
- /* out: next query thread to run */
+ /* out: TRUE if the signal was
+ successfully delivered */
trx_t* trx, /* in: trx handle */
ulint type, /* in: signal type */
ulint sender, /* in: TRX_SIG_SELF or
@@ -1087,8 +1121,14 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
- trx_savept_t* savept) /* in: possible rollback savepoint, or
+ trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the parameter
+ is NULL, it is ignored */
{
trx_sig_t* sig;
trx_t* receiver_trx;
@@ -1098,7 +1138,14 @@ trx_sig_send(
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
- ut_a(trx_sig_is_compatible(trx, type, sender));
+ if (!trx_sig_is_compatible(trx, type, sender)) {
+ /* The signal is not compatible with the other signals in
+ the queue: do nothing */
+
+ ut_error;
+
+ return(FALSE);
+ }
/* Queue the signal object */
@@ -1132,6 +1179,11 @@ trx_sig_send(
sig);
}
+ if (trx->sess->state == SESS_ERROR) {
+
+ trx_sig_reply_wait_to_suspended(trx);
+ }
+
if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) {
/* The following call will add a TRX_SIG_ERROR_OCCURRED
@@ -1146,10 +1198,10 @@ trx_sig_send(
if (UT_LIST_GET_FIRST(trx->signals) == sig) {
- return(trx_sig_start_handle(trx));
+ trx_sig_start_handle(trx, next_thr);
}
- return(NULL);
+ return(TRUE);
}
/********************************************************************
@@ -1171,18 +1223,27 @@ trx_end_signal_handling(
trx->handling_signals = FALSE;
trx->graph = trx->graph_before_signal_handling;
+
+ if (trx->graph && (trx->sess->state == SESS_ERROR)) {
+
+ que_fork_error_handle(trx, trx->graph);
+ }
}
/********************************************************************
Starts handling of a trx signal. */
-que_thr_t*
+void
trx_sig_start_handle(
/*=================*/
- /* out: next query thread to run, or NULL */
- trx_t* trx) /* in: trx handle */
+ trx_t* trx, /* in: trx handle */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the parameter
+ is NULL, it is ignored */
{
- que_thr_t* next_thr = NULL;
trx_sig_t* sig;
ulint type;
loop:
@@ -1198,7 +1259,7 @@ loop:
trx_end_signal_handling(trx);
- return(next_thr);
+ return;
}
if (trx->conc_state == TRX_NOT_STARTED) {
@@ -1214,13 +1275,23 @@ loop:
trx_lock_wait_to_suspended(trx);
}
+ /* If the session is in the error state and this trx has threads
+ waiting for reply from signals, moves these threads to the suspended
+ state, canceling wait reservations; note that if the transaction has
+ sent a commit or rollback signal to itself, and its session is not in
+ the error state, then nothing is done here. */
+
+ if (trx->sess->state == SESS_ERROR) {
+ trx_sig_reply_wait_to_suspended(trx);
+ }
+
/* If there are no running query threads, we can start processing of a
signal, otherwise we have to wait until all query threads of this
transaction are aware of the arrival of the signal. */
if (trx->n_active_thrs > 0) {
- return(NULL);
+ return;
}
if (trx->handling_signals == FALSE) {
@@ -1234,19 +1305,30 @@ loop:
if (type == TRX_SIG_COMMIT) {
- next_thr = trx_handle_commit_sig_off_kernel(trx);
+ trx_handle_commit_sig_off_kernel(trx, next_thr);
} else if ((type == TRX_SIG_TOTAL_ROLLBACK)
- || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)
- || (type == TRX_SIG_ERROR_OCCURRED)) {
+ || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) {
+
+ trx_rollback(trx, sig, next_thr);
+
+ /* No further signals can be handled until the rollback
+ completes, therefore we return */
+
+ return;
+
+ } else if (type == TRX_SIG_ERROR_OCCURRED) {
+
+ trx_rollback(trx, sig, next_thr);
+
/* No further signals can be handled until the rollback
completes, therefore we return */
- return(trx_rollback(trx, sig));
+ return;
} else if (type == TRX_SIG_BREAK_EXECUTION) {
- next_thr = trx_sig_reply(sig);
+ trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig);
} else {
ut_error;
@@ -1259,14 +1341,17 @@ loop:
Send the reply message when a signal in the queue of the trx has been
handled. */
-que_thr_t*
+void
trx_sig_reply(
/*==========*/
- /* out: next query thread to run */
- trx_sig_t* sig) /* in: signal */
+ trx_sig_t* sig, /* in: signal */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
{
- trx_t* receiver_trx;
- que_thr_t* next_thr = NULL;
+ trx_t* receiver_trx;
ut_ad(sig);
#ifdef UNIV_SYNC_DEBUG
@@ -1280,13 +1365,13 @@ trx_sig_reply(
UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals,
sig);
- next_thr = que_thr_end_wait(sig->receiver);
+ ut_ad(receiver_trx->sess->state != SESS_ERROR);
+
+ que_thr_end_wait(sig->receiver, next_thr);
sig->receiver = NULL;
}
-
- return(next_thr);
}
/********************************************************************
@@ -1342,6 +1427,7 @@ trx_commit_step(
{
commit_node_t* node;
que_thr_t* next_thr;
+ ibool success;
node = thr->run_node;
@@ -1356,15 +1442,22 @@ trx_commit_step(
node->state = COMMIT_NODE_WAIT;
+ next_thr = NULL;
+
thr->state = QUE_THR_SIG_REPLY_WAIT;
/* Send the commit signal to the transaction */
- next_thr = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
- TRX_SIG_SELF, thr, NULL);
-
+ success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
+ TRX_SIG_SELF, thr, NULL, &next_thr);
+
mutex_exit(&kernel_mutex);
+ if (!success) {
+ /* Error in delivering the commit signal */
+ que_thr_handle_error(thr, DB_ERROR, NULL, 0);
+ }
+
return(next_thr);
}
diff --git a/innobase/usr/usr0sess.c b/innobase/usr/usr0sess.c
index cc016f2b823..359c1552421 100644
--- a/innobase/usr/usr0sess.c
+++ b/innobase/usr/usr0sess.c
@@ -37,6 +37,8 @@ sess_open(void)
#endif /* UNIV_SYNC_DEBUG */
sess = mem_alloc(sizeof(sess_t));
+ sess->state = SESS_ACTIVE;
+
sess->trx = trx_create(sess);
UT_LIST_INIT(sess->graphs);